mineru_service_manager.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. # Copyright (c) Opendatalab. All rights reserved.
  2. """
  3. MinerU 服务管理模块
  4. 负责:
  5. 1. 自动启动/停止 mineru-api.service 以释放 GPU 显存
  6. 2. 检测 OCR 任务状态
  7. 3. 定时检查空闲并停止服务
  8. """
  9. import asyncio
  10. import os
  11. import socket
  12. import subprocess
  13. import time
  14. import threading
  15. from typing import Optional
  16. from datetime import datetime
  17. from .logging_config import get_logger
  18. from ..config import (
  19. MINERU_API_HOST as _MINERU_API_HOST,
  20. MINERU_API_PORT as _MINERU_API_PORT,
  21. MINERU_IDLE_TIMEOUT as _MINERU_IDLE_TIMEOUT,
  22. MINERU_CHECK_INTERVAL as _MINERU_CHECK_INTERVAL,
  23. MINERU_START_TIMEOUT as _MINERU_START_TIMEOUT,
  24. )
  25. logger = get_logger("pdf_converter_v2.mineru_manager")
  26. # 服务名称
  27. MINERU_SERVICE_NAME = "mineru-api.service"
  28. # MinerU API 地址和端口(用于健康检查)
  29. MINERU_API_HOST = _MINERU_API_HOST
  30. MINERU_API_PORT = _MINERU_API_PORT
  31. # 空闲超时时间(秒),超过此时间无任务则停止服务
  32. IDLE_TIMEOUT_SECONDS = _MINERU_IDLE_TIMEOUT
  33. # 检查间隔(秒)
  34. CHECK_INTERVAL_SECONDS = _MINERU_CHECK_INTERVAL
  35. # 服务启动等待超时(秒)
  36. SERVICE_START_TIMEOUT = _MINERU_START_TIMEOUT
  37. class MinerUServiceManager:
  38. """MinerU 服务管理器(单例模式)"""
  39. _instance: Optional["MinerUServiceManager"] = None
  40. _lock = threading.Lock()
  41. def __new__(cls) -> "MinerUServiceManager":
  42. if cls._instance is None:
  43. with cls._lock:
  44. if cls._instance is None:
  45. cls._instance = super().__new__(cls)
  46. cls._instance._initialized = False
  47. return cls._instance
  48. def __init__(self):
  49. if self._initialized:
  50. return
  51. self._initialized = True
  52. # 当前活跃的 OCR 任务计数
  53. self._active_tasks = 0
  54. self._tasks_lock = threading.Lock()
  55. # 服务正在启动中的标志(防止启动过程中被停止)
  56. self._starting = False
  57. self._starting_lock = threading.Lock()
  58. # 最后一次任务完成时间(或管理器启动时间,用于计算空闲)
  59. self._last_task_end_time: Optional[datetime] = None
  60. # 管理器初始化时间(用于首次空闲检测)
  61. self._init_time: datetime = datetime.now()
  62. # 定时检查线程
  63. self._monitor_thread: Optional[threading.Thread] = None
  64. self._stop_monitor = threading.Event()
  65. # 服务启动锁,避免并发启动
  66. self._start_lock = asyncio.Lock()
  67. logger.info(f"[MinerU管理器] 初始化完成,空闲超时: {IDLE_TIMEOUT_SECONDS}s, 检查间隔: {CHECK_INTERVAL_SECONDS}s")
  68. def _run_systemctl(self, action: str) -> tuple[bool, str]:
  69. """
  70. 执行 systemctl 命令
  71. Args:
  72. action: start, stop, status, is-active
  73. Returns:
  74. (success, output)
  75. """
  76. cmd = ["systemctl", action, MINERU_SERVICE_NAME]
  77. try:
  78. result = subprocess.run(
  79. cmd,
  80. capture_output=True,
  81. text=True,
  82. timeout=30
  83. )
  84. output = result.stdout.strip() or result.stderr.strip()
  85. success = result.returncode == 0
  86. return success, output
  87. except subprocess.TimeoutExpired:
  88. logger.error(f"[MinerU管理器] systemctl {action} 超时")
  89. return False, "timeout"
  90. except Exception as e:
  91. logger.error(f"[MinerU管理器] systemctl {action} 失败: {e}")
  92. return False, str(e)
  93. def is_service_active(self) -> bool:
  94. """检查服务是否正在运行(systemd 状态)"""
  95. success, output = self._run_systemctl("is-active")
  96. is_active = success and output == "active"
  97. logger.debug(f"[MinerU管理器] 服务状态: {output} (active={is_active})")
  98. return is_active
  99. def is_port_available(self, host: str = None, port: int = None, timeout: float = 2.0) -> bool:
  100. """
  101. 检查 MinerU API 端口是否可连接
  102. Args:
  103. host: 主机地址,默认使用 MINERU_API_HOST
  104. port: 端口号,默认使用 MINERU_API_PORT
  105. timeout: 连接超时时间(秒)
  106. Returns:
  107. True 如果端口可连接
  108. """
  109. host = host or MINERU_API_HOST
  110. port = port or MINERU_API_PORT
  111. try:
  112. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  113. sock.settimeout(timeout)
  114. result = sock.connect_ex((host, port))
  115. sock.close()
  116. is_available = result == 0
  117. logger.debug(f"[MinerU管理器] 端口检测 {host}:{port} -> {'可用' if is_available else '不可用'}")
  118. return is_available
  119. except Exception as e:
  120. logger.debug(f"[MinerU管理器] 端口检测失败: {e}")
  121. return False
  122. def is_service_ready(self) -> bool:
  123. """
  124. 检查服务是否完全就绪(systemd active 且端口可连接)
  125. """
  126. return self.is_service_active() and self.is_port_available()
  127. def start_service_sync(self) -> bool:
  128. """同步启动服务,等待端口可用"""
  129. # 如果服务已完全就绪,无需启动
  130. if self.is_service_ready():
  131. logger.debug("[MinerU管理器] 服务已就绪,无需启动")
  132. return True
  133. # 设置启动中标志,防止定时任务在启动过程中停止服务
  134. with self._starting_lock:
  135. self._starting = True
  136. try:
  137. # 如果 systemd 状态是 active 但端口不可用,等待端口
  138. if self.is_service_active():
  139. logger.info("[MinerU管理器] 服务已启动,等待端口就绪...")
  140. else:
  141. logger.info("[MinerU管理器] 正在启动 MinerU 服务...")
  142. success, output = self._run_systemctl("start")
  143. if not success:
  144. logger.error(f"[MinerU管理器] 服务启动失败: {output}")
  145. return False
  146. # 等待服务完全启动(systemd active 且端口可连接)
  147. start_time = time.time()
  148. check_interval = 2 # 每 2 秒检查一次
  149. while time.time() - start_time < SERVICE_START_TIMEOUT:
  150. if self.is_service_ready():
  151. elapsed = time.time() - start_time
  152. logger.info(f"[MinerU管理器] 服务启动成功,端口已就绪(等待 {elapsed:.1f}s)")
  153. return True
  154. # 检查服务是否意外停止
  155. if not self.is_service_active():
  156. logger.error("[MinerU管理器] 服务启动后意外停止")
  157. return False
  158. elapsed = time.time() - start_time
  159. logger.debug(f"[MinerU管理器] 等待端口就绪... ({elapsed:.0f}s/{SERVICE_START_TIMEOUT}s)")
  160. time.sleep(check_interval)
  161. logger.warning(f"[MinerU管理器] 服务启动超时({SERVICE_START_TIMEOUT}s),端口仍不可用")
  162. return False
  163. finally:
  164. # 清除启动中标志
  165. with self._starting_lock:
  166. self._starting = False
  167. async def start_service(self) -> bool:
  168. """异步启动服务"""
  169. async with self._start_lock:
  170. # 在线程池中执行同步操作
  171. loop = asyncio.get_event_loop()
  172. return await loop.run_in_executor(None, self.start_service_sync)
  173. def stop_service_sync(self) -> bool:
  174. """同步停止服务"""
  175. if not self.is_service_active():
  176. logger.debug("[MinerU管理器] 服务未运行,无需停止")
  177. return True
  178. # 检查是否正在启动中
  179. with self._starting_lock:
  180. if self._starting:
  181. logger.warning("[MinerU管理器] 服务正在启动中,不能停止")
  182. return False
  183. # 检查是否有活跃任务
  184. with self._tasks_lock:
  185. if self._active_tasks > 0:
  186. logger.warning(f"[MinerU管理器] 当前有 {self._active_tasks} 个活跃任务,不能停止服务")
  187. return False
  188. logger.info("[MinerU管理器] 正在停止 MinerU 服务以释放 GPU 显存...")
  189. success, output = self._run_systemctl("stop")
  190. if success:
  191. logger.info("[MinerU管理器] 服务已停止,GPU 显存已释放")
  192. return True
  193. else:
  194. logger.error(f"[MinerU管理器] 服务停止失败: {output}")
  195. return False
  196. async def stop_service(self) -> bool:
  197. """异步停止服务"""
  198. loop = asyncio.get_event_loop()
  199. return await loop.run_in_executor(None, self.stop_service_sync)
  200. def task_started(self):
  201. """标记一个 OCR 任务开始"""
  202. with self._tasks_lock:
  203. self._active_tasks += 1
  204. logger.info(f"[MinerU管理器] 任务开始,当前活跃任务数: {self._active_tasks}")
  205. def task_ended(self):
  206. """标记一个 OCR 任务结束"""
  207. with self._tasks_lock:
  208. self._active_tasks = max(0, self._active_tasks - 1)
  209. self._last_task_end_time = datetime.now()
  210. logger.info(f"[MinerU管理器] 任务结束,当前活跃任务数: {self._active_tasks}")
  211. def get_active_task_count(self) -> int:
  212. """获取当前活跃任务数"""
  213. with self._tasks_lock:
  214. return self._active_tasks
  215. def _monitor_loop(self):
  216. """定时监控循环(在单独线程中运行)"""
  217. logger.info("[MinerU管理器] 定时监控线程已启动")
  218. while not self._stop_monitor.is_set():
  219. try:
  220. # 检查是否需要停止服务
  221. self._check_and_stop_if_idle()
  222. except Exception as e:
  223. logger.exception(f"[MinerU管理器] 监控检查异常: {e}")
  224. # 等待下一次检查
  225. self._stop_monitor.wait(CHECK_INTERVAL_SECONDS)
  226. logger.info("[MinerU管理器] 定时监控线程已停止")
  227. def _check_and_stop_if_idle(self):
  228. """检查是否空闲,如果空闲则停止服务"""
  229. with self._tasks_lock:
  230. active_tasks = self._active_tasks
  231. last_end_time = self._last_task_end_time
  232. # 如果有活跃任务,不停止
  233. if active_tasks > 0:
  234. logger.debug(f"[MinerU管理器] 当前有 {active_tasks} 个活跃任务,保持服务运行")
  235. return
  236. # 如果服务未运行,无需处理
  237. if not self.is_service_active():
  238. logger.debug("[MinerU管理器] 服务未运行,跳过检查")
  239. return
  240. # 检查空闲时间
  241. # 如果从未有任务完成,使用管理器初始化时间作为参考
  242. reference_time = last_end_time if last_end_time is not None else self._init_time
  243. idle_seconds = (datetime.now() - reference_time).total_seconds()
  244. if idle_seconds >= IDLE_TIMEOUT_SECONDS:
  245. if last_end_time is None:
  246. logger.info(f"[MinerU管理器] 服务启动后无任务,已空闲 {idle_seconds:.0f}s,超过阈值 {IDLE_TIMEOUT_SECONDS}s,准备停止")
  247. else:
  248. logger.info(f"[MinerU管理器] 服务已空闲 {idle_seconds:.0f}s,超过阈值 {IDLE_TIMEOUT_SECONDS}s,准备停止")
  249. self.stop_service_sync()
  250. else:
  251. remaining = IDLE_TIMEOUT_SECONDS - idle_seconds
  252. logger.debug(f"[MinerU管理器] 服务空闲 {idle_seconds:.0f}s,还需 {remaining:.0f}s 达到停止阈值")
  253. def start_monitor(self):
  254. """启动定时监控线程"""
  255. if self._monitor_thread is not None and self._monitor_thread.is_alive():
  256. logger.debug("[MinerU管理器] 监控线程已在运行")
  257. return
  258. self._stop_monitor.clear()
  259. self._monitor_thread = threading.Thread(
  260. target=self._monitor_loop,
  261. name="MinerUMonitor",
  262. daemon=True
  263. )
  264. self._monitor_thread.start()
  265. logger.info("[MinerU管理器] 定时监控已启动")
  266. def stop_monitor(self):
  267. """停止定时监控线程"""
  268. if self._monitor_thread is None:
  269. return
  270. self._stop_monitor.set()
  271. self._monitor_thread.join(timeout=5)
  272. self._monitor_thread = None
  273. logger.info("[MinerU管理器] 定时监控已停止")
  274. def get_status(self) -> dict:
  275. """获取管理器状态"""
  276. with self._tasks_lock:
  277. active_tasks = self._active_tasks
  278. last_end_time = self._last_task_end_time
  279. idle_seconds = None
  280. if last_end_time is not None:
  281. idle_seconds = (datetime.now() - last_end_time).total_seconds()
  282. service_active = self.is_service_active()
  283. port_available = self.is_port_available() if service_active else False
  284. with self._starting_lock:
  285. is_starting = self._starting
  286. return {
  287. "service_name": MINERU_SERVICE_NAME,
  288. "service_active": service_active,
  289. "service_starting": is_starting,
  290. "port_available": port_available,
  291. "service_ready": service_active and port_available,
  292. "api_endpoint": f"http://{MINERU_API_HOST}:{MINERU_API_PORT}",
  293. "active_tasks": active_tasks,
  294. "last_task_end_time": last_end_time.isoformat() if last_end_time else None,
  295. "idle_seconds": idle_seconds,
  296. "idle_timeout_seconds": IDLE_TIMEOUT_SECONDS,
  297. "check_interval_seconds": CHECK_INTERVAL_SECONDS,
  298. "service_start_timeout": SERVICE_START_TIMEOUT,
  299. "monitor_running": self._monitor_thread is not None and self._monitor_thread.is_alive()
  300. }
  301. # 全局单例
  302. _manager: Optional[MinerUServiceManager] = None
  303. def get_mineru_manager() -> MinerUServiceManager:
  304. """获取 MinerU 服务管理器单例"""
  305. global _manager
  306. if _manager is None:
  307. _manager = MinerUServiceManager()
  308. return _manager
  309. async def ensure_mineru_service_running() -> bool:
  310. """
  311. 确保 MinerU 服务正在运行
  312. 在调用 OCR API 前调用此函数
  313. """
  314. manager = get_mineru_manager()
  315. return await manager.start_service()