# converter.py
  1. # Copyright (c) Opendatalab. All rights reserved.
  2. """PDF转换主函数模块 v2 - 使用新的API接口"""
  3. import asyncio
  4. import json
  5. import os
  6. import time
  7. import zipfile
  8. import tempfile
  9. import shutil
  10. from pathlib import Path
  11. from typing import Optional, Sequence
  12. import aiohttp
  13. import aiofiles
  14. from PIL import Image
  15. from ..utils.logging_config import get_logger
  16. from ..utils.file_utils import safe_stem
  17. from ..utils.pdf_splitter import get_pdf_page_count, split_pdf_by_pages
  18. from ..utils.mineru_url_selector import get_next_mineru_api_url, get_mineru_api_url_list
  19. from ..utils.paddleocr_fallback import (
  20. get_paddle_ocr_devices,
  21. get_paddle_ocr_device_args_for_index,
  22. _paddle_ocr_device_args,
  23. _get_paddleocr_subprocess_env,
  24. )
  25. from ..config import (
  26. PADDLE_DOC_PARSER_CMD, VL_REC_BACKEND, VL_REC_SERVER_URL,
  27. DEFAULT_BACKEND, DEFAULT_SERVER_URL, DEFAULT_PARSE_METHOD,
  28. DEFAULT_LANGUAGE, DEFAULT_START_PAGE_ID, DEFAULT_END_PAGE_ID,
  29. DEFAULT_TABLE_ENABLE, DEFAULT_FORMULA_ENABLE,
  30. DEFAULT_RESPONSE_FORMAT_ZIP, DEFAULT_RETURN_MIDDLE_JSON,
  31. DEFAULT_RETURN_MODEL_OUTPUT, DEFAULT_RETURN_MD, DEFAULT_RETURN_IMAGES,
  32. DEFAULT_RETURN_CONTENT_LIST,
  33. )
  34. logger = get_logger("pdf_converter_v2.processor")
  35. async def _run_paddle_doc_parser(cmd: Sequence[str]) -> tuple[int, str, str]:
  36. """异步执行 paddleocr doc_parser 命令"""
  37. logger.info(f"[Paddle] 执行命令: {' '.join(cmd)}")
  38. process = await asyncio.create_subprocess_exec(
  39. *cmd,
  40. stdout=asyncio.subprocess.PIPE,
  41. stderr=asyncio.subprocess.PIPE,
  42. env=_get_paddleocr_subprocess_env(),
  43. )
  44. stdout_bytes, stderr_bytes = await process.communicate()
  45. stdout = stdout_bytes.decode("utf-8", errors="ignore")
  46. stderr = stderr_bytes.decode("utf-8", errors="ignore")
  47. if stdout:
  48. logger.debug(f"[Paddle] stdout: {stdout[:2000]}")
  49. if stderr:
  50. logger.debug(f"[Paddle] stderr: {stderr[:2000]}")
  51. return process.returncode, stdout, stderr
  52. def _paddle_base_cmd(input_path: str, save_path_base: str, device_args: list) -> list:
  53. """构建 PaddleOCR doc_parser 命令(含设备参数)。"""
  54. cmd = [
  55. PADDLE_CMD,
  56. "doc_parser",
  57. "-i",
  58. input_path,
  59. "--precision",
  60. "fp32",
  61. "--use_doc_unwarping",
  62. "False",
  63. "--use_doc_orientation_classify",
  64. "True",
  65. "--use_chart_recognition",
  66. "True",
  67. "--save_path",
  68. save_path_base,
  69. ] + device_args
  70. # 添加 VL 识别后端配置(如果已配置)
  71. if VL_REC_BACKEND:
  72. cmd.extend(["--vl_rec_backend", VL_REC_BACKEND])
  73. if VL_REC_SERVER_URL:
  74. cmd.extend(["--vl_rec_server_url", VL_REC_SERVER_URL])
  75. return cmd
async def _convert_with_paddle(
    input_file: str,
    output_dir: str,
    embed_images: bool,
    output_json: bool,
    forced_document_type: Optional[str],
) -> Optional[dict]:
    """Convert a document directly with PaddleOCR ``doc_parser``.

    When several cards are configured and the input is a multi-page PDF,
    the PDF is split into one chunk per card, the chunks are parsed in
    parallel, and the resulting markdown files are merged in chunk order.

    :param input_file: path to the PDF/image to convert.
    :param output_dir: directory receiving the merged markdown (plus an
        optional images subdirectory and JSON file).
    :param embed_images: also copy image assets produced by doc_parser
        into ``<output_dir>/<file_name>/images``.
    :param output_json: additionally parse the markdown into JSON.
    :param forced_document_type: forwarded to ``parse_markdown_to_json``;
        its semantics are defined there.
    :return: dict with ``markdown_file`` / ``json_file`` / ``json_data`` /
        ``content`` keys, or ``None`` on failure.
    """
    if not os.path.exists(input_file):
        logger.error(f"[Paddle] 输入文件不存在: {input_file}")
        return None
    # Timestamped output stem to avoid collisions between runs.
    file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'
    os.makedirs(output_dir, exist_ok=True)
    temp_dir = tempfile.mkdtemp(prefix=f"pdf_converter_paddle_{file_name}_")
    logger.info(f"[Paddle] 创建临时目录: {temp_dir}")
    devices = get_paddle_ocr_devices()
    ext = (Path(input_file).suffix or "").lower()
    page_count = get_pdf_page_count(input_file) if ext == ".pdf" else 0
    # Parallel multi-card mode only pays off for multi-page PDFs.
    use_multi_card = len(devices) > 1 and ext == ".pdf" and page_count > 1
    try:
        if use_multi_card:
            # Multi-card: split into N page chunks, run doc_parser on one
            # card per chunk in parallel, then merge the markdown output.
            chunk_size = (page_count + len(devices) - 1) // len(devices)  # ceil division
            chunks_dir = os.path.join(temp_dir, "chunks")
            os.makedirs(chunks_dir, exist_ok=True)
            chunk_paths = split_pdf_by_pages(input_file, chunks_dir, chunk_size=chunk_size)
            if not chunk_paths:
                logger.error("[Paddle] 多卡拆分 PDF 失败")
                return None
            logger.info(f"[Paddle] PDF 共 {page_count} 页,拆成 {len(chunk_paths)} 段并行使用 {len(devices)} 张卡")
            tasks = []
            for i, chunk_path in enumerate(chunk_paths):
                # Each chunk writes under its own out_<i>/<chunk stem> base.
                save_path_base_i = os.path.join(temp_dir, f"out_{i}", Path(chunk_path).stem)
                os.makedirs(save_path_base_i, exist_ok=True)
                cmd = _paddle_base_cmd(
                    chunk_path,
                    save_path_base_i,
                    get_paddle_ocr_device_args_for_index(i),
                )
                tasks.append(_run_paddle_doc_parser(cmd))
            # return_exceptions=True: a failed chunk is skipped, not fatal.
            results = await asyncio.gather(*tasks, return_exceptions=True)
            markdown_parts = []
            all_save_bases = [os.path.join(temp_dir, f"out_{i}", Path(chunk_paths[i]).stem) for i in range(len(chunk_paths))]
            for i, res in enumerate(results):
                if isinstance(res, Exception):
                    logger.warning(f"[Paddle] 第 {i + 1} 段 doc_parser 异常: {res}")
                    continue
                ret_code, _, stderr = res
                if ret_code != 0:
                    logger.warning(f"[Paddle] 第 {i + 1} 段 doc_parser 失败: {stderr}")
                    continue
                base = all_save_bases[i]
                # Sorted so pages are concatenated in a deterministic order.
                md_files = sorted(Path(base).rglob("*.md"))
                for md_file in md_files:
                    async with aiofiles.open(md_file, "r", encoding="utf-8") as f:
                        markdown_parts.append(await f.read())
            final_content = "\n\n".join(markdown_parts) if markdown_parts else ""
        else:
            # Single card or non-PDF input: one doc_parser pass.
            save_path_base = os.path.join(temp_dir, Path(input_file).stem)
            os.makedirs(save_path_base, exist_ok=True)
            cmd = _paddle_base_cmd(input_file, save_path_base, _paddle_ocr_device_args())
            return_code, _, stderr = await _run_paddle_doc_parser(cmd)
            if return_code != 0:
                logger.error(f"[Paddle] doc_parser 执行失败 code={return_code}")
                if stderr:
                    logger.error(stderr)
                return None
            md_files = sorted(Path(save_path_base).rglob("*.md"))
            if not md_files:
                logger.error("[Paddle] 未找到Markdown文件")
                return None
            markdown_parts = []
            for md_file in md_files:
                async with aiofiles.open(md_file, "r", encoding="utf-8") as f:
                    markdown_parts.append(await f.read())
            final_content = "\n\n".join(markdown_parts)
        if not final_content:
            logger.error("[Paddle] 合并后无内容")
            return None
        logger.info(f"[Paddle] 合并后的markdown长度: {len(final_content)}")
        # Save the merged markdown twice: under <file_name>/markdown/ and
        # as a flat copy directly in output_dir.
        local_md_dir = os.path.join(output_dir, file_name, "markdown")
        os.makedirs(local_md_dir, exist_ok=True)
        md_path = os.path.join(local_md_dir, f"{file_name}.md")
        async with aiofiles.open(md_path, "w", encoding="utf-8") as f:
            await f.write(final_content)
        output_md_path = os.path.join(output_dir, f"{file_name}.md")
        async with aiofiles.open(output_md_path, "w", encoding="utf-8") as f:
            await f.write(final_content)
        if embed_images:
            # Copy every raster asset doc_parser emitted next to the markdown.
            local_image_dir = os.path.join(output_dir, file_name, "images")
            os.makedirs(local_image_dir, exist_ok=True)
            if use_multi_card:
                for i in range(len(chunk_paths)):
                    base = os.path.join(temp_dir, f"out_{i}", Path(chunk_paths[i]).stem)
                    for asset in Path(base).rglob("*"):
                        if asset.is_file() and asset.suffix.lower() in {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}:
                            # NOTE(review): same-named assets from different
                            # chunks overwrite each other here — confirm names
                            # are unique across chunks.
                            shutil.copy2(asset, os.path.join(local_image_dir, asset.name))
            else:
                base = os.path.join(temp_dir, Path(input_file).stem)
                for asset in Path(base).rglob("*"):
                    if asset.is_file() and asset.suffix.lower() in {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}:
                        shutil.copy2(asset, os.path.join(local_image_dir, asset.name))
        json_data = None
        json_path = None
        if output_json:
            # JSON parsing failures are logged but do not fail the conversion;
            # the markdown result is still returned.
            try:
                from ..parser.json_converter import parse_markdown_to_json
                json_output_dir = os.path.join(output_dir, file_name)
                json_data = parse_markdown_to_json(
                    final_content,
                    first_page_image=None,
                    output_dir=json_output_dir,
                    forced_document_type=forced_document_type,
                    enable_paddleocr_fallback=True,
                    input_file=input_file,
                )
                json_path = os.path.join(output_dir, f"{file_name}.json")
                async with aiofiles.open(json_path, "w", encoding="utf-8") as f:
                    await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
            except Exception as exc:
                logger.exception(f"[Paddle] JSON转换失败: {exc}")
        return {
            "markdown_file": output_md_path,
            "json_file": json_path,
            "json_data": json_data,
            "content": final_content,
        }
    finally:
        # Best-effort cleanup of the working directory in all exit paths.
        try:
            shutil.rmtree(temp_dir)
        except Exception as exc:
            logger.warning(f"[Paddle] 清理临时目录失败: {exc}")
async def convert_to_markdown(
    input_file: str,
    output_dir: str = "./output",
    max_pages: int = 10,
    is_ocr: bool = False,
    formula_enable: bool = DEFAULT_FORMULA_ENABLE,
    table_enable: bool = DEFAULT_TABLE_ENABLE,
    language: str = DEFAULT_LANGUAGE,
    backend: str = DEFAULT_BACKEND,
    url: str = "http://127.0.0.1:5282",
    embed_images: bool = True,
    output_json: bool = False,
    start_page_id: int = DEFAULT_START_PAGE_ID,
    end_page_id: int = DEFAULT_END_PAGE_ID if DEFAULT_END_PAGE_ID >= 0 else 99999,
    parse_method: str = DEFAULT_PARSE_METHOD,
    server_url: str = DEFAULT_SERVER_URL or "string",
    response_format_zip: bool = DEFAULT_RESPONSE_FORMAT_ZIP,
    return_middle_json: bool = DEFAULT_RETURN_MIDDLE_JSON,
    return_model_output: bool = DEFAULT_RETURN_MODEL_OUTPUT,
    return_md: bool = DEFAULT_RETURN_MD,
    return_images: bool = DEFAULT_RETURN_IMAGES,
    return_content_list: bool = DEFAULT_RETURN_CONTENT_LIST,
    forced_document_type: Optional[str] = None
) -> Optional[dict]:
    """Convert a PDF/image to Markdown via the MinerU ``/file_parse`` API (v2).

    Uploads the file as multipart form data, expects a ZIP response,
    extracts it, merges all contained ``*.md`` files, writes the merged
    markdown (and optionally copies images / parses JSON) under
    ``output_dir``, and returns a result dict.

    :param input_file: path of the PDF/image to upload.
    :param output_dir: local directory for markdown/images/JSON output.
    :param max_pages: NOTE(review): accepted but never used in this body.
    :param is_ocr: NOTE(review): accepted but never used in this body.
    :param url: base URL of the MinerU API; falls back to the round-robin
        selector when falsy.
    :param embed_images: copy images extracted from the ZIP next to the
        markdown output.
    :param output_json: additionally parse the merged markdown into JSON.
    :param forced_document_type: forwarded to ``parse_markdown_to_json``.
    :return: dict with ``markdown_file`` / ``json_file`` / ``json_data`` /
        ``content`` / ``original_content`` keys, or ``None`` on failure.
    """
    if not os.path.exists(input_file):
        logger.error(f"输入文件不存在: {input_file}")
        return None
    url = url or get_next_mineru_api_url()
    # Timestamped output stem to avoid collisions between runs.
    file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'
    try:
        os.makedirs(output_dir, exist_ok=True)
        # Build the API endpoint URL.
        api_url = f"{url}/file_parse"
        logger.info(f"调用API接口: {api_url}")
        # Temporary directory for the downloaded ZIP and its extraction.
        temp_dir = tempfile.mkdtemp(prefix=f"pdf_converter_v2_{file_name}_")
        logger.info(f"创建临时目录: {temp_dir}")
        try:
            # Assemble the multipart form fields expected by /file_parse.
            form_data = aiohttp.FormData()
            form_data.add_field('return_middle_json', str(return_middle_json).lower())
            form_data.add_field('return_model_output', str(return_model_output).lower())
            form_data.add_field('return_md', str(return_md).lower())
            form_data.add_field('return_images', str(return_images).lower())
            form_data.add_field('end_page_id', str(end_page_id))
            form_data.add_field('parse_method', parse_method)
            form_data.add_field('start_page_id', str(start_page_id))
            form_data.add_field('lang_list', language)
            form_data.add_field('output_dir', './output')
            form_data.add_field('server_url', server_url)
            form_data.add_field('return_content_list', str(return_content_list).lower())
            form_data.add_field('backend', backend)
            form_data.add_field('table_enable', str(table_enable).lower())
            form_data.add_field('response_format_zip', str(response_format_zip).lower())
            form_data.add_field('formula_enable', str(formula_enable).lower())
            # Log the call parameters (bilingual keys for readability).
            params_log = {
                "return_middle_json (返回中间JSON)": str(return_middle_json).lower(),
                "return_model_output (返回模型输出)": str(return_model_output).lower(),
                "return_md (返回Markdown)": str(return_md).lower(),
                "return_images (返回图片)": str(return_images).lower(),
                "end_page_id (结束页码)": str(end_page_id),
                "parse_method (解析方法)": parse_method,
                "start_page_id (起始页码)": str(start_page_id),
                "lang_list (语言列表)": language,
                "output_dir (输出目录)": "./output",
                "server_url (服务器URL)": server_url,
                "return_content_list (返回内容列表)": str(return_content_list).lower(),
                "backend (处理后端)": backend,
                "table_enable (启用表格识别)": str(table_enable).lower(),
                "response_format_zip (响应格式ZIP)": str(response_format_zip).lower(),
                "formula_enable (启用公式识别)": str(formula_enable).lower(),
            }
            logger.info(f"MinerU API 调用参数:\n{json.dumps(params_log, indent=4, ensure_ascii=False)}")
            # Open the file for upload (read lazily when the request is sent).
            file_obj = open(input_file, 'rb')
            try:
                # Pick the content type from the extension; default to
                # application/octet-stream for unknown extensions.
                ext = (Path(input_file).suffix or "").lower()
                content_type = 'application/octet-stream'
                if ext == '.pdf':
                    content_type = 'application/pdf'
                elif ext in {'.png'}:
                    content_type = 'image/png'
                elif ext in {'.jpg', '.jpeg'}:
                    content_type = 'image/jpeg'
                elif ext in {'.bmp'}:
                    content_type = 'image/bmp'
                elif ext in {'.tif', '.tiff'}:
                    content_type = 'image/tiff'
                elif ext in {'.webp'}:
                    content_type = 'image/webp'
                # Do not send the original file name; use a short fixed name so
                # the remote service cannot hit "File name too long" when it
                # builds its output paths.
                ext = Path(input_file).suffix or ".pdf"
                upload_name = f"file{ext}"
                form_data.add_field(
                    'files',
                    file_obj,
                    filename=upload_name,
                    content_type=content_type
                )
                # Request timeouts: 600s total, 30s connect, 300s socket read.
                timeout = aiohttp.ClientTimeout(total=600, connect=30, sock_read=300)
                # Retry loop with exponential backoff.
                max_retries = 3
                retry_count = 0
                last_error = None
                while retry_count < max_retries:
                    try:
                        async with aiohttp.ClientSession(timeout=timeout) as session:
                            if retry_count > 0:
                                logger.warning(f"重试第 {retry_count} 次上传文件: {input_file}")
                            else:
                                logger.info(f"开始上传文件: {input_file}")
                            async with session.post(api_url, data=form_data) as response:
                                if response.status != 200:
                                    error_text = await response.text()
                                    logger.error(f"API请求失败,状态码: {response.status}, 错误: {error_text}")
                                    return None
                                # Sanity-check that the response looks like a ZIP.
                                # NOTE(review): this rebinds ``content_type`` (the
                                # upload MIME type) — a retry after this point would
                                # re-send the file with the response's content type.
                                content_type = response.headers.get('Content-Type', '')
                                if 'zip' not in content_type and 'application/zip' not in content_type:
                                    # Fall back to inspecting Content-Disposition.
                                    content_disposition = response.headers.get('Content-Disposition', '')
                                    if 'zip' not in content_disposition.lower():
                                        logger.warning(f"响应Content-Type可能不是zip: {content_type}")
                                # Stream the ZIP body to disk in 8 KiB chunks.
                                zip_path = os.path.join(temp_dir, f"{file_name}.zip")
                                async with aiofiles.open(zip_path, 'wb') as f:
                                    async for chunk in response.content.iter_chunked(8192):
                                        await f.write(chunk)
                                logger.info(f"Zip文件已保存: {zip_path}")
                                # Success — leave the retry loop.
                                break
                    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                        last_error = e
                        retry_count += 1
                        error_type = type(e).__name__
                        logger.warning(f"API请求失败 ({error_type}): {e}, 重试 {retry_count}/{max_retries}")
                        if retry_count < max_retries:
                            # Exponential backoff before the next attempt.
                            wait_time = 2 ** retry_count
                            logger.info(f"等待 {wait_time} 秒后重试...")
                            await asyncio.sleep(wait_time)
                            # Reopen the file for the retry (aiohttp may have
                            # closed the old handle; seek(0) would raise
                            # "seek of closed file").
                            try:
                                file_obj.close()
                            except Exception:
                                pass
                            file_obj = open(input_file, 'rb')
                            # FormData is single-use: rebuild all fields.
                            form_data = aiohttp.FormData()
                            form_data.add_field('return_middle_json', str(return_middle_json).lower())
                            form_data.add_field('return_model_output', str(return_model_output).lower())
                            form_data.add_field('return_md', str(return_md).lower())
                            form_data.add_field('return_images', str(return_images).lower())
                            form_data.add_field('end_page_id', str(end_page_id))
                            form_data.add_field('parse_method', parse_method)
                            form_data.add_field('start_page_id', str(start_page_id))
                            form_data.add_field('lang_list', language)
                            form_data.add_field('output_dir', './output')
                            form_data.add_field('server_url', server_url)
                            form_data.add_field('return_content_list', str(return_content_list).lower())
                            form_data.add_field('backend', backend)
                            form_data.add_field('table_enable', str(table_enable).lower())
                            form_data.add_field('response_format_zip', str(response_format_zip).lower())
                            form_data.add_field('formula_enable', str(formula_enable).lower())
                            form_data.add_field('files', file_obj, filename=upload_name, content_type=content_type)
                        else:
                            logger.error(f"API请求失败,已达到最大重试次数 ({max_retries})")
                            raise last_error
            finally:
                # Close the upload handle in all cases.
                file_obj.close()
            # Extract the downloaded ZIP.
            logger.info("开始解压zip文件...")
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)
            # Locate the markdown files produced by the service.
            md_files = list(Path(temp_dir).rglob("*.md"))
            if not md_files:
                logger.error("在zip文件中未找到md文件")
                return None
            logger.info(f"找到 {len(md_files)} 个md文件")
            # Read all markdown files (sorted for deterministic order) and merge.
            markdown_parts = []
            for md_file in sorted(md_files):
                logger.info(f"读取md文件: {md_file}")
                async with aiofiles.open(md_file, 'r', encoding='utf-8') as f:
                    content = await f.read()
                    markdown_parts.append(content)
            # Merge all page contents.
            original_content = "\n\n".join(markdown_parts)
            logger.info(f"合并后的markdown长度: {len(original_content)} 字符")
            # Prepare output directories.
            local_md_dir = os.path.join(output_dir, file_name, "markdown")
            os.makedirs(local_md_dir, exist_ok=True)
            # Copy extracted images alongside the markdown, if requested.
            final_content = original_content
            if embed_images:
                # Collect extracted raster images.
                image_files = list(Path(temp_dir).rglob("*.png")) + list(Path(temp_dir).rglob("*.jpg")) + list(Path(temp_dir).rglob("*.jpeg"))
                if image_files:
                    local_image_dir = os.path.join(output_dir, file_name, "images")
                    os.makedirs(local_image_dir, exist_ok=True)
                    # Copy each image into the output images directory.
                    for img_file in image_files:
                        dst_path = os.path.join(local_image_dir, img_file.name)
                        shutil.copy2(img_file, dst_path)
                        logger.debug(f"复制图片: {img_file} -> {dst_path}")
            # Save the markdown under <file_name>/markdown/.
            md_path = os.path.join(local_md_dir, f"{file_name}.md")
            async with aiofiles.open(md_path, 'w', encoding='utf-8') as f:
                await f.write(final_content)
            logger.info(f"Markdown文件已保存: {md_path}")
            # Also save a flat copy directly under output_dir.
            output_md_path = os.path.join(output_dir, f"{file_name}.md")
            async with aiofiles.open(output_md_path, 'w', encoding='utf-8') as f:
                await f.write(final_content)
            logger.info(f"转换完成: {output_md_path}")
            # Optional JSON conversion.
            json_data = None
            json_path = None
            if output_json:
                try:
                    logger.info("开始转换为JSON格式...")
                    # Reuse the v1 JSON parsing logic. v2 does not call MinerU
                    # or PaddleOCR here, it only parses the merged markdown;
                    # first_page_image is None because v2 does not render PDF
                    # page images.
                    from ..parser.json_converter import parse_markdown_to_json
                    # Build the per-file output directory for the parser.
                    json_output_dir = os.path.join(output_dir, file_name) if file_name else output_dir
                    json_data = parse_markdown_to_json(
                        original_content,
                        first_page_image=None,
                        output_dir=json_output_dir,
                        forced_document_type=forced_document_type,
                        enable_paddleocr_fallback=True,
                        input_file=input_file,
                    )
                    json_path = os.path.join(output_dir, f"{file_name}.json")
                    async with aiofiles.open(json_path, 'w', encoding='utf-8') as f:
                        await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
                    logger.info(f"JSON文件已保存: {json_path}")
                    logger.info(f"文档类型: {json_data.get('document_type', 'unknown')}")
                except Exception as e:
                    # JSON failure is non-fatal: markdown is still returned.
                    logger.exception(f"JSON转换失败: {e}")
                    json_data = None
            return {
                'markdown_file': output_md_path,
                'json_file': json_path,
                'json_data': json_data,
                'content': final_content,
                'original_content': original_content
            }
        finally:
            # Clean up the temporary directory in all exit paths.
            try:
                shutil.rmtree(temp_dir)
                logger.debug(f"已清理临时目录: {temp_dir}")
            except Exception as e:
                logger.warning(f"清理临时目录失败: {e}")
    except Exception as e:
        # Top-level guard: any unexpected failure is logged and mapped to None.
        logger.exception(f"转换过程出错: {e}")
        return None
# When a PDF exceeds this many pages, split it into chunks, convert each
# chunk separately and merge, reducing per-request memory (avoids MinerU OOM).
PDF_CHUNK_PAGES = 50
async def convert_pdf_to_markdown_only(
    input_file: str,
    output_dir: str,
    backend: str = "mineru",
    url: Optional[str] = None,
    max_pages: int = 99999,
    formula_enable: bool = True,
    table_enable: bool = True,
    language: str = "ch",
    return_images: bool = False,
) -> Optional[dict]:
    """Convert a PDF/image to Markdown text only, without JSON parsing.

    Large PDFs (more than ``PDF_CHUNK_PAGES`` pages) are first split into
    ``PDF_CHUNK_PAGES``-page chunks that are converted sequentially and
    merged, reducing per-request memory. When multiple MinerU API
    instances are configured, a multi-page PDF is instead split into one
    chunk per instance and converted in parallel.

    :param backend: "paddle" routes to ``_convert_with_paddle``; anything
        else goes through the MinerU API via ``convert_to_markdown``.
    :param url: explicit MinerU base URL; falls back to the round-robin
        selector when None.
    :param return_images: also fetch/save images (MinerU ``return_images``
        plus local ``embed_images``).
    :return: ``{"markdown": str, "filename": str}`` or ``None`` on failure.
    """
    if not os.path.exists(input_file):
        logger.error(f"输入文件不存在: {input_file}")
        return None
    ext = (Path(input_file).suffix or "").lower()
    url = url or get_next_mineru_api_url()
    # Only PDFs are split by pages; images / short PDFs convert in one pass.
    if ext == ".pdf":
        page_count = get_pdf_page_count(input_file)
        if page_count <= 0:
            logger.error(f"无法获取 PDF 页数: {input_file}")
            return None
        # MinerU multi-instance: split into N chunks, send each to a different
        # API instance in parallel, then merge (saturates all cards with one task).
        url_list = get_mineru_api_url_list()
        if backend != "paddle" and len(url_list) > 1 and page_count > 1:
            chunk_size = (page_count + len(url_list) - 1) // len(url_list)  # ceil division
            chunks_dir = tempfile.mkdtemp(prefix="pdf_multi_card_", dir=output_dir)
            try:
                chunk_paths = split_pdf_by_pages(input_file, chunks_dir, chunk_size=chunk_size)
                if not chunk_paths:
                    return None
                logger.info(f"PDF 共 {page_count} 页,拆成 {len(chunk_paths)} 段并行发往 {len(url_list)} 个 MinerU 实例")
                tasks = []
                for i, chunk_path in enumerate(chunk_paths):
                    chunk_out = os.path.join(chunks_dir, f"out_{i}")
                    os.makedirs(chunk_out, exist_ok=True)
                    tasks.append(
                        convert_to_markdown(
                            input_file=chunk_path,
                            output_dir=chunk_out,
                            max_pages=max_pages,
                            output_json=False,
                            formula_enable=formula_enable,
                            table_enable=table_enable,
                            language=language,
                            # Round-robin chunks over the configured instances.
                            url=url_list[i % len(url_list)],
                            embed_images=return_images,
                        )
                    )
                # A failed chunk is logged and skipped, not fatal.
                results = await asyncio.gather(*tasks, return_exceptions=True)
                parts = []
                for i, r in enumerate(results):
                    if isinstance(r, Exception):
                        logger.warning(f"第 {i + 1} 段 MinerU 转换异常: {r}")
                        continue
                    if r and r.get("content"):
                        parts.append(r["content"])
                if not parts:
                    return None
                merged = "\n\n".join(parts)
                filename = Path(input_file).stem + ".md"
                return {"markdown": merged, "filename": filename}
            finally:
                # Best-effort cleanup of the multi-instance chunk directory.
                try:
                    shutil.rmtree(chunks_dir, ignore_errors=True)
                except Exception as e:
                    logger.debug(f"清理多卡临时目录失败: {e}")
        if page_count > PDF_CHUNK_PAGES:
            chunks_dir = tempfile.mkdtemp(prefix="pdf_chunks_", dir=output_dir)
            try:
                chunk_paths = split_pdf_by_pages(input_file, chunks_dir, chunk_size=PDF_CHUNK_PAGES)
                if not chunk_paths:
                    return None
                logger.info(f"PDF 共 {page_count} 页,按 {PDF_CHUNK_PAGES} 页切割为 {len(chunk_paths)} 段,分段转换后合并")
                parts = []
                # Sequential chunk conversion: one backend call per chunk.
                for i, chunk_path in enumerate(chunk_paths):
                    # NOTE(review): chunk outputs go under output_dir (not
                    # chunks_dir) and are not removed afterwards — confirm
                    # leftover chunk_<i> directories are intended.
                    chunk_out = os.path.join(output_dir, f"chunk_{i}")
                    os.makedirs(chunk_out, exist_ok=True)
                    if backend == "paddle":
                        r = await _convert_with_paddle(
                            input_file=chunk_path,
                            output_dir=chunk_out,
                            embed_images=return_images,
                            output_json=False,
                            forced_document_type=None,
                        )
                    else:
                        r = await convert_to_markdown(
                            input_file=chunk_path,
                            output_dir=chunk_out,
                            max_pages=max_pages,
                            output_json=False,
                            formula_enable=formula_enable,
                            table_enable=table_enable,
                            language=language,
                            url=url,
                            embed_images=return_images,
                        )
                    if r and r.get("content"):
                        parts.append(r["content"])
                    else:
                        logger.warning(f"第 {i + 1} 段转换无内容,跳过")
                if not parts:
                    return None
                merged = "\n\n".join(parts)
                filename = Path(input_file).stem + ".md"
                return {"markdown": merged, "filename": filename}
            finally:
                # Best-effort cleanup of the split-chunk directory.
                try:
                    shutil.rmtree(chunks_dir, ignore_errors=True)
                except Exception as e:
                    logger.debug(f"清理切割临时目录失败: {e}")
    # Single-pass conversion (non-PDF, or PDF with <= PDF_CHUNK_PAGES pages).
    result = None
    if backend == "paddle":
        result = await _convert_with_paddle(
            input_file=input_file,
            output_dir=output_dir,
            embed_images=return_images,
            output_json=False,
            forced_document_type=None,
        )
    else:
        result = await convert_to_markdown(
            input_file=input_file,
            output_dir=output_dir,
            max_pages=max_pages,
            output_json=False,
            formula_enable=formula_enable,
            table_enable=table_enable,
            language=language,
            url=url,
            embed_images=return_images,
        )
    if not result or not result.get("content"):
        return None
    md_path = result.get("markdown_file") or ""
    filename = Path(md_path).name if md_path else Path(input_file).stem + ".md"
    return {"markdown": result["content"], "filename": filename}