converter.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. # Copyright (c) Opendatalab. All rights reserved.
  2. """PDF转换主函数模块 v2 - 使用新的API接口"""
  3. import asyncio
  4. import json
  5. import os
  6. import time
  7. import zipfile
  8. import tempfile
  9. import shutil
  10. from pathlib import Path
  11. from typing import Optional, Sequence
  12. import aiohttp
  13. import aiofiles
  14. from PIL import Image
  15. from ..utils.logging_config import get_logger
  16. from ..utils.file_utils import safe_stem
  17. from ..utils.pdf_splitter import get_pdf_page_count, split_pdf_by_pages
  18. from ..utils.mineru_url_selector import get_next_mineru_api_url, get_mineru_api_url_list
  19. from ..utils.paddleocr_fallback import (
  20. get_paddle_ocr_devices,
  21. get_paddle_ocr_device_args_for_index,
  22. _paddle_ocr_device_args,
  23. _get_paddleocr_subprocess_env,
  24. )
  25. logger = get_logger("pdf_converter_v2.processor")
  26. PADDLE_CMD = os.getenv("PADDLE_DOC_PARSER_CMD", "paddleocr")
  27. async def _run_paddle_doc_parser(cmd: Sequence[str]) -> tuple[int, str, str]:
  28. """异步执行 paddleocr doc_parser 命令"""
  29. logger.info(f"[Paddle] 执行命令: {' '.join(cmd)}")
  30. process = await asyncio.create_subprocess_exec(
  31. *cmd,
  32. stdout=asyncio.subprocess.PIPE,
  33. stderr=asyncio.subprocess.PIPE,
  34. env=_get_paddleocr_subprocess_env(),
  35. )
  36. stdout_bytes, stderr_bytes = await process.communicate()
  37. stdout = stdout_bytes.decode("utf-8", errors="ignore")
  38. stderr = stderr_bytes.decode("utf-8", errors="ignore")
  39. if stdout:
  40. logger.debug(f"[Paddle] stdout: {stdout[:2000]}")
  41. if stderr:
  42. logger.debug(f"[Paddle] stderr: {stderr[:2000]}")
  43. return process.returncode, stdout, stderr
  44. def _paddle_base_cmd(input_path: str, save_path_base: str, device_args: list) -> list:
  45. """构建 PaddleOCR doc_parser 命令(含设备参数)。"""
  46. return [
  47. PADDLE_CMD,
  48. "doc_parser",
  49. "-i",
  50. input_path,
  51. "--precision",
  52. "fp32",
  53. "--use_doc_unwarping",
  54. "False",
  55. "--use_doc_orientation_classify",
  56. "True",
  57. "--use_chart_recognition",
  58. "True",
  59. "--save_path",
  60. save_path_base,
  61. ] + device_args
async def _convert_with_paddle(
    input_file: str,
    output_dir: str,
    embed_images: bool,
    output_json: bool,
    forced_document_type: Optional[str],
):
    """Convert a document directly with PaddleOCR ``doc_parser``.

    With multiple OCR devices available and a multi-page PDF input, the PDF
    is split by pages and one ``doc_parser`` process is run per device in
    parallel, then the per-chunk markdown is merged.

    :param input_file: path to the PDF or image to convert.
    :param output_dir: directory receiving the merged ``.md`` (and optional
        ``.json`` / images).
    :param embed_images: copy image assets produced by doc_parser next to
        the markdown output.
    :param output_json: additionally parse the markdown into structured JSON.
    :param forced_document_type: passed through to the JSON parser.
    :return: dict with ``markdown_file``/``json_file``/``json_data``/``content``,
        or ``None`` on failure.
    """
    if not os.path.exists(input_file):
        logger.error(f"[Paddle] 输入文件不存在: {input_file}")
        return None
    # Timestamped name keeps repeated conversions of the same file apart.
    file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'
    os.makedirs(output_dir, exist_ok=True)
    temp_dir = tempfile.mkdtemp(prefix=f"pdf_converter_paddle_{file_name}_")
    logger.info(f"[Paddle] 创建临时目录: {temp_dir}")
    devices = get_paddle_ocr_devices()
    ext = (Path(input_file).suffix or "").lower()
    page_count = get_pdf_page_count(input_file) if ext == ".pdf" else 0
    # Parallel path only pays off for multi-page PDFs on more than one card.
    use_multi_card = len(devices) > 1 and ext == ".pdf" and page_count > 1
    try:
        if use_multi_card:
            # Multi-card: split into N page ranges, run doc_parser on one
            # card per range in parallel, then merge the results.
            chunk_size = (page_count + len(devices) - 1) // len(devices)
            chunks_dir = os.path.join(temp_dir, "chunks")
            os.makedirs(chunks_dir, exist_ok=True)
            chunk_paths = split_pdf_by_pages(input_file, chunks_dir, chunk_size=chunk_size)
            if not chunk_paths:
                logger.error("[Paddle] 多卡拆分 PDF 失败")
                return None
            logger.info(f"[Paddle] PDF 共 {page_count} 页,拆成 {len(chunk_paths)} 段并行使用 {len(devices)} 张卡")
            tasks = []
            for i, chunk_path in enumerate(chunk_paths):
                # Each chunk gets its own save path: <temp>/out_<i>/<chunk-stem>.
                save_path_base_i = os.path.join(temp_dir, f"out_{i}", Path(chunk_path).stem)
                os.makedirs(save_path_base_i, exist_ok=True)
                cmd = _paddle_base_cmd(
                    chunk_path,
                    save_path_base_i,
                    get_paddle_ocr_device_args_for_index(i),
                )
                tasks.append(_run_paddle_doc_parser(cmd))
            # return_exceptions=True: one failed chunk must not abort the rest.
            results = await asyncio.gather(*tasks, return_exceptions=True)
            markdown_parts = []
            all_save_bases = [os.path.join(temp_dir, f"out_{i}", Path(chunk_paths[i]).stem) for i in range(len(chunk_paths))]
            for i, res in enumerate(results):
                if isinstance(res, Exception):
                    logger.warning(f"[Paddle] 第 {i + 1} 段 doc_parser 异常: {res}")
                    continue
                ret_code, _, stderr = res
                if ret_code != 0:
                    logger.warning(f"[Paddle] 第 {i + 1} 段 doc_parser 失败: {stderr}")
                    continue
                base = all_save_bases[i]
                # Sorted for a stable page order inside each chunk's output.
                md_files = sorted(Path(base).rglob("*.md"))
                for md_file in md_files:
                    async with aiofiles.open(md_file, "r", encoding="utf-8") as f:
                        markdown_parts.append(await f.read())
            final_content = "\n\n".join(markdown_parts) if markdown_parts else ""
        else:
            # Single card (or non-PDF input): one doc_parser invocation.
            save_path_base = os.path.join(temp_dir, Path(input_file).stem)
            os.makedirs(save_path_base, exist_ok=True)
            cmd = _paddle_base_cmd(input_file, save_path_base, _paddle_ocr_device_args())
            return_code, _, stderr = await _run_paddle_doc_parser(cmd)
            if return_code != 0:
                logger.error(f"[Paddle] doc_parser 执行失败 code={return_code}")
                if stderr:
                    logger.error(stderr)
                return None
            md_files = sorted(Path(save_path_base).rglob("*.md"))
            if not md_files:
                logger.error("[Paddle] 未找到Markdown文件")
                return None
            markdown_parts = []
            for md_file in md_files:
                async with aiofiles.open(md_file, "r", encoding="utf-8") as f:
                    markdown_parts.append(await f.read())
            final_content = "\n\n".join(markdown_parts)
        if not final_content:
            logger.error("[Paddle] 合并后无内容")
            return None
        logger.info(f"[Paddle] 合并后的markdown长度: {len(final_content)}")
        # Write the merged markdown twice: under <out>/<name>/markdown/ and
        # directly under <out>/ for easy discovery.
        local_md_dir = os.path.join(output_dir, file_name, "markdown")
        os.makedirs(local_md_dir, exist_ok=True)
        md_path = os.path.join(local_md_dir, f"{file_name}.md")
        async with aiofiles.open(md_path, "w", encoding="utf-8") as f:
            await f.write(final_content)
        output_md_path = os.path.join(output_dir, f"{file_name}.md")
        async with aiofiles.open(output_md_path, "w", encoding="utf-8") as f:
            await f.write(final_content)
        if embed_images:
            # Copy any raster assets doc_parser emitted alongside the md.
            local_image_dir = os.path.join(output_dir, file_name, "images")
            os.makedirs(local_image_dir, exist_ok=True)
            if use_multi_card:
                for i in range(len(chunk_paths)):
                    base = os.path.join(temp_dir, f"out_{i}", Path(chunk_paths[i]).stem)
                    for asset in Path(base).rglob("*"):
                        if asset.is_file() and asset.suffix.lower() in {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}:
                            shutil.copy2(asset, os.path.join(local_image_dir, asset.name))
            else:
                base = os.path.join(temp_dir, Path(input_file).stem)
                for asset in Path(base).rglob("*"):
                    if asset.is_file() and asset.suffix.lower() in {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}:
                        shutil.copy2(asset, os.path.join(local_image_dir, asset.name))
        json_data = None
        json_path = None
        if output_json:
            # JSON conversion failure is non-fatal: markdown is still returned.
            try:
                from ..parser.json_converter import parse_markdown_to_json
                json_output_dir = os.path.join(output_dir, file_name)
                json_data = parse_markdown_to_json(
                    final_content,
                    first_page_image=None,
                    output_dir=json_output_dir,
                    forced_document_type=forced_document_type,
                    enable_paddleocr_fallback=True,
                    input_file=input_file,
                )
                json_path = os.path.join(output_dir, f"{file_name}.json")
                async with aiofiles.open(json_path, "w", encoding="utf-8") as f:
                    await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
            except Exception as exc:
                logger.exception(f"[Paddle] JSON转换失败: {exc}")
        return {
            "markdown_file": output_md_path,
            "json_file": json_path,
            "json_data": json_data,
            "content": final_content,
        }
    finally:
        # Best-effort cleanup of the scratch directory.
        try:
            shutil.rmtree(temp_dir)
        except Exception as exc:
            logger.warning(f"[Paddle] 清理临时目录失败: {exc}")
  195. async def convert_to_markdown(
  196. input_file: str,
  197. output_dir: str = "./output",
  198. max_pages: int = 10,
  199. is_ocr: bool = False,
  200. formula_enable: bool = True,
  201. table_enable: bool = True,
  202. language: str = "ch",
  203. backend: str = "vlm-vllm-async-engine",
  204. url: str = "http://127.0.0.1:5282",
  205. embed_images: bool = True,
  206. output_json: bool = False,
  207. start_page_id: int = 0,
  208. end_page_id: int = 99999,
  209. parse_method: str = "auto",
  210. server_url: str = "string",
  211. response_format_zip: bool = True,
  212. return_middle_json: bool = False,
  213. return_model_output: bool = True,
  214. return_md: bool = True,
  215. return_images: bool = True, # 默认启用,以便PaddleOCR备用解析可以使用
  216. return_content_list: bool = False,
  217. forced_document_type: Optional[str] = None
  218. ):
  219. """将PDF/图片转换为Markdown的主要函数(使用新的API接口)"""
  220. if not os.path.exists(input_file):
  221. logger.error(f"输入文件不存在: {input_file}")
  222. return None
  223. url = url or get_next_mineru_api_url()
  224. # 生成文件名
  225. file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'
  226. try:
  227. os.makedirs(output_dir, exist_ok=True)
  228. # 构建API请求URL
  229. api_url = f"{url}/file_parse"
  230. logger.info(f"调用API接口: {api_url}")
  231. # 创建临时目录用于解压zip文件
  232. temp_dir = tempfile.mkdtemp(prefix=f"pdf_converter_v2_{file_name}_")
  233. logger.info(f"创建临时目录: {temp_dir}")
  234. try:
  235. # 准备表单数据
  236. form_data = aiohttp.FormData()
  237. form_data.add_field('return_middle_json', str(return_middle_json).lower())
  238. form_data.add_field('return_model_output', str(return_model_output).lower())
  239. form_data.add_field('return_md', str(return_md).lower())
  240. form_data.add_field('return_images', str(return_images).lower())
  241. form_data.add_field('end_page_id', str(end_page_id))
  242. form_data.add_field('parse_method', parse_method)
  243. form_data.add_field('start_page_id', str(start_page_id))
  244. form_data.add_field('lang_list', language)
  245. form_data.add_field('output_dir', './output')
  246. form_data.add_field('server_url', server_url)
  247. form_data.add_field('return_content_list', str(return_content_list).lower())
  248. form_data.add_field('backend', backend)
  249. form_data.add_field('table_enable', str(table_enable).lower())
  250. form_data.add_field('response_format_zip', str(response_format_zip).lower())
  251. form_data.add_field('formula_enable', str(formula_enable).lower())
  252. # 打开文件并添加到表单数据(文件会在请求发送时读取)
  253. file_obj = open(input_file, 'rb')
  254. try:
  255. # 根据扩展名设置内容类型,默认使用application/octet-stream
  256. ext = (Path(input_file).suffix or "").lower()
  257. content_type = 'application/octet-stream'
  258. if ext == '.pdf':
  259. content_type = 'application/pdf'
  260. elif ext in {'.png'}:
  261. content_type = 'image/png'
  262. elif ext in {'.jpg', '.jpeg'}:
  263. content_type = 'image/jpeg'
  264. elif ext in {'.bmp'}:
  265. content_type = 'image/bmp'
  266. elif ext in {'.tif', '.tiff'}:
  267. content_type = 'image/tiff'
  268. elif ext in {'.webp'}:
  269. content_type = 'image/webp'
  270. # 不使用原始文件名,直接使用简单的固定命名,避免对端服务在构造输出路径时触发 “File name too long”
  271. # 从文件路径获取扩展名
  272. ext = Path(input_file).suffix or ".pdf"
  273. upload_name = f"file{ext}"
  274. form_data.add_field(
  275. 'files',
  276. file_obj,
  277. filename=upload_name,
  278. content_type=content_type
  279. )
  280. # 发送API请求(设置超时时间:总超时600秒,连接超时30秒,socket读取超时300秒)
  281. timeout = aiohttp.ClientTimeout(total=600, connect=30, sock_read=300)
  282. # 添加重试机制
  283. max_retries = 3
  284. retry_count = 0
  285. last_error = None
  286. while retry_count < max_retries:
  287. try:
  288. async with aiohttp.ClientSession(timeout=timeout) as session:
  289. if retry_count > 0:
  290. logger.warning(f"重试第 {retry_count} 次上传文件: {input_file}")
  291. else:
  292. logger.info(f"开始上传文件: {input_file}")
  293. async with session.post(api_url, data=form_data) as response:
  294. if response.status != 200:
  295. error_text = await response.text()
  296. logger.error(f"API请求失败,状态码: {response.status}, 错误: {error_text}")
  297. return None
  298. # 检查Content-Type是否为zip
  299. content_type = response.headers.get('Content-Type', '')
  300. if 'zip' not in content_type and 'application/zip' not in content_type:
  301. # 如果不是zip,尝试检查响应内容
  302. content_disposition = response.headers.get('Content-Disposition', '')
  303. if 'zip' not in content_disposition.lower():
  304. logger.warning(f"响应Content-Type可能不是zip: {content_type}")
  305. # 保存zip文件
  306. zip_path = os.path.join(temp_dir, f"{file_name}.zip")
  307. async with aiofiles.open(zip_path, 'wb') as f:
  308. async for chunk in response.content.iter_chunked(8192):
  309. await f.write(chunk)
  310. logger.info(f"Zip文件已保存: {zip_path}")
  311. # 成功,跳出重试循环
  312. break
  313. except (aiohttp.ClientError, asyncio.TimeoutError) as e:
  314. last_error = e
  315. retry_count += 1
  316. error_type = type(e).__name__
  317. logger.warning(f"API请求失败 ({error_type}): {e}, 重试 {retry_count}/{max_retries}")
  318. if retry_count < max_retries:
  319. # 等待一段时间后重试(指数退避)
  320. wait_time = 2 ** retry_count
  321. logger.info(f"等待 {wait_time} 秒后重试...")
  322. await asyncio.sleep(wait_time)
  323. # 重试时重新打开文件(aiohttp 可能已关闭原 handle,seek(0) 会报 seek of closed file)
  324. try:
  325. file_obj.close()
  326. except Exception:
  327. pass
  328. file_obj = open(input_file, 'rb')
  329. form_data = aiohttp.FormData()
  330. form_data.add_field('return_middle_json', str(return_middle_json).lower())
  331. form_data.add_field('return_model_output', str(return_model_output).lower())
  332. form_data.add_field('return_md', str(return_md).lower())
  333. form_data.add_field('return_images', str(return_images).lower())
  334. form_data.add_field('end_page_id', str(end_page_id))
  335. form_data.add_field('parse_method', parse_method)
  336. form_data.add_field('start_page_id', str(start_page_id))
  337. form_data.add_field('lang_list', language)
  338. form_data.add_field('output_dir', './output')
  339. form_data.add_field('server_url', server_url)
  340. form_data.add_field('return_content_list', str(return_content_list).lower())
  341. form_data.add_field('backend', backend)
  342. form_data.add_field('table_enable', str(table_enable).lower())
  343. form_data.add_field('response_format_zip', str(response_format_zip).lower())
  344. form_data.add_field('formula_enable', str(formula_enable).lower())
  345. form_data.add_field('files', file_obj, filename=upload_name, content_type=content_type)
  346. else:
  347. logger.error(f"API请求失败,已达到最大重试次数 ({max_retries})")
  348. raise last_error
  349. finally:
  350. # 关闭文件对象
  351. file_obj.close()
  352. # 解压zip文件
  353. logger.info("开始解压zip文件...")
  354. with zipfile.ZipFile(zip_path, 'r') as zip_ref:
  355. zip_ref.extractall(temp_dir)
  356. # 查找md文件
  357. md_files = list(Path(temp_dir).rglob("*.md"))
  358. if not md_files:
  359. logger.error("在zip文件中未找到md文件")
  360. return None
  361. logger.info(f"找到 {len(md_files)} 个md文件")
  362. # 读取所有md文件并合并
  363. markdown_parts = []
  364. for md_file in sorted(md_files):
  365. logger.info(f"读取md文件: {md_file}")
  366. async with aiofiles.open(md_file, 'r', encoding='utf-8') as f:
  367. content = await f.read()
  368. markdown_parts.append(content)
  369. # 合并所有页面内容
  370. original_content = "\n\n".join(markdown_parts)
  371. logger.info(f"合并后的markdown长度: {len(original_content)} 字符")
  372. # 准备输出目录
  373. local_md_dir = os.path.join(output_dir, file_name, "markdown")
  374. os.makedirs(local_md_dir, exist_ok=True)
  375. # 处理图片嵌入(如果需要)
  376. final_content = original_content
  377. if embed_images:
  378. # 查找图片文件
  379. image_files = list(Path(temp_dir).rglob("*.png")) + list(Path(temp_dir).rglob("*.jpg")) + list(Path(temp_dir).rglob("*.jpeg"))
  380. if image_files:
  381. local_image_dir = os.path.join(output_dir, file_name, "images")
  382. os.makedirs(local_image_dir, exist_ok=True)
  383. # 复制图片到输出目录
  384. for img_file in image_files:
  385. dst_path = os.path.join(local_image_dir, img_file.name)
  386. shutil.copy2(img_file, dst_path)
  387. logger.debug(f"复制图片: {img_file} -> {dst_path}")
  388. # 保存Markdown文件
  389. md_path = os.path.join(local_md_dir, f"{file_name}.md")
  390. async with aiofiles.open(md_path, 'w', encoding='utf-8') as f:
  391. await f.write(final_content)
  392. logger.info(f"Markdown文件已保存: {md_path}")
  393. # 生成输出文件路径(在output_dir根目录下也保存一份)
  394. output_md_path = os.path.join(output_dir, f"{file_name}.md")
  395. async with aiofiles.open(output_md_path, 'w', encoding='utf-8') as f:
  396. await f.write(final_content)
  397. logger.info(f"转换完成: {output_md_path}")
  398. # JSON转换(如果需要)
  399. json_data = None
  400. json_path = None
  401. if output_json:
  402. try:
  403. logger.info("开始转换为JSON格式...")
  404. # 复用v1的json解析逻辑
  405. # 注意:v2版本不涉及MinerU和PaddleOCR的具体调用,只进行JSON解析
  406. # first_page_image设为None,因为v2版本不处理PDF图片
  407. from ..parser.json_converter import parse_markdown_to_json
  408. # 构建完整的输出目录路径,包含文件名的子目录
  409. json_output_dir = os.path.join(output_dir, file_name) if file_name else output_dir
  410. json_data = parse_markdown_to_json(
  411. original_content,
  412. first_page_image=None,
  413. output_dir=json_output_dir,
  414. forced_document_type=forced_document_type,
  415. enable_paddleocr_fallback=True,
  416. input_file=input_file,
  417. )
  418. json_path = os.path.join(output_dir, f"{file_name}.json")
  419. async with aiofiles.open(json_path, 'w', encoding='utf-8') as f:
  420. await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
  421. logger.info(f"JSON文件已保存: {json_path}")
  422. logger.info(f"文档类型: {json_data.get('document_type', 'unknown')}")
  423. except Exception as e:
  424. logger.exception(f"JSON转换失败: {e}")
  425. json_data = None
  426. return {
  427. 'markdown_file': output_md_path,
  428. 'json_file': json_path,
  429. 'json_data': json_data,
  430. 'content': final_content,
  431. 'original_content': original_content
  432. }
  433. finally:
  434. # 清理临时目录
  435. try:
  436. shutil.rmtree(temp_dir)
  437. logger.debug(f"已清理临时目录: {temp_dir}")
  438. except Exception as e:
  439. logger.warning(f"清理临时目录失败: {e}")
  440. except Exception as e:
  441. logger.exception(f"转换过程出错: {e}")
  442. return None
# PDFs longer than this are split into segments, converted separately and
# merged, keeping per-request memory low (avoids MinerU OOM).
PDF_CHUNK_PAGES = 50
async def convert_pdf_to_markdown_only(
    input_file: str,
    output_dir: str,
    backend: str = "mineru",
    url: Optional[str] = None,
    max_pages: int = 99999,
    formula_enable: bool = True,
    table_enable: bool = True,
    language: str = "ch",
    return_images: bool = False,
) -> Optional[dict]:
    """Convert a PDF/image to Markdown text only; no JSON parsing.

    Large PDFs (> PDF_CHUNK_PAGES pages) are first split into
    PDF_CHUNK_PAGES-page chunks, converted segment by segment and merged,
    reducing per-request memory. When several MinerU API instances are
    configured (and backend is not "paddle"), the PDF is instead split into
    one segment per instance and converted fully in parallel.

    :param return_images: also fetch and save images (MinerU ``return_images``
        plus local ``embed_images``).
    :return: ``{"markdown": str, "filename": str}`` or ``None``.
    """
    if not os.path.exists(input_file):
        logger.error(f"输入文件不存在: {input_file}")
        return None
    ext = (Path(input_file).suffix or "").lower()
    url = url or get_next_mineru_api_url()
    # Only PDFs are split by pages; images or short PDFs go through one call.
    if ext == ".pdf":
        page_count = get_pdf_page_count(input_file)
        if page_count <= 0:
            logger.error(f"无法获取 PDF 页数: {input_file}")
            return None
        # MinerU multi-instance: split into N segments, send each to a
        # different API instance in parallel, then merge (one task keeps
        # every card busy).
        url_list = get_mineru_api_url_list()
        if backend != "paddle" and len(url_list) > 1 and page_count > 1:
            chunk_size = (page_count + len(url_list) - 1) // len(url_list)
            chunks_dir = tempfile.mkdtemp(prefix="pdf_multi_card_", dir=output_dir)
            try:
                chunk_paths = split_pdf_by_pages(input_file, chunks_dir, chunk_size=chunk_size)
                if not chunk_paths:
                    return None
                logger.info(f"PDF 共 {page_count} 页,拆成 {len(chunk_paths)} 段并行发往 {len(url_list)} 个 MinerU 实例")
                tasks = []
                for i, chunk_path in enumerate(chunk_paths):
                    chunk_out = os.path.join(chunks_dir, f"out_{i}")
                    os.makedirs(chunk_out, exist_ok=True)
                    tasks.append(
                        convert_to_markdown(
                            input_file=chunk_path,
                            output_dir=chunk_out,
                            max_pages=max_pages,
                            output_json=False,
                            formula_enable=formula_enable,
                            table_enable=table_enable,
                            language=language,
                            # Round-robin over the configured instances.
                            url=url_list[i % len(url_list)],
                            embed_images=return_images,
                        )
                    )
                # return_exceptions=True: a failed segment is skipped, not fatal.
                results = await asyncio.gather(*tasks, return_exceptions=True)
                parts = []
                for i, r in enumerate(results):
                    if isinstance(r, Exception):
                        logger.warning(f"第 {i + 1} 段 MinerU 转换异常: {r}")
                        continue
                    if r and r.get("content"):
                        parts.append(r["content"])
                if not parts:
                    return None
                merged = "\n\n".join(parts)
                filename = Path(input_file).stem + ".md"
                return {"markdown": merged, "filename": filename}
            finally:
                try:
                    shutil.rmtree(chunks_dir, ignore_errors=True)
                except Exception as e:
                    logger.debug(f"清理多卡临时目录失败: {e}")
        if page_count > PDF_CHUNK_PAGES:
            # Sequential chunked path: convert PDF_CHUNK_PAGES pages at a
            # time to bound per-request memory, then merge the markdown.
            chunks_dir = tempfile.mkdtemp(prefix="pdf_chunks_", dir=output_dir)
            try:
                chunk_paths = split_pdf_by_pages(input_file, chunks_dir, chunk_size=PDF_CHUNK_PAGES)
                if not chunk_paths:
                    return None
                logger.info(f"PDF 共 {page_count} 页,按 {PDF_CHUNK_PAGES} 页切割为 {len(chunk_paths)} 段,分段转换后合并")
                parts = []
                for i, chunk_path in enumerate(chunk_paths):
                    # NOTE: chunk_N output dirs live under output_dir and are
                    # not removed by the finally below (only chunks_dir is).
                    chunk_out = os.path.join(output_dir, f"chunk_{i}")
                    os.makedirs(chunk_out, exist_ok=True)
                    if backend == "paddle":
                        r = await _convert_with_paddle(
                            input_file=chunk_path,
                            output_dir=chunk_out,
                            embed_images=return_images,
                            output_json=False,
                            forced_document_type=None,
                        )
                    else:
                        r = await convert_to_markdown(
                            input_file=chunk_path,
                            output_dir=chunk_out,
                            max_pages=max_pages,
                            output_json=False,
                            formula_enable=formula_enable,
                            table_enable=table_enable,
                            language=language,
                            url=url,
                            embed_images=return_images,
                        )
                    if r and r.get("content"):
                        parts.append(r["content"])
                    else:
                        logger.warning(f"第 {i + 1} 段转换无内容,跳过")
                if not parts:
                    return None
                merged = "\n\n".join(parts)
                filename = Path(input_file).stem + ".md"
                return {"markdown": merged, "filename": filename}
            finally:
                try:
                    shutil.rmtree(chunks_dir, ignore_errors=True)
                except Exception as e:
                    logger.debug(f"清理切割临时目录失败: {e}")
    # Single-shot conversion (non-PDF, or PDF with <= PDF_CHUNK_PAGES pages).
    result = None
    if backend == "paddle":
        result = await _convert_with_paddle(
            input_file=input_file,
            output_dir=output_dir,
            embed_images=return_images,
            output_json=False,
            forced_document_type=None,
        )
    else:
        result = await convert_to_markdown(
            input_file=input_file,
            output_dir=output_dir,
            max_pages=max_pages,
            output_json=False,
            formula_enable=formula_enable,
            table_enable=table_enable,
            language=language,
            url=url,
            embed_images=return_images,
        )
    if not result or not result.get("content"):
        return None
    md_path = result.get("markdown_file") or ""
    filename = Path(md_path).name if md_path else Path(input_file).stem + ".md"
    return {"markdown": result["content"], "filename": filename}