
pdf_converter_v2: GPU/NPU metrics collection support; single-task saturation of all cards for Paddle/MinerU

- gpu_monitor: auto-detects nvi/npu; on NPU, collects memory and utilization via npu-smi
- resource_monitor: unified collection via gpu_monitor.get_gpu_info()
- mineru_url_selector: parses multiple MinerU URLs with round-robin; a single task is split by pages and sent in parallel to all instances
- paddleocr_fallback: PADDLE_OCR_DEVICES multi-card round-robin; get_paddle_ocr_devices/get_paddle_ocr_device_args_for_index for multi-card parallelism
- processor/converter: MinerU multi-card splits by pages, runs asyncio.gather in parallel, then merges; Paddle multi-card splits by pages, runs doc_parser in parallel, then merges
- README: notes on single-task multi-card saturation plus an environment variable table
何文松 2 weeks ago
parent
commit
554cf82e2b

+ 13 - 5
pdf_converter_v2/README.md

@@ -295,7 +295,7 @@ sudo journalctl -u pdf-converter-v2 -f
 ### Environment variable configuration
 
 Main environment variables:
-- `API_URL`: external API address (default: http://127.0.0.1:5282)
+- `API_URL`: MinerU external API address; for multiple cards/instances, set a comma-separated list of addresses, used round-robin per request (default: http://127.0.0.1:5282)
 - `API_HOST`: service listen address (default: 0.0.0.0)
 - `API_PORT`: service listen port (default: 4214)
 - `LOG_LEVEL`: log level (default: info)
@@ -438,14 +438,22 @@ export MINERU_PORT=5283
 sh pdf_converter_v2/scripts/start_mineru_in_container.sh
 ```
 
-pdf_converter_v2's API connects to only one MinerU address by default (e.g. `API_URL=http://127.0.0.1:5282`). Round-robin across multiple instances requires load balancing at the application layer or in a reverse proxy (such as Nginx), or future support for multiple MinerU addresses in pdf_converter_v2.
+pdf_converter_v2 can saturate all cards with a single MinerU task: when `API_URL` is set to a comma-separated list of addresses, a **single PDF conversion** is split by pages into N chunks sent in parallel to N MinerU instances, and the results are then merged (one task uses all cards). For example:
+`API_URL=http://127.0.0.1:5282,http://127.0.0.1:5283`. With a single address the behavior is unchanged.
 
-### 4. Summary
+### 4. Multi-card round-robin (PaddleOCR)
+
+With multiple PaddleOCR cards, set `PADDLE_OCR_DEVICES` to a comma-separated device list. A **single PDF conversion** is split by pages into N chunks that run doc_parser in parallel on N cards, and the results are then merged (one task uses all cards). For example:
+`PADDLE_OCR_DEVICES=npu:0,npu:1`. When unset, the single-card `PADDLE_OCR_DEVICE` is used, defaulting to `npu:0` on NPU.
+
+### 5. Summary
 
 | Component | Environment variable | Example |
 |------|----------|------|
-| MinerU | `MINERU_DEVICE_MODE` | `npu`, `npu:0`, `npu:1` |
-| PaddleOCR | `PADDLE_OCR_DEVICE` | `npu:0`, `npu:1` |
+| MinerU (single task saturates all cards) | `API_URL` | single address: `http://127.0.0.1:5282`; multiple addresses: split by pages and sent in parallel to each instance |
+| MinerU single-instance device | `MINERU_DEVICE_MODE` | `npu`, `npu:0`, `npu:1` |
+| PaddleOCR single card | `PADDLE_OCR_DEVICE` | `npu:0`, `npu:1` |
+| PaddleOCR (single task saturates all cards) | `PADDLE_OCR_DEVICES` | `npu:0,npu:1`: split by pages to run all cards in parallel |
 | Ascend visible cards | `ASCEND_RT_VISIBLE_DEVICES` | `0`, `1,2` (physical card IDs) |
 
 ## Update notes

+ 3 - 4
pdf_converter_v2/api/main.py

@@ -457,7 +457,7 @@ async def process_conversion_task(
                     table_enable=True,
                     language=DEFAULT_LANGUAGE,
                     backend=DEFAULT_BACKEND,
-                    url=DEFAULT_API_URL,
+                    url=None,
                     embed_images=False,
                     output_json=True,
                     start_page_id=DEFAULT_START_PAGE_ID,
@@ -491,7 +491,7 @@ async def process_conversion_task(
                 table_enable=True,
                 language=DEFAULT_LANGUAGE,
                 backend=DEFAULT_BACKEND,
-                url=DEFAULT_API_URL,
+                url=None,
                 # v2: fixed to False
                 embed_images=False,
                 output_json=True,
@@ -597,12 +597,11 @@ async def process_pdf_to_markdown_task(
             else:
                 logger.warning(f"[Task {task_id}] Header/footer cropping failed; continuing with the original file")
 
-        api_url = os.getenv("API_URL", "http://127.0.0.1:5282")
         result = await convert_pdf_to_markdown_only(
             input_file=current_path,
             output_dir=output_dir,
             backend=backend,
-            url=api_url,
+            url=None,
             return_images=return_images,
         )
         if not result:

+ 159 - 48
pdf_converter_v2/processor/converter.py

@@ -19,6 +19,12 @@ from PIL import Image
 from ..utils.logging_config import get_logger
 from ..utils.file_utils import safe_stem
 from ..utils.pdf_splitter import get_pdf_page_count, split_pdf_by_pages
+from ..utils.mineru_url_selector import get_next_mineru_api_url, get_mineru_api_url_list
+from ..utils.paddleocr_fallback import (
+    get_paddle_ocr_devices,
+    get_paddle_ocr_device_args_for_index,
+    _paddle_ocr_device_args,
+)
 
 logger = get_logger("pdf_converter_v2.processor")
 PADDLE_CMD = os.getenv("PADDLE_DOC_PARSER_CMD", "paddleocr")
@@ -42,6 +48,26 @@ async def _run_paddle_doc_parser(cmd: Sequence[str]) -> tuple[int, str, str]:
     return process.returncode, stdout, stderr
 
 
+def _paddle_base_cmd(input_path: str, save_path_base: str, device_args: list) -> list:
+    """Build the PaddleOCR doc_parser command (including device arguments)."""
+    return [
+        PADDLE_CMD,
+        "doc_parser",
+        "-i",
+        input_path,
+        "--precision",
+        "fp32",
+        "--use_doc_unwarping",
+        "False",
+        "--use_doc_orientation_classify",
+        "True",
+        "--use_chart_recognition",
+        "True",
+        "--save_path",
+        save_path_base,
+    ] + device_args
+
+
 async def _convert_with_paddle(
     input_file: str,
     output_dir: str,
@@ -49,73 +75,109 @@ async def _convert_with_paddle(
     output_json: bool,
     forced_document_type: Optional[str],
 ):
-    """Convert operating-condition attachments directly with PaddleOCR doc_parser"""
+    """Convert operating-condition attachments directly with PaddleOCR doc_parser; with multiple cards, split by pages to run all cards in parallel."""
     if not os.path.exists(input_file):
         logger.error(f"[Paddle] Input file does not exist: {input_file}")
         return None
-    
+
     file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'
     os.makedirs(output_dir, exist_ok=True)
-    
     temp_dir = tempfile.mkdtemp(prefix=f"pdf_converter_paddle_{file_name}_")
     logger.info(f"[Paddle] Created temp directory: {temp_dir}")
-    save_path_base = os.path.join(temp_dir, Path(input_file).stem)
-    os.makedirs(save_path_base, exist_ok=True)
-    
-    cmd = [
-        PADDLE_CMD,
-        "doc_parser",
-        "-i",
-        input_file,
-        "--precision",
-        "fp32",
-        "--use_doc_unwarping",
-        "False",
-        "--use_doc_orientation_classify",
-        "True",
-        "--use_chart_recognition",
-        "True",
-        "--save_path",
-        save_path_base,
-    ]
-    
+
+    devices = get_paddle_ocr_devices()
+    ext = (Path(input_file).suffix or "").lower()
+    page_count = get_pdf_page_count(input_file) if ext == ".pdf" else 0
+    use_multi_card = len(devices) > 1 and ext == ".pdf" and page_count > 1
+
     try:
-        return_code, _, stderr = await _run_paddle_doc_parser(cmd)
-        if return_code != 0:
-            logger.error(f"[Paddle] doc_parser 执行失败 code={return_code}")
-            if stderr:
-                logger.error(stderr)
-            return None
-        
-        md_files = sorted(Path(save_path_base).rglob("*.md"))
-        if not md_files:
-            logger.error("[Paddle] No Markdown files found")
+        if use_multi_card:
+            # Multi-card: split into N page chunks, run doc_parser on one card per chunk in parallel, then merge
+            chunk_size = (page_count + len(devices) - 1) // len(devices)
+            chunks_dir = os.path.join(temp_dir, "chunks")
+            os.makedirs(chunks_dir, exist_ok=True)
+            chunk_paths = split_pdf_by_pages(input_file, chunks_dir, chunk_size=chunk_size)
+            if not chunk_paths:
+                logger.error("[Paddle] Multi-card PDF split failed")
+                return None
+            logger.info(f"[Paddle] PDF has {page_count} pages; split into {len(chunk_paths)} chunks running in parallel on {len(devices)} cards")
+            tasks = []
+            for i, chunk_path in enumerate(chunk_paths):
+                save_path_base_i = os.path.join(temp_dir, f"out_{i}", Path(chunk_path).stem)
+                os.makedirs(save_path_base_i, exist_ok=True)
+                cmd = _paddle_base_cmd(
+                    chunk_path,
+                    save_path_base_i,
+                    get_paddle_ocr_device_args_for_index(i),
+                )
+                tasks.append(_run_paddle_doc_parser(cmd))
+            results = await asyncio.gather(*tasks, return_exceptions=True)
+            markdown_parts = []
+            all_save_bases = [os.path.join(temp_dir, f"out_{i}", Path(chunk_paths[i]).stem) for i in range(len(chunk_paths))]
+            for i, res in enumerate(results):
+                if isinstance(res, Exception):
+                    logger.warning(f"[Paddle] doc_parser exception in chunk {i + 1}: {res}")
+                    continue
+                ret_code, _, stderr = res
+                if ret_code != 0:
+                    logger.warning(f"[Paddle] doc_parser failed in chunk {i + 1}: {stderr}")
+                    continue
+                base = all_save_bases[i]
+                md_files = sorted(Path(base).rglob("*.md"))
+                for md_file in md_files:
+                    async with aiofiles.open(md_file, "r", encoding="utf-8") as f:
+                        markdown_parts.append(await f.read())
+            final_content = "\n\n".join(markdown_parts) if markdown_parts else ""
+        else:
+            # Single card or non-PDF input: one doc_parser run
+            save_path_base = os.path.join(temp_dir, Path(input_file).stem)
+            os.makedirs(save_path_base, exist_ok=True)
+            cmd = _paddle_base_cmd(input_file, save_path_base, _paddle_ocr_device_args())
+            return_code, _, stderr = await _run_paddle_doc_parser(cmd)
+            if return_code != 0:
+                logger.error(f"[Paddle] doc_parser execution failed, code={return_code}")
+                if stderr:
+                    logger.error(stderr)
+                return None
+            md_files = sorted(Path(save_path_base).rglob("*.md"))
+            if not md_files:
+                logger.error("[Paddle] No Markdown files found")
+                return None
+            markdown_parts = []
+            for md_file in md_files:
+                async with aiofiles.open(md_file, "r", encoding="utf-8") as f:
+                    markdown_parts.append(await f.read())
+            final_content = "\n\n".join(markdown_parts)
+
+        if not final_content:
+            logger.error("[Paddle] No content after merging")
             return None
-        
-        markdown_parts = []
-        for md_file in md_files:
-            async with aiofiles.open(md_file, "r", encoding="utf-8") as f:
-                markdown_parts.append(await f.read())
-        final_content = "\n\n".join(markdown_parts)
+
         logger.info(f"[Paddle] Merged markdown length: {len(final_content)}")
-        
         local_md_dir = os.path.join(output_dir, file_name, "markdown")
         os.makedirs(local_md_dir, exist_ok=True)
         md_path = os.path.join(local_md_dir, f"{file_name}.md")
         async with aiofiles.open(md_path, "w", encoding="utf-8") as f:
             await f.write(final_content)
-        
         output_md_path = os.path.join(output_dir, f"{file_name}.md")
         async with aiofiles.open(output_md_path, "w", encoding="utf-8") as f:
             await f.write(final_content)
-        
+
         if embed_images:
             local_image_dir = os.path.join(output_dir, file_name, "images")
             os.makedirs(local_image_dir, exist_ok=True)
-            for asset in Path(save_path_base).rglob("*"):
-                if asset.is_file() and asset.suffix.lower() in {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}:
-                    shutil.copy2(asset, os.path.join(local_image_dir, asset.name))
-        
+            if use_multi_card:
+                for i in range(len(chunk_paths)):
+                    base = os.path.join(temp_dir, f"out_{i}", Path(chunk_paths[i]).stem)
+                    for asset in Path(base).rglob("*"):
+                        if asset.is_file() and asset.suffix.lower() in {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}:
+                            shutil.copy2(asset, os.path.join(local_image_dir, asset.name))
+            else:
+                base = os.path.join(temp_dir, Path(input_file).stem)
+                for asset in Path(base).rglob("*"):
+                    if asset.is_file() and asset.suffix.lower() in {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}:
+                        shutil.copy2(asset, os.path.join(local_image_dir, asset.name))
+
         json_data = None
         json_path = None
         if output_json:
@@ -135,7 +197,7 @@ async def _convert_with_paddle(
                     await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
             except Exception as exc:
                 logger.exception(f"[Paddle] JSON conversion failed: {exc}")
-        
+
         return {
             "markdown_file": output_md_path,
             "json_file": json_path,
@@ -178,6 +240,8 @@ async def convert_to_markdown(
         logger.error(f"Input file does not exist: {input_file}")
         return None
 
+    url = url or get_next_mineru_api_url()
+
     # Generate the output file name
     file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'
     
@@ -458,7 +522,7 @@ async def convert_pdf_to_markdown_only(
         return None
 
     ext = (Path(input_file).suffix or "").lower()
-    url = url or os.getenv("API_URL", "http://127.0.0.1:5282")
+    url = url or get_next_mineru_api_url()
 
     # Only PDFs are split by pages; images or too few pages use a single conversion
     if ext == ".pdf":
@@ -466,6 +530,53 @@ async def convert_pdf_to_markdown_only(
         if page_count <= 0:
             logger.error(f"Failed to get PDF page count: {input_file}")
             return None
+
+        # MinerU multi-card: split into N page chunks, send each to a different API instance in parallel, then merge (one task saturates all cards)
+        url_list = get_mineru_api_url_list()
+        if backend != "paddle" and len(url_list) > 1 and page_count > 1:
+            chunk_size = (page_count + len(url_list) - 1) // len(url_list)
+            chunks_dir = tempfile.mkdtemp(prefix="pdf_multi_card_", dir=output_dir)
+            try:
+                chunk_paths = split_pdf_by_pages(input_file, chunks_dir, chunk_size=chunk_size)
+                if not chunk_paths:
+                    return None
+                logger.info(f"PDF has {page_count} pages; split into {len(chunk_paths)} chunks sent in parallel to {len(url_list)} MinerU instances")
+                tasks = []
+                for i, chunk_path in enumerate(chunk_paths):
+                    chunk_out = os.path.join(chunks_dir, f"out_{i}")
+                    os.makedirs(chunk_out, exist_ok=True)
+                    tasks.append(
+                        convert_to_markdown(
+                            input_file=chunk_path,
+                            output_dir=chunk_out,
+                            max_pages=max_pages,
+                            output_json=False,
+                            formula_enable=formula_enable,
+                            table_enable=table_enable,
+                            language=language,
+                            url=url_list[i % len(url_list)],
+                            embed_images=return_images,
+                        )
+                    )
+                results = await asyncio.gather(*tasks, return_exceptions=True)
+                parts = []
+                for i, r in enumerate(results):
+                    if isinstance(r, Exception):
+                        logger.warning(f"MinerU conversion exception in chunk {i + 1}: {r}")
+                        continue
+                    if r and r.get("content"):
+                        parts.append(r["content"])
+                if not parts:
+                    return None
+                merged = "\n\n".join(parts)
+                filename = Path(input_file).stem + ".md"
+                return {"markdown": merged, "filename": filename}
+            finally:
+                try:
+                    shutil.rmtree(chunks_dir, ignore_errors=True)
+                except Exception as e:
+                    logger.debug(f"Failed to clean up the multi-card temp directory: {e}")
+
         if page_count > PDF_CHUNK_PAGES:
             chunks_dir = tempfile.mkdtemp(prefix="pdf_chunks_", dir=output_dir)
             try:

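Both multi-card paths in the diff above size their chunks with ceil division, `chunk_size = (page_count + n - 1) // n`. A minimal sketch of how that distributes pages; `chunk_sizes` is a hypothetical helper written for illustration, not part of the repo:

```python
def chunk_sizes(page_count: int, workers: int) -> list[int]:
    """Return the per-chunk page counts produced by ceil-division sizing."""
    chunk_size = (page_count + workers - 1) // workers  # ceil(page_count / workers)
    sizes = []
    remaining = page_count
    while remaining > 0:
        sizes.append(min(chunk_size, remaining))
        remaining -= chunk_size
    return sizes


# 10 pages over 3 MinerU instances -> chunks of 4, 4 and 2 pages
print(chunk_sizes(10, 3))  # [4, 4, 2]
```

Note that with ceil division the number of chunks can be smaller than the number of workers (e.g. 4 pages on 3 cards yields two chunks of 2), so a very short PDF may leave some cards idle.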
+ 82 - 28
pdf_converter_v2/utils/gpu_monitor.py

@@ -1,8 +1,9 @@
 """
-GPU monitoring utilities
-Collect and compute GPU usage
+GPU/NPU monitoring utilities
+Automatically detects NVIDIA GPU or Huawei Ascend NPU from the runtime environment and collects memory, utilization and related usage metrics.
 """
 
+import re
 import subprocess
 import logging
 from typing import Optional, Dict, Any
@@ -10,21 +11,9 @@ from typing import Optional, Dict, Any
 logger = logging.getLogger(__name__)
 
 
-def get_gpu_info() -> Optional[Dict[str, Any]]:
-    """
-    Get GPU info (via nvidia-smi)
-    
-    Returns:
-        GPU info dict containing:
-        - gpu_index: GPU index
-        - gpu_memory_used: used GPU memory (bytes)
-        - gpu_utilization: GPU utilization (%)
-        - gpu_memory_total: total GPU memory (bytes)
-        - gpu_name: GPU name
-        Returns None on failure
-    """
+def _get_nvidia_gpu_info() -> Optional[Dict[str, Any]]:
+    """Get NVIDIA GPU info via nvidia-smi (unified return format)."""
     try:
-        # Run the nvidia-smi command
         cmd = [
             "nvidia-smi",
             "--query-gpu=index,name,memory.total,memory.used,utilization.gpu",
@@ -37,40 +26,105 @@ def get_gpu_info() -> Optional[Dict[str, Any]]:
             timeout=5,
             check=False
         )
-        
         if result.returncode != 0:
-            logger.debug(f"nvidia-smi command failed: {result.stderr}")
+            logger.debug(f"nvidia-smi execution failed: {result.stderr}")
             return None
-        
-        # Parse the output (take the first GPU)
         lines = result.stdout.strip().split('\n')
         if not lines or not lines[0]:
-            logger.debug("nvidia-smi did not return GPU info")
+            logger.debug("nvidia-smi returned no GPU info")
             return None
-        
         parts = [p.strip() for p in lines[0].split(',')]
         if len(parts) < 5:
-            logger.debug(f"Malformed GPU info: {lines[0]}")
+            logger.debug(f"Unexpected nvidia-smi output format: {lines[0]}")
             return None
-        
         gpu_index = int(parts[0])
         gpu_name = parts[1]
         memory_total_mb = int(parts[2])
         memory_used_mb = int(parts[3])
         utilization = float(parts[4])
-        
         return {
             "gpu_index": gpu_index,
             "gpu_name": gpu_name,
-            "gpu_memory_total": memory_total_mb * 1024 * 1024,  # convert to bytes
-            "gpu_memory_used": memory_used_mb * 1024 * 1024,  # convert to bytes
+            "gpu_memory_total": memory_total_mb * 1024 * 1024,
+            "gpu_memory_used": memory_used_mb * 1024 * 1024,
             "gpu_utilization": utilization
         }
     except Exception as e:
-        logger.debug(f"Failed to get GPU info: {e}")
+        logger.debug(f"Failed to get NVIDIA GPU info: {e}")
+        return None
+
+
+def _get_npu_info() -> Optional[Dict[str, Any]]:
+    """Get Huawei Ascend NPU info via `npu-smi info` (unified return format).
+    Parses the AICore(%) and Memory-Usage(MB) line, e.g.: | 0   0  | ... | 0   1154 / 7767 |.
+    """
+    try:
+        result = subprocess.run(
+            ["npu-smi", "info"],
+            capture_output=True,
+            text=True,
+            timeout=5,
+            check=False
+        )
+        if result.returncode != 0:
+            logger.debug(f"npu-smi info execution failed: {result.stderr}")
+            return None
+        # Match "number  used / total" within a line (AICore% and Memory-Usage)
+        # e.g.: "| 0         1154 / 7767 |" or " 0   1154 / 7767 "
+        pattern = re.compile(r"(\d+)\s+(\d+)\s*/\s*(\d+)")
+        for line in result.stdout.splitlines():
+            m = pattern.search(line)
+            if m:
+                aicore_pct = float(m.group(1))
+                memory_used_mb = int(m.group(2))
+                memory_total_mb = int(m.group(3))
+                return {
+                    "gpu_index": 0,
+                    "gpu_name": "NPU",
+                    "gpu_memory_total": memory_total_mb * 1024 * 1024,
+                    "gpu_memory_used": memory_used_mb * 1024 * 1024,
+                    "gpu_utilization": aicore_pct
+                }
+        logger.debug("No Memory-Usage line found in npu-smi info output")
+        return None
+    except FileNotFoundError:
+        logger.debug("npu-smi command not found")
+        return None
+    except Exception as e:
+        logger.debug(f"Failed to get NPU info: {e}")
         return None
 
 
+def get_gpu_info() -> Optional[Dict[str, Any]]:
+    """
+    Collect accelerator (GPU/NPU) info, automatically choosing the collection method for the current runtime environment.
+    Uses the PDF_CONVERTER_DEVICE_KIND environment variable first; otherwise auto-detects nvidia-smi / npu-smi.
+    
+    Returns:
+        A dict in the unified format:
+        - gpu_index: device index
+        - gpu_name: device name (e.g. the GPU model or "NPU")
+        - gpu_memory_total: total memory (bytes)
+        - gpu_memory_used: used memory (bytes)
+        - gpu_utilization: utilization (%)
+        Returns None when no device is available or collection fails.
+    """
+    from .device_env import detect_device_kind
+    kind = detect_device_kind()
+    if kind == "nvi":
+        return _get_nvidia_gpu_info()
+    if kind == "npu":
+        return _get_npu_info()
+    # cpu or unknown: probe both in order, so collection still works when the env var is unset
+    info = _get_nvidia_gpu_info()
+    if info:
+        return info
+    info = _get_npu_info()
+    if info:
+        return info
+    return None
+
+
 def get_gpu_info_delta(start_gpu_info: Optional[Dict], end_gpu_info: Optional[Dict]) -> Optional[Dict[str, Any]]:
     """
     Compute the GPU usage delta (GPU usage during the OCR task)

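The `npu-smi` parsing above hinges on a single regex over the table output. A minimal sketch against a sample line; the sample values are illustrative, not taken from a real device:

```python
import re

# Same pattern as _get_npu_info: "index  used / total" (MB)
pattern = re.compile(r"(\d+)\s+(\d+)\s*/\s*(\d+)")

sample_line = "| 0         1154 / 7767                  |"
m = pattern.search(sample_line)
if m:
    aicore_pct = float(m.group(1))    # first number on the matched span
    memory_used_mb = int(m.group(2))  # used HBM in MB
    memory_total_mb = int(m.group(3)) # total HBM in MB
    print(aicore_pct, memory_used_mb, memory_total_mb)  # 0.0 1154 7767
```

Because the pattern is greedy about which numbers it pairs, it stops at the first line containing a "used / total" group, which is why `_get_npu_info` returns after the first match.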
+ 55 - 0
pdf_converter_v2/utils/mineru_url_selector.py

@@ -0,0 +1,55 @@
+# Copyright (c) Opendatalab. All rights reserved.
+
+"""
+MinerU API multi-instance URL round-robin.
+When API_URL is configured as a comma-separated list of addresses (multi-card, multi-instance), they are used round-robin per request.
+"""
+
+import os
+import threading
+from typing import List
+
+_DEFAULT_SINGLE = "http://127.0.0.1:5282"
+_URL_LIST: List[str] = []
+_URL_INDEX: int = 0
+_URL_LOCK = threading.Lock()
+
+
+def _parse_api_url_list() -> List[str]:
+    """Parse the URL list from the API_URL environment variable (comma-separated, whitespace stripped)."""
+    raw = os.getenv("API_URL", _DEFAULT_SINGLE).strip()
+    if not raw:
+        return [_DEFAULT_SINGLE]
+    return [u.strip() for u in raw.split(",") if u.strip()] or [_DEFAULT_SINGLE]
+
+
+def _get_url_list() -> List[str]:
+    """Parse and cache the API_URL list (comma-separated)."""
+    global _URL_LIST
+    with _URL_LOCK:
+        if not _URL_LIST:
+            _URL_LIST[:] = _parse_api_url_list()
+        return list(_URL_LIST)
+
+
+def get_mineru_api_url_list() -> List[str]:
+    """
+    Return the MinerU API address list (for single-task multi-card use: split by pages, then sent in parallel to each instance).
+    Returns multiple elements when API_URL is a comma-separated list of addresses; a single-element list otherwise.
+    """
+    return _get_url_list()
+
+
+def get_next_mineru_api_url() -> str:
+    """
+    Get the MinerU API address to use for the next request (thread-safe round-robin).
+    Cycles through the addresses when API_URL holds several (comma-separated); always returns the single address otherwise.
+    """
+    global _URL_INDEX
+    urls = _get_url_list()
+    with _URL_LOCK:
+        if len(urls) == 1:
+            return urls[0]
+        idx = _URL_INDEX % len(urls)
+        _URL_INDEX += 1
+        return urls[idx]

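The counter-plus-lock pattern behind `get_next_mineru_api_url` can be exercised on its own. `RoundRobin` below is a hypothetical standalone sketch of the same thread-safe rotation, not code from the repo:

```python
import threading


class RoundRobin:
    """Cycle through a fixed list of items, safe to call from multiple threads."""

    def __init__(self, items):
        self._items = list(items)
        self._index = 0
        self._lock = threading.Lock()

    def next(self):
        # Lock guards the shared counter so concurrent callers never skip or repeat
        with self._lock:
            item = self._items[self._index % len(self._items)]
            self._index += 1
            return item


rr = RoundRobin(["http://127.0.0.1:5282", "http://127.0.0.1:5283"])
print([rr.next() for _ in range(4)])  # alternates between the two addresses
```

The modulo on the counter means the index may grow without bound but always maps back into the list, mirroring `_URL_INDEX % len(urls)` in the selector.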
+ 53 - 11
pdf_converter_v2/utils/paddleocr_fallback.py

@@ -55,18 +55,60 @@ def _get_paddleocr_executable() -> str:
     return "paddleocr"
 
 
-# PaddleOCR inference device: on NPU this must be npu or npu:0, otherwise inference falls back to CPU and may segfault
-# Set via the PADDLE_OCR_DEVICE environment variable; when unset, chosen automatically from the device environment (npu:0 by default on NPU)
+# PaddleOCR inference device: supports a single card or multi-card round-robin
+# Single card: PADDLE_OCR_DEVICE=npu:0, or npu:0 by default on NPU when unset
+# Multi-card: PADDLE_OCR_DEVICES=npu:0,npu:1 uses the cards round-robin per request
+import threading as _threading
+_PADDLE_OCR_DEVICES: List[str] = []
+_PADDLE_OCR_DEVICE_INDEX: int = 0
+_PADDLE_OCR_DEVICE_LOCK = _threading.Lock()
+
+
+def _get_paddle_ocr_devices() -> List[str]:
+    """Parse PADDLE_OCR_DEVICES or PADDLE_OCR_DEVICE and return the device list (lazy, thread-safe)."""
+    global _PADDLE_OCR_DEVICES
+    with _PADDLE_OCR_DEVICE_LOCK:
+        if _PADDLE_OCR_DEVICES:
+            return _PADDLE_OCR_DEVICES
+        multi = os.getenv("PADDLE_OCR_DEVICES", "").strip()
+        if multi:
+            _PADDLE_OCR_DEVICES[:] = [d.strip() for d in multi.split(",") if d.strip()]
+        if not _PADDLE_OCR_DEVICES:
+            single = os.getenv("PADDLE_OCR_DEVICE", "").strip()
+            if not single:
+                from .device_env import is_npu
+                if is_npu():
+                    single = "npu:0"
+            if single:
+                _PADDLE_OCR_DEVICES.append(single)
+    return _PADDLE_OCR_DEVICES
+
+
+def get_paddle_ocr_devices() -> List[str]:
+    """Return the PaddleOCR device list (for single-task multi-card use: split by pages, then run on each card in parallel)."""
+    return list(_get_paddle_ocr_devices())
+
+
+def get_paddle_ocr_device_args_for_index(device_index: int) -> list:
+    """Return the --device argument list for the given device index; pins each chunk to a specific card in multi-card parallel runs."""
+    devices = _get_paddle_ocr_devices()
+    if not devices:
+        return []
+    device = devices[device_index % len(devices)]
+    return ["--device", device]
+
+
 def _paddle_ocr_device_args() -> list:
-    """Return the --device argument list for the PaddleOCR command (empty list when unset)"""
-    device = os.getenv("PADDLE_OCR_DEVICE", "").strip()
-    if not device:
-        from .device_env import is_npu
-        if is_npu():
-            device = "npu:0"
-    if device:
-        return ["--device", device]
-    return []
+    """Return the --device argument list for the PaddleOCR command; round-robin per request with multiple cards."""
+    devices = _get_paddle_ocr_devices()
+    if not devices:
+        return []
+    global _PADDLE_OCR_DEVICE_INDEX
+    with _PADDLE_OCR_DEVICE_LOCK:
+        idx = _PADDLE_OCR_DEVICE_INDEX % len(devices)
+        _PADDLE_OCR_DEVICE_INDEX += 1
+        device = devices[idx]
+    return ["--device", device]
 
 
 def detect_file_type(file_path: str) -> Optional[str]:

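The device-resolution precedence implemented above (`PADDLE_OCR_DEVICES`, then `PADDLE_OCR_DEVICE`, then an NPU default of `npu:0`) can be sketched as a pure function. `resolve_paddle_devices` is hypothetical and takes the environment as an explicit dict for testability:

```python
def resolve_paddle_devices(env: dict, on_npu: bool) -> list[str]:
    """Resolve the PaddleOCR device list: multi-card var first, then single-card var, then NPU default."""
    multi = env.get("PADDLE_OCR_DEVICES", "").strip()
    if multi:
        return [d.strip() for d in multi.split(",") if d.strip()]
    single = env.get("PADDLE_OCR_DEVICE", "").strip()
    if not single and on_npu:
        single = "npu:0"  # avoid falling back to CPU on NPU hosts
    return [single] if single else []


print(resolve_paddle_devices({"PADDLE_OCR_DEVICES": "npu:0,npu:1"}, on_npu=True))  # ['npu:0', 'npu:1']
print(resolve_paddle_devices({}, on_npu=True))   # ['npu:0']
print(resolve_paddle_devices({}, on_npu=False))  # []
```

An empty result means no `--device` flag is added and PaddleOCR falls back to its own default, matching `_paddle_ocr_device_args` returning `[]`.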
+ 7 - 42
pdf_converter_v2/utils/resource_monitor.py

@@ -1,10 +1,10 @@
 """
 Resource monitoring collector module
-During OCR tasks, a background thread periodically samples GPU and system load data
+During OCR tasks, a background thread periodically samples accelerator (NVIDIA GPU / Huawei Ascend NPU) and system load data.
+Automatically selects nvidia-smi or npu-smi for collection based on the runtime environment.
 """
 import threading
 import time
-import subprocess
 import logging
 import os
 from typing import Optional, Dict, Any, List
@@ -13,7 +13,7 @@ logger = logging.getLogger(__name__)
 
 
 class ResourceMonitor:
-    """Resource monitoring collector; periodically samples GPU and system load data in a background thread"""
+    """Resource monitoring collector; periodically samples GPU/NPU and system load data in a background thread"""
     
     def __init__(self, interval: float = 0.5):
         """
@@ -79,47 +79,12 @@ class ResourceMonitor:
         return sample
     
     def _get_gpu_info(self) -> Optional[Dict[str, Any]]:
-        """Get GPU info"""
+        """Get accelerator info (automatically uses nvidia-smi or npu-smi for the environment)"""
         try:
-            cmd = [
-                "nvidia-smi",
-                "--query-gpu=index,name,memory.total,memory.used,utilization.gpu",
-                "--format=csv,noheader,nounits"
-            ]
-            result = subprocess.run(
-                cmd,
-                capture_output=True,
-                text=True,
-                timeout=2,
-                check=False
-            )
-            
-            if result.returncode != 0:
-                return None
-            
-            lines = result.stdout.strip().split('\n')
-            if not lines or not lines[0]:
-                return None
-            
-            parts = [p.strip() for p in lines[0].split(',')]
-            if len(parts) < 5:
-                return None
-            
-            gpu_index = int(parts[0])
-            gpu_name = parts[1]
-            memory_total_mb = int(parts[2])
-            memory_used_mb = int(parts[3])
-            utilization = float(parts[4])
-            
-            return {
-                "gpu_index": gpu_index,
-                "gpu_name": gpu_name,
-                "gpu_memory_total": memory_total_mb * 1024 * 1024,  # convert to bytes
-                "gpu_memory_used": memory_used_mb * 1024 * 1024,  # convert to bytes
-                "gpu_utilization": utilization
-            }
+            from .gpu_monitor import get_gpu_info
+            return get_gpu_info()
         except Exception as e:
-            logger.debug(f"Failed to get GPU info: {e}")
+            logger.debug(f"Failed to get accelerator info: {e}")
             return None
     
     def _get_system_load(self) -> Optional[Dict[str, float]]: