# File: mineru_service_manager.py (14 KB)
  1. # Copyright (c) Opendatalab. All rights reserved.
  2. """
  3. MinerU 服务管理模块
  4. 负责:
  5. 1. 自动启动/停止 mineru-api.service 以释放 GPU 显存
  6. 2. 检测 OCR 任务状态
  7. 3. 定时检查空闲并停止服务
  8. """
  9. import asyncio
  10. import os
  11. import socket
  12. import subprocess
  13. import time
  14. import threading
  15. from typing import Optional
  16. from datetime import datetime
  17. from .logging_config import get_logger
# Module-level logger used by the MinerU service manager below.
logger = get_logger("pdf_converter_v2.mineru_manager")
  19. # 服务名称
  20. MINERU_SERVICE_NAME = "mineru-api.service"
  21. # MinerU API 地址和端口(用于健康检查)
  22. MINERU_API_HOST = os.getenv("MINERU_API_HOST", "127.0.0.1")
  23. MINERU_API_PORT = int(os.getenv("MINERU_API_PORT", "5282"))
  24. # 空闲超时时间(秒),超过此时间无任务则停止服务
  25. IDLE_TIMEOUT_SECONDS = int(os.getenv("MINERU_IDLE_TIMEOUT", "60")) # 默认 1 分钟
  26. # 检查间隔(秒)
  27. CHECK_INTERVAL_SECONDS = int(os.getenv("MINERU_CHECK_INTERVAL", "60")) # 默认 1 分钟
  28. # 服务启动等待超时(秒)
  29. SERVICE_START_TIMEOUT = int(os.getenv("MINERU_START_TIMEOUT", "120")) # 默认 2 分钟
  30. class MinerUServiceManager:
  31. """MinerU 服务管理器(单例模式)"""
  32. _instance: Optional["MinerUServiceManager"] = None
  33. _lock = threading.Lock()
  34. def __new__(cls) -> "MinerUServiceManager":
  35. if cls._instance is None:
  36. with cls._lock:
  37. if cls._instance is None:
  38. cls._instance = super().__new__(cls)
  39. cls._instance._initialized = False
  40. return cls._instance
  41. def __init__(self):
  42. if self._initialized:
  43. return
  44. self._initialized = True
  45. # 当前活跃的 OCR 任务计数
  46. self._active_tasks = 0
  47. self._tasks_lock = threading.Lock()
  48. # 服务正在启动中的标志(防止启动过程中被停止)
  49. self._starting = False
  50. self._starting_lock = threading.Lock()
  51. # 最后一次任务完成时间(或管理器启动时间,用于计算空闲)
  52. self._last_task_end_time: Optional[datetime] = None
  53. # 管理器初始化时间(用于首次空闲检测)
  54. self._init_time: datetime = datetime.now()
  55. # 定时检查线程
  56. self._monitor_thread: Optional[threading.Thread] = None
  57. self._stop_monitor = threading.Event()
  58. # 服务启动锁,避免并发启动
  59. self._start_lock = asyncio.Lock()
  60. logger.info(f"[MinerU管理器] 初始化完成,空闲超时: {IDLE_TIMEOUT_SECONDS}s, 检查间隔: {CHECK_INTERVAL_SECONDS}s")
  61. def _run_systemctl(self, action: str) -> tuple[bool, str]:
  62. """
  63. 执行 systemctl 命令
  64. Args:
  65. action: start, stop, status, is-active
  66. Returns:
  67. (success, output)
  68. """
  69. cmd = ["systemctl", action, MINERU_SERVICE_NAME]
  70. try:
  71. result = subprocess.run(
  72. cmd,
  73. capture_output=True,
  74. text=True,
  75. timeout=30
  76. )
  77. output = result.stdout.strip() or result.stderr.strip()
  78. success = result.returncode == 0
  79. return success, output
  80. except subprocess.TimeoutExpired:
  81. logger.error(f"[MinerU管理器] systemctl {action} 超时")
  82. return False, "timeout"
  83. except Exception as e:
  84. logger.error(f"[MinerU管理器] systemctl {action} 失败: {e}")
  85. return False, str(e)
  86. def is_service_active(self) -> bool:
  87. """检查服务是否正在运行(systemd 状态)"""
  88. success, output = self._run_systemctl("is-active")
  89. is_active = success and output == "active"
  90. logger.debug(f"[MinerU管理器] 服务状态: {output} (active={is_active})")
  91. return is_active
  92. def is_port_available(self, host: str = None, port: int = None, timeout: float = 2.0) -> bool:
  93. """
  94. 检查 MinerU API 端口是否可连接
  95. Args:
  96. host: 主机地址,默认使用 MINERU_API_HOST
  97. port: 端口号,默认使用 MINERU_API_PORT
  98. timeout: 连接超时时间(秒)
  99. Returns:
  100. True 如果端口可连接
  101. """
  102. host = host or MINERU_API_HOST
  103. port = port or MINERU_API_PORT
  104. try:
  105. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  106. sock.settimeout(timeout)
  107. result = sock.connect_ex((host, port))
  108. sock.close()
  109. is_available = result == 0
  110. logger.debug(f"[MinerU管理器] 端口检测 {host}:{port} -> {'可用' if is_available else '不可用'}")
  111. return is_available
  112. except Exception as e:
  113. logger.debug(f"[MinerU管理器] 端口检测失败: {e}")
  114. return False
  115. def is_service_ready(self) -> bool:
  116. """
  117. 检查服务是否完全就绪(systemd active 且端口可连接)
  118. """
  119. return self.is_service_active() and self.is_port_available()
  120. def start_service_sync(self) -> bool:
  121. """同步启动服务,等待端口可用"""
  122. # 如果服务已完全就绪,无需启动
  123. if self.is_service_ready():
  124. logger.debug("[MinerU管理器] 服务已就绪,无需启动")
  125. return True
  126. # 设置启动中标志,防止定时任务在启动过程中停止服务
  127. with self._starting_lock:
  128. self._starting = True
  129. try:
  130. # 如果 systemd 状态是 active 但端口不可用,等待端口
  131. if self.is_service_active():
  132. logger.info("[MinerU管理器] 服务已启动,等待端口就绪...")
  133. else:
  134. logger.info("[MinerU管理器] 正在启动 MinerU 服务...")
  135. success, output = self._run_systemctl("start")
  136. if not success:
  137. logger.error(f"[MinerU管理器] 服务启动失败: {output}")
  138. return False
  139. # 等待服务完全启动(systemd active 且端口可连接)
  140. start_time = time.time()
  141. check_interval = 2 # 每 2 秒检查一次
  142. while time.time() - start_time < SERVICE_START_TIMEOUT:
  143. if self.is_service_ready():
  144. elapsed = time.time() - start_time
  145. logger.info(f"[MinerU管理器] 服务启动成功,端口已就绪(等待 {elapsed:.1f}s)")
  146. return True
  147. # 检查服务是否意外停止
  148. if not self.is_service_active():
  149. logger.error("[MinerU管理器] 服务启动后意外停止")
  150. return False
  151. elapsed = time.time() - start_time
  152. logger.debug(f"[MinerU管理器] 等待端口就绪... ({elapsed:.0f}s/{SERVICE_START_TIMEOUT}s)")
  153. time.sleep(check_interval)
  154. logger.warning(f"[MinerU管理器] 服务启动超时({SERVICE_START_TIMEOUT}s),端口仍不可用")
  155. return False
  156. finally:
  157. # 清除启动中标志
  158. with self._starting_lock:
  159. self._starting = False
  160. async def start_service(self) -> bool:
  161. """异步启动服务"""
  162. async with self._start_lock:
  163. # 在线程池中执行同步操作
  164. loop = asyncio.get_event_loop()
  165. return await loop.run_in_executor(None, self.start_service_sync)
  166. def stop_service_sync(self) -> bool:
  167. """同步停止服务"""
  168. if not self.is_service_active():
  169. logger.debug("[MinerU管理器] 服务未运行,无需停止")
  170. return True
  171. # 检查是否正在启动中
  172. with self._starting_lock:
  173. if self._starting:
  174. logger.warning("[MinerU管理器] 服务正在启动中,不能停止")
  175. return False
  176. # 检查是否有活跃任务
  177. with self._tasks_lock:
  178. if self._active_tasks > 0:
  179. logger.warning(f"[MinerU管理器] 当前有 {self._active_tasks} 个活跃任务,不能停止服务")
  180. return False
  181. logger.info("[MinerU管理器] 正在停止 MinerU 服务以释放 GPU 显存...")
  182. success, output = self._run_systemctl("stop")
  183. if success:
  184. logger.info("[MinerU管理器] 服务已停止,GPU 显存已释放")
  185. return True
  186. else:
  187. logger.error(f"[MinerU管理器] 服务停止失败: {output}")
  188. return False
  189. async def stop_service(self) -> bool:
  190. """异步停止服务"""
  191. loop = asyncio.get_event_loop()
  192. return await loop.run_in_executor(None, self.stop_service_sync)
  193. def task_started(self):
  194. """标记一个 OCR 任务开始"""
  195. with self._tasks_lock:
  196. self._active_tasks += 1
  197. logger.info(f"[MinerU管理器] 任务开始,当前活跃任务数: {self._active_tasks}")
  198. def task_ended(self):
  199. """标记一个 OCR 任务结束"""
  200. with self._tasks_lock:
  201. self._active_tasks = max(0, self._active_tasks - 1)
  202. self._last_task_end_time = datetime.now()
  203. logger.info(f"[MinerU管理器] 任务结束,当前活跃任务数: {self._active_tasks}")
  204. def get_active_task_count(self) -> int:
  205. """获取当前活跃任务数"""
  206. with self._tasks_lock:
  207. return self._active_tasks
  208. def _monitor_loop(self):
  209. """定时监控循环(在单独线程中运行)"""
  210. logger.info("[MinerU管理器] 定时监控线程已启动")
  211. while not self._stop_monitor.is_set():
  212. try:
  213. # 检查是否需要停止服务
  214. self._check_and_stop_if_idle()
  215. except Exception as e:
  216. logger.exception(f"[MinerU管理器] 监控检查异常: {e}")
  217. # 等待下一次检查
  218. self._stop_monitor.wait(CHECK_INTERVAL_SECONDS)
  219. logger.info("[MinerU管理器] 定时监控线程已停止")
  220. def _check_and_stop_if_idle(self):
  221. """检查是否空闲,如果空闲则停止服务"""
  222. with self._tasks_lock:
  223. active_tasks = self._active_tasks
  224. last_end_time = self._last_task_end_time
  225. # 如果有活跃任务,不停止
  226. if active_tasks > 0:
  227. logger.debug(f"[MinerU管理器] 当前有 {active_tasks} 个活跃任务,保持服务运行")
  228. return
  229. # 如果服务未运行,无需处理
  230. if not self.is_service_active():
  231. logger.debug("[MinerU管理器] 服务未运行,跳过检查")
  232. return
  233. # 检查空闲时间
  234. # 如果从未有任务完成,使用管理器初始化时间作为参考
  235. reference_time = last_end_time if last_end_time is not None else self._init_time
  236. idle_seconds = (datetime.now() - reference_time).total_seconds()
  237. if idle_seconds >= IDLE_TIMEOUT_SECONDS:
  238. if last_end_time is None:
  239. logger.info(f"[MinerU管理器] 服务启动后无任务,已空闲 {idle_seconds:.0f}s,超过阈值 {IDLE_TIMEOUT_SECONDS}s,准备停止")
  240. else:
  241. logger.info(f"[MinerU管理器] 服务已空闲 {idle_seconds:.0f}s,超过阈值 {IDLE_TIMEOUT_SECONDS}s,准备停止")
  242. self.stop_service_sync()
  243. else:
  244. remaining = IDLE_TIMEOUT_SECONDS - idle_seconds
  245. logger.debug(f"[MinerU管理器] 服务空闲 {idle_seconds:.0f}s,还需 {remaining:.0f}s 达到停止阈值")
  246. def start_monitor(self):
  247. """启动定时监控线程"""
  248. if self._monitor_thread is not None and self._monitor_thread.is_alive():
  249. logger.debug("[MinerU管理器] 监控线程已在运行")
  250. return
  251. self._stop_monitor.clear()
  252. self._monitor_thread = threading.Thread(
  253. target=self._monitor_loop,
  254. name="MinerUMonitor",
  255. daemon=True
  256. )
  257. self._monitor_thread.start()
  258. logger.info("[MinerU管理器] 定时监控已启动")
  259. def stop_monitor(self):
  260. """停止定时监控线程"""
  261. if self._monitor_thread is None:
  262. return
  263. self._stop_monitor.set()
  264. self._monitor_thread.join(timeout=5)
  265. self._monitor_thread = None
  266. logger.info("[MinerU管理器] 定时监控已停止")
  267. def get_status(self) -> dict:
  268. """获取管理器状态"""
  269. with self._tasks_lock:
  270. active_tasks = self._active_tasks
  271. last_end_time = self._last_task_end_time
  272. idle_seconds = None
  273. if last_end_time is not None:
  274. idle_seconds = (datetime.now() - last_end_time).total_seconds()
  275. service_active = self.is_service_active()
  276. port_available = self.is_port_available() if service_active else False
  277. with self._starting_lock:
  278. is_starting = self._starting
  279. return {
  280. "service_name": MINERU_SERVICE_NAME,
  281. "service_active": service_active,
  282. "service_starting": is_starting,
  283. "port_available": port_available,
  284. "service_ready": service_active and port_available,
  285. "api_endpoint": f"http://{MINERU_API_HOST}:{MINERU_API_PORT}",
  286. "active_tasks": active_tasks,
  287. "last_task_end_time": last_end_time.isoformat() if last_end_time else None,
  288. "idle_seconds": idle_seconds,
  289. "idle_timeout_seconds": IDLE_TIMEOUT_SECONDS,
  290. "check_interval_seconds": CHECK_INTERVAL_SECONDS,
  291. "service_start_timeout": SERVICE_START_TIMEOUT,
  292. "monitor_running": self._monitor_thread is not None and self._monitor_thread.is_alive()
  293. }
  294. # 全局单例
  295. _manager: Optional[MinerUServiceManager] = None
  296. def get_mineru_manager() -> MinerUServiceManager:
  297. """获取 MinerU 服务管理器单例"""
  298. global _manager
  299. if _manager is None:
  300. _manager = MinerUServiceManager()
  301. return _manager
  302. async def ensure_mineru_service_running() -> bool:
  303. """
  304. 确保 MinerU 服务正在运行
  305. 在调用 OCR API 前调用此函数
  306. """
  307. manager = get_mineru_manager()
  308. return await manager.start_service()