| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- # Copyright (c) Opendatalab. All rights reserved.
- """
- MinerU 服务管理模块
- 负责:
- 1. 自动启动/停止 mineru-api.service 以释放 GPU 显存
- 2. 检测 OCR 任务状态
- 3. 定时检查空闲并停止服务
- """
- import asyncio
- import os
- import socket
- import subprocess
- import time
- import threading
- from typing import Optional
- from datetime import datetime
- from .logging_config import get_logger
logger = get_logger("pdf_converter_v2.mineru_manager")


def _env_int(name: str, default: str) -> int:
    """Return environment variable *name* parsed as an int, using *default* if unset."""
    return int(os.getenv(name, default))


# systemd unit that hosts the MinerU API.
MINERU_SERVICE_NAME = "mineru-api.service"

# Address of the MinerU API; used by the TCP readiness probe.
MINERU_API_HOST = os.getenv("MINERU_API_HOST", "127.0.0.1")
MINERU_API_PORT = _env_int("MINERU_API_PORT", "5282")

# Stop the service after this many seconds without OCR activity (default: 1 minute).
IDLE_TIMEOUT_SECONDS = _env_int("MINERU_IDLE_TIMEOUT", "60")

# Seconds between idle checks performed by the monitor thread (default: 1 minute).
CHECK_INTERVAL_SECONDS = _env_int("MINERU_CHECK_INTERVAL", "60")

# Maximum seconds to wait for the service port after starting it (default: 2 minutes).
SERVICE_START_TIMEOUT = _env_int("MINERU_START_TIMEOUT", "120")
class MinerUServiceManager:
    """Process-wide singleton that starts and stops ``mineru-api.service`` on demand.

    Callers bring the service up with ``start_service`` / ``start_service_sync``
    before OCR work, and bracket each OCR job with ``task_started()`` /
    ``task_ended()``. A daemon monitor thread (``start_monitor``) periodically
    stops the service once no task is active and nothing has used it for
    ``IDLE_TIMEOUT_SECONDS``, releasing the GPU memory it holds.
    """

    _instance: Optional["MinerUServiceManager"] = None
    # Serialises singleton creation in __new__ and one-time setup in __init__.
    _lock = threading.Lock()

    def __new__(cls) -> "MinerUServiceManager":
        # Double-checked creation: the lock is only taken while no instance exists.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Set before publishing the instance, so a thread that sees a
                    # non-None _instance never finds _initialized missing.
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        # Python runs __init__ on every MinerUServiceManager() call, so only the
        # first call may build state. _initialized is set last and under the lock,
        # so a concurrent caller can neither re-initialise the instance nor return
        # while its locks and counters are still missing.
        if self._initialized:
            return
        with self._lock:
            if self._initialized:
                return

            # Number of OCR tasks currently in flight.
            self._active_tasks = 0
            # Guards _active_tasks and the activity timestamps below.
            self._tasks_lock = threading.Lock()

            # Number of start_service_sync() calls in progress. While non-zero,
            # stop_service_sync() refuses to stop the service. This is a counter
            # rather than a bool, so the first of several concurrent starters to
            # finish does not clear the guard for the others.
            self._starting_count = 0
            self._starting_lock = threading.Lock()

            # When the most recent OCR task ended (None until one has).
            self._last_task_end_time: Optional[datetime] = None

            # When start_service_sync() last confirmed the service ready for a
            # caller (None until it has). The idle check treats this as activity,
            # so a service that was just (re)started is not stopped before the
            # caller gets to call task_started().
            self._last_ready_time: Optional[datetime] = None

            # Manager creation time; the idle reference while nothing else happened.
            self._init_time: datetime = datetime.now()

            # Background idle-monitor thread and its stop signal.
            self._monitor_thread: Optional[threading.Thread] = None
            self._stop_monitor = threading.Event()

            # asyncio.Lock serialising start_service() callers. It is created
            # lazily inside the running event loop (and recreated if a different
            # loop calls in). Building it here could bind it to the wrong loop,
            # or fail on Python < 3.10 when the constructing thread has no loop.
            self._start_lock: Optional[asyncio.Lock] = None
            self._start_lock_loop: Optional[asyncio.AbstractEventLoop] = None

            self._initialized = True

        logger.info(f"[MinerU管理器] 初始化完成,空闲超时: {IDLE_TIMEOUT_SECONDS}s, 检查间隔: {CHECK_INTERVAL_SECONDS}s")

    def _run_systemctl(self, action: str) -> tuple[bool, str]:
        """Run ``systemctl <action> mineru-api.service`` and capture the result.

        Args:
            action: systemctl verb, e.g. ``start``, ``stop``, ``status``, ``is-active``.

        Returns:
            ``(success, output)``:

            - ``success`` is True iff the command exited with status 0.
            - ``output`` is the stripped stdout, or stderr when stdout is empty.

            On a 30 s timeout or any other error, the error is logged and
            ``(False, reason)`` is returned.
        """
        cmd = ["systemctl", action, MINERU_SERVICE_NAME]
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30,
            )
        except subprocess.TimeoutExpired:
            logger.error(f"[MinerU管理器] systemctl {action} 超时")
            return False, "timeout"
        except Exception as e:
            # e.g. FileNotFoundError when systemctl is not installed.
            logger.error(f"[MinerU管理器] systemctl {action} 失败: {e}")
            return False, str(e)
        output = result.stdout.strip() or result.stderr.strip()
        return result.returncode == 0, output

    def is_service_active(self) -> bool:
        """Return True only if ``systemctl is-active`` succeeds and prints ``active``."""
        success, output = self._run_systemctl("is-active")
        is_active = success and output == "active"
        logger.debug(f"[MinerU管理器] 服务状态: {output} (active={is_active})")
        return is_active

    def is_port_available(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        timeout: float = 2.0,
    ) -> bool:
        """Return True if a TCP (IPv4) connection to the MinerU API port succeeds.

        Args:
            host: Host to probe; falsy values fall back to ``MINERU_API_HOST``.
            port: Port to probe; falsy values fall back to ``MINERU_API_PORT``.
            timeout: Connect timeout in seconds.

        Returns:
            True if the connection succeeds. Any error while probing returns
            False and is logged at debug level.
        """
        host = host or MINERU_API_HOST
        port = port or MINERU_API_PORT

        try:
            # The context manager closes the socket even when connect_ex raises
            # (e.g. name-resolution failure); the old explicit close() was skipped then.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                result = sock.connect_ex((host, port))
        except Exception as e:
            logger.debug(f"[MinerU管理器] 端口检测失败: {e}")
            return False

        is_available = result == 0
        logger.debug(f"[MinerU管理器] 端口检测 {host}:{port} -> {'可用' if is_available else '不可用'}")
        return is_available

    def is_service_ready(self) -> bool:
        """Return True when the unit is systemd-active AND its API port accepts connections."""
        return self.is_service_active() and self.is_port_available()

    def _mark_ready(self) -> None:
        """Record that the service was just confirmed ready for a caller (idle-check activity)."""
        with self._tasks_lock:
            self._last_ready_time = datetime.now()

    def start_service_sync(self) -> bool:
        """Start the service if needed and block until its port accepts connections.

        Returns:
            True once the unit is active and the port is reachable. False if:

            - ``systemctl start`` fails,
            - the unit stops being active while we wait, or
            - the port is still unreachable after ``SERVICE_START_TIMEOUT`` seconds.
        """
        if self.is_service_ready():
            logger.debug("[MinerU管理器] 服务已就绪,无需启动")
            self._mark_ready()
            return True

        # Keep the idle monitor from stopping the service while we bring it up.
        with self._starting_lock:
            self._starting_count += 1

        try:
            if self.is_service_active():
                # The unit is active but its port is not up yet: just wait for it.
                logger.info("[MinerU管理器] 服务已启动,等待端口就绪...")
            else:
                logger.info("[MinerU管理器] 正在启动 MinerU 服务...")
                success, output = self._run_systemctl("start")
                if not success:
                    logger.error(f"[MinerU管理器] 服务启动失败: {output}")
                    return False

            # Monotonic clock so wall-clock adjustments cannot stretch or cut the timeout.
            start_time = time.monotonic()
            check_interval = 2  # seconds between readiness probes

            while time.monotonic() - start_time < SERVICE_START_TIMEOUT:
                if self.is_service_ready():
                    elapsed = time.monotonic() - start_time
                    self._mark_ready()
                    logger.info(f"[MinerU管理器] 服务启动成功,端口已就绪(等待 {elapsed:.1f}s)")
                    return True

                # Give up early if the unit died instead of coming up.
                if not self.is_service_active():
                    logger.error("[MinerU管理器] 服务启动后意外停止")
                    return False

                elapsed = time.monotonic() - start_time
                logger.debug(f"[MinerU管理器] 等待端口就绪... ({elapsed:.0f}s/{SERVICE_START_TIMEOUT}s)")
                time.sleep(check_interval)

            logger.warning(f"[MinerU管理器] 服务启动超时({SERVICE_START_TIMEOUT}s),端口仍不可用")
            return False
        finally:
            with self._starting_lock:
                self._starting_count -= 1

    async def start_service(self) -> bool:
        """Async wrapper around ``start_service_sync``.

        The blocking work runs in the loop's default executor. Concurrent
        callers on the same event loop are serialised by an ``asyncio.Lock``.
        """
        loop = asyncio.get_running_loop()
        lock = self._start_lock
        if lock is None or self._start_lock_loop is not loop:
            # No await between the check and the assignment, so this is atomic
            # with respect to other coroutines on the same loop.
            lock = asyncio.Lock()
            self._start_lock, self._start_lock_loop = lock, loop
        async with lock:
            return await loop.run_in_executor(None, self.start_service_sync)

    def stop_service_sync(self) -> bool:
        """Stop the service unless it is starting or has active OCR tasks.

        Returns:
            True if the service is not running or was stopped successfully.
            False if stopping was refused (start in progress / active tasks)
            or ``systemctl stop`` failed.
        """
        if not self.is_service_active():
            logger.debug("[MinerU管理器] 服务未运行,无需停止")
            return True

        with self._starting_lock:
            if self._starting_count > 0:
                logger.warning("[MinerU管理器] 服务正在启动中,不能停止")
                return False

        with self._tasks_lock:
            if self._active_tasks > 0:
                logger.warning(f"[MinerU管理器] 当前有 {self._active_tasks} 个活跃任务,不能停止服务")
                return False

        logger.info("[MinerU管理器] 正在停止 MinerU 服务以释放 GPU 显存...")
        success, output = self._run_systemctl("stop")

        if success:
            logger.info("[MinerU管理器] 服务已停止,GPU 显存已释放")
            return True
        logger.error(f"[MinerU管理器] 服务停止失败: {output}")
        return False

    async def stop_service(self) -> bool:
        """Async wrapper that runs ``stop_service_sync`` in the loop's default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.stop_service_sync)

    def task_started(self):
        """Record that an OCR task has started (blocks idle shutdown until it ends)."""
        with self._tasks_lock:
            self._active_tasks += 1
            logger.info(f"[MinerU管理器] 任务开始,当前活跃任务数: {self._active_tasks}")

    def task_ended(self):
        """Record that an OCR task has ended; the count never drops below zero."""
        with self._tasks_lock:
            self._active_tasks = max(0, self._active_tasks - 1)
            self._last_task_end_time = datetime.now()
            logger.info(f"[MinerU管理器] 任务结束,当前活跃任务数: {self._active_tasks}")

    def get_active_task_count(self) -> int:
        """Return the number of OCR tasks currently in flight."""
        with self._tasks_lock:
            return self._active_tasks

    def _monitor_loop(self):
        """Body of the monitor thread: run the idle check every CHECK_INTERVAL_SECONDS until stopped."""
        logger.info("[MinerU管理器] 定时监控线程已启动")

        # A zero/negative interval would otherwise spin, invoking systemctl non-stop.
        interval = max(CHECK_INTERVAL_SECONDS, 1)

        while not self._stop_monitor.is_set():
            try:
                self._check_and_stop_if_idle()
            except Exception as e:
                # Keep the monitor alive; a failed check is retried next interval.
                logger.exception(f"[MinerU管理器] 监控检查异常: {e}")

            # Returns early as soon as stop_monitor() sets the event.
            self._stop_monitor.wait(interval)

        logger.info("[MinerU管理器] 定时监控线程已停止")

    def _check_and_stop_if_idle(self):
        """Stop the service if no task is active and it has been idle for IDLE_TIMEOUT_SECONDS."""
        with self._tasks_lock:
            active_tasks = self._active_tasks
            last_end_time = self._last_task_end_time
            last_ready_time = self._last_ready_time

        if active_tasks > 0:
            logger.debug(f"[MinerU管理器] 当前有 {active_tasks} 个活跃任务,保持服务运行")
            return

        if not self.is_service_active():
            logger.debug("[MinerU管理器] 服务未运行,跳过检查")
            return

        # Idle time counts from the latest activity: manager creation, the end of
        # the last task, or the last time a caller was told the service is ready.
        reference_time = max(
            t for t in (self._init_time, last_end_time, last_ready_time) if t is not None
        )
        idle_seconds = (datetime.now() - reference_time).total_seconds()

        if idle_seconds >= IDLE_TIMEOUT_SECONDS:
            if last_end_time is None:
                logger.info(f"[MinerU管理器] 服务启动后无任务,已空闲 {idle_seconds:.0f}s,超过阈值 {IDLE_TIMEOUT_SECONDS}s,准备停止")
            else:
                logger.info(f"[MinerU管理器] 服务已空闲 {idle_seconds:.0f}s,超过阈值 {IDLE_TIMEOUT_SECONDS}s,准备停止")
            # stop_service_sync re-checks the start guard and active tasks itself.
            self.stop_service_sync()
        else:
            remaining = IDLE_TIMEOUT_SECONDS - idle_seconds
            logger.debug(f"[MinerU管理器] 服务空闲 {idle_seconds:.0f}s,还需 {remaining:.0f}s 达到停止阈值")

    def start_monitor(self):
        """Start the daemon idle-monitor thread unless one is already alive."""
        if self._monitor_thread is not None and self._monitor_thread.is_alive():
            logger.debug("[MinerU管理器] 监控线程已在运行")
            return

        self._stop_monitor.clear()
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop,
            name="MinerUMonitor",
            daemon=True,
        )
        self._monitor_thread.start()
        logger.info("[MinerU管理器] 定时监控已启动")

    def stop_monitor(self):
        """Signal the monitor thread to exit and wait up to 5 s for it."""
        if self._monitor_thread is None:
            return

        self._stop_monitor.set()
        self._monitor_thread.join(timeout=5)
        self._monitor_thread = None
        logger.info("[MinerU管理器] 定时监控已停止")

    def get_status(self) -> dict:
        """Return a JSON-serialisable snapshot of the service and manager state.

        ``idle_seconds`` is measured from the end of the last OCR task and is
        None if no task has ended yet.
        """
        with self._tasks_lock:
            active_tasks = self._active_tasks
            last_end_time = self._last_task_end_time

        idle_seconds = None
        if last_end_time is not None:
            idle_seconds = (datetime.now() - last_end_time).total_seconds()

        service_active = self.is_service_active()
        # Skip the TCP probe when systemd already reports the unit as down.
        port_available = self.is_port_available() if service_active else False

        with self._starting_lock:
            is_starting = self._starting_count > 0

        return {
            "service_name": MINERU_SERVICE_NAME,
            "service_active": service_active,
            "service_starting": is_starting,
            "port_available": port_available,
            "service_ready": service_active and port_available,
            "api_endpoint": f"http://{MINERU_API_HOST}:{MINERU_API_PORT}",
            "active_tasks": active_tasks,
            "last_task_end_time": last_end_time.isoformat() if last_end_time else None,
            "idle_seconds": idle_seconds,
            "idle_timeout_seconds": IDLE_TIMEOUT_SECONDS,
            "check_interval_seconds": CHECK_INTERVAL_SECONDS,
            "service_start_timeout": SERVICE_START_TIMEOUT,
            "monitor_running": self._monitor_thread is not None and self._monitor_thread.is_alive(),
        }
# Module-level cache of the manager singleton.
_manager: Optional[MinerUServiceManager] = None


def get_mineru_manager() -> MinerUServiceManager:
    """Return the shared MinerU service manager, constructing it on first use."""
    global _manager
    if _manager is not None:
        return _manager
    _manager = MinerUServiceManager()
    return _manager
async def ensure_mineru_service_running() -> bool:
    """Make sure the MinerU service is up before calling the OCR API.

    Starts ``mineru-api.service`` if necessary and waits until its port
    accepts connections.

    Returns:
        True if the service is ready; False if starting it failed or timed out.
    """
    return await get_mineru_manager().start_service()
|