converter.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. # Copyright (c) Opendatalab. All rights reserved.
  2. """PDF转换主函数模块 v2 - 使用新的API接口"""
  3. import asyncio
  4. import json
  5. import os
  6. import time
  7. import zipfile
  8. import tempfile
  9. import shutil
  10. from pathlib import Path
  11. from typing import Optional, Sequence
  12. import aiohttp
  13. import aiofiles
  14. from PIL import Image
  15. from ..utils.logging_config import get_logger
  16. from ..utils.file_utils import safe_stem
  17. from ..utils.paddleocr_fallback import _get_paddleocr_subprocess_env
# Module-level logger for the v2 PDF-converter processor.
logger = get_logger("pdf_converter_v2.processor")
# Executable used to invoke PaddleOCR's doc_parser CLI; can be overridden
# via the PADDLE_DOC_PARSER_CMD environment variable.
PADDLE_CMD = os.getenv("PADDLE_DOC_PARSER_CMD", "paddleocr")
  20. async def _run_paddle_doc_parser(cmd: Sequence[str]) -> tuple[int, str, str]:
  21. """异步执行 paddleocr doc_parser 命令"""
  22. logger.info(f"[Paddle] 执行命令: {' '.join(cmd)}")
  23. process = await asyncio.create_subprocess_exec(
  24. *cmd,
  25. stdout=asyncio.subprocess.PIPE,
  26. stderr=asyncio.subprocess.PIPE,
  27. env=_get_paddleocr_subprocess_env(),
  28. )
  29. stdout_bytes, stderr_bytes = await process.communicate()
  30. stdout = stdout_bytes.decode("utf-8", errors="ignore")
  31. stderr = stderr_bytes.decode("utf-8", errors="ignore")
  32. if stdout:
  33. logger.debug(f"[Paddle] stdout: {stdout[:2000]}")
  34. if stderr:
  35. logger.debug(f"[Paddle] stderr: {stderr[:2000]}")
  36. return process.returncode, stdout, stderr
async def _convert_with_paddle(
    input_file: str,
    output_dir: str,
    embed_images: bool,
    output_json: bool,
    forced_document_type: Optional[str],
):
    """Convert a working-condition attachment directly with PaddleOCR ``doc_parser``.

    Runs the ``paddleocr doc_parser`` CLI on *input_file* into a temporary
    directory, merges all produced Markdown files, and writes the merged
    result under *output_dir* (both in ``<name>/markdown/`` and as
    ``<name>.md`` at the root). Optionally copies image assets and converts
    the Markdown to JSON via ``parse_markdown_to_json``.

    Args:
        input_file: Path of the file to convert; must exist.
        output_dir: Destination directory (created if missing).
        embed_images: When True, copy image assets produced by doc_parser
            into ``<name>/images``.
        output_json: When True, also produce ``<name>.json``.
        forced_document_type: Passed through to the JSON parser to override
            document-type detection; may be None.

    Returns:
        Dict with ``markdown_file``, ``json_file``, ``json_data`` and
        ``content`` keys, or ``None`` on failure.
    """
    if not os.path.exists(input_file):
        logger.error(f"[Paddle] 输入文件不存在: {input_file}")
        return None
    # Timestamped output name derived from the (sanitized) input file stem.
    file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'
    os.makedirs(output_dir, exist_ok=True)
    temp_dir = tempfile.mkdtemp(prefix=f"pdf_converter_paddle_{file_name}_")
    logger.info(f"[Paddle] 创建临时目录: {temp_dir}")
    # doc_parser writes its outputs below this per-input save path.
    save_path_base = os.path.join(temp_dir, Path(input_file).stem)
    os.makedirs(save_path_base, exist_ok=True)
    cmd = [
        PADDLE_CMD,
        "doc_parser",
        "-i",
        input_file,
        "--precision",
        "fp32",
        "--use_doc_unwarping",
        "False",
        "--use_doc_orientation_classify",
        "True",
        "--use_chart_recognition",
        "True",
        "--save_path",
        save_path_base,
    ]
    try:
        return_code, _, stderr = await _run_paddle_doc_parser(cmd)
        if return_code != 0:
            logger.error(f"[Paddle] doc_parser 执行失败 code={return_code}")
            if stderr:
                logger.error(stderr)
            return None
        # doc_parser may emit multiple Markdown files; sort for a stable merge order.
        md_files = sorted(Path(save_path_base).rglob("*.md"))
        if not md_files:
            logger.error("[Paddle] 未找到Markdown文件")
            return None
        markdown_parts = []
        for md_file in md_files:
            async with aiofiles.open(md_file, "r", encoding="utf-8") as f:
                markdown_parts.append(await f.read())
        final_content = "\n\n".join(markdown_parts)
        logger.info(f"[Paddle] 合并后的markdown长度: {len(final_content)}")
        local_md_dir = os.path.join(output_dir, file_name, "markdown")
        os.makedirs(local_md_dir, exist_ok=True)
        md_path = os.path.join(local_md_dir, f"{file_name}.md")
        async with aiofiles.open(md_path, "w", encoding="utf-8") as f:
            await f.write(final_content)
        # Also keep a copy at the output_dir root for easy access.
        output_md_path = os.path.join(output_dir, f"{file_name}.md")
        async with aiofiles.open(output_md_path, "w", encoding="utf-8") as f:
            await f.write(final_content)
        if embed_images:
            # Copy every recognized image asset next to the Markdown output.
            local_image_dir = os.path.join(output_dir, file_name, "images")
            os.makedirs(local_image_dir, exist_ok=True)
            for asset in Path(save_path_base).rglob("*"):
                if asset.is_file() and asset.suffix.lower() in {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}:
                    shutil.copy2(asset, os.path.join(local_image_dir, asset.name))
        json_data = None
        json_path = None
        if output_json:
            try:
                from ..parser.json_converter import parse_markdown_to_json
                json_output_dir = os.path.join(output_dir, file_name)
                json_data = parse_markdown_to_json(
                    final_content,
                    first_page_image=None,
                    output_dir=json_output_dir,
                    forced_document_type=forced_document_type,
                    enable_paddleocr_fallback=True,
                    input_file=input_file,
                )
                json_path = os.path.join(output_dir, f"{file_name}.json")
                async with aiofiles.open(json_path, "w", encoding="utf-8") as f:
                    await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
            except Exception as exc:
                # JSON conversion is best-effort: Markdown output is still returned.
                logger.exception(f"[Paddle] JSON转换失败: {exc}")
        return {
            "markdown_file": output_md_path,
            "json_file": json_path,
            "json_data": json_data,
            "content": final_content,
        }
    finally:
        # Always remove the scratch directory, even on failure.
        try:
            shutil.rmtree(temp_dir)
        except Exception as exc:
            logger.warning(f"[Paddle] 清理临时目录失败: {exc}")
  131. async def convert_to_markdown(
  132. input_file: str,
  133. output_dir: str = "./output",
  134. max_pages: int = 10,
  135. is_ocr: bool = False,
  136. formula_enable: bool = True,
  137. table_enable: bool = True,
  138. language: str = "ch",
  139. backend: str = "vlm-vllm-async-engine",
  140. url: str = "http://127.0.0.1:5282",
  141. embed_images: bool = True,
  142. output_json: bool = False,
  143. start_page_id: int = 0,
  144. end_page_id: int = 99999,
  145. parse_method: str = "auto",
  146. server_url: str = "string",
  147. response_format_zip: bool = True,
  148. return_middle_json: bool = False,
  149. return_model_output: bool = True,
  150. return_md: bool = True,
  151. return_images: bool = True, # 默认启用,以便PaddleOCR备用解析可以使用
  152. return_content_list: bool = False,
  153. forced_document_type: Optional[str] = None
  154. ):
  155. """将PDF/图片转换为Markdown的主要函数(使用新的API接口)"""
  156. if not os.path.exists(input_file):
  157. logger.error(f"输入文件不存在: {input_file}")
  158. return None
  159. # 生成文件名
  160. file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'
  161. try:
  162. os.makedirs(output_dir, exist_ok=True)
  163. # 构建API请求URL
  164. api_url = f"{url}/file_parse"
  165. logger.info(f"调用API接口: {api_url}")
  166. # 创建临时目录用于解压zip文件
  167. temp_dir = tempfile.mkdtemp(prefix=f"pdf_converter_v2_{file_name}_")
  168. logger.info(f"创建临时目录: {temp_dir}")
  169. try:
  170. # 准备表单数据
  171. form_data = aiohttp.FormData()
  172. form_data.add_field('return_middle_json', str(return_middle_json).lower())
  173. form_data.add_field('return_model_output', str(return_model_output).lower())
  174. form_data.add_field('return_md', str(return_md).lower())
  175. form_data.add_field('return_images', str(return_images).lower())
  176. form_data.add_field('end_page_id', str(end_page_id))
  177. form_data.add_field('parse_method', parse_method)
  178. form_data.add_field('start_page_id', str(start_page_id))
  179. form_data.add_field('lang_list', language)
  180. form_data.add_field('output_dir', './output')
  181. form_data.add_field('server_url', server_url)
  182. form_data.add_field('return_content_list', str(return_content_list).lower())
  183. form_data.add_field('backend', backend)
  184. form_data.add_field('table_enable', str(table_enable).lower())
  185. form_data.add_field('response_format_zip', str(response_format_zip).lower())
  186. form_data.add_field('formula_enable', str(formula_enable).lower())
  187. # 打开文件并添加到表单数据(文件会在请求发送时读取)
  188. file_obj = open(input_file, 'rb')
  189. try:
  190. # 根据扩展名设置内容类型,默认使用application/octet-stream
  191. ext = (Path(input_file).suffix or "").lower()
  192. content_type = 'application/octet-stream'
  193. if ext == '.pdf':
  194. content_type = 'application/pdf'
  195. elif ext in {'.png'}:
  196. content_type = 'image/png'
  197. elif ext in {'.jpg', '.jpeg'}:
  198. content_type = 'image/jpeg'
  199. elif ext in {'.bmp'}:
  200. content_type = 'image/bmp'
  201. elif ext in {'.tif', '.tiff'}:
  202. content_type = 'image/tiff'
  203. elif ext in {'.webp'}:
  204. content_type = 'image/webp'
  205. # 不使用原始文件名,直接使用简单的固定命名,避免对端服务在构造输出路径时触发 “File name too long”
  206. # 从文件路径获取扩展名
  207. ext = Path(input_file).suffix or ".pdf"
  208. upload_name = f"file{ext}"
  209. form_data.add_field(
  210. 'files',
  211. file_obj,
  212. filename=upload_name,
  213. content_type=content_type
  214. )
  215. # 发送API请求(设置超时时间:总超时600秒,连接超时30秒,socket读取超时300秒)
  216. timeout = aiohttp.ClientTimeout(total=600, connect=30, sock_read=300)
  217. # 添加重试机制
  218. max_retries = 3
  219. retry_count = 0
  220. last_error = None
  221. while retry_count < max_retries:
  222. try:
  223. async with aiohttp.ClientSession(timeout=timeout) as session:
  224. if retry_count > 0:
  225. logger.warning(f"重试第 {retry_count} 次上传文件: {input_file}")
  226. else:
  227. logger.info(f"开始上传文件: {input_file}")
  228. async with session.post(api_url, data=form_data) as response:
  229. if response.status != 200:
  230. error_text = await response.text()
  231. logger.error(f"API请求失败,状态码: {response.status}, 错误: {error_text}")
  232. return None
  233. # 检查Content-Type是否为zip
  234. content_type = response.headers.get('Content-Type', '')
  235. if 'zip' not in content_type and 'application/zip' not in content_type:
  236. # 如果不是zip,尝试检查响应内容
  237. content_disposition = response.headers.get('Content-Disposition', '')
  238. if 'zip' not in content_disposition.lower():
  239. logger.warning(f"响应Content-Type可能不是zip: {content_type}")
  240. # 保存zip文件
  241. zip_path = os.path.join(temp_dir, f"{file_name}.zip")
  242. async with aiofiles.open(zip_path, 'wb') as f:
  243. async for chunk in response.content.iter_chunked(8192):
  244. await f.write(chunk)
  245. logger.info(f"Zip文件已保存: {zip_path}")
  246. # 成功,跳出重试循环
  247. break
  248. except (aiohttp.ClientError, asyncio.TimeoutError) as e:
  249. last_error = e
  250. retry_count += 1
  251. error_type = type(e).__name__
  252. logger.warning(f"API请求失败 ({error_type}): {e}, 重试 {retry_count}/{max_retries}")
  253. if retry_count < max_retries:
  254. # 等待一段时间后重试(指数退避)
  255. wait_time = 2 ** retry_count
  256. logger.info(f"等待 {wait_time} 秒后重试...")
  257. await asyncio.sleep(wait_time)
  258. # 重新创建 form_data(因为已经被消费了)
  259. file_obj.seek(0) # 重置文件指针
  260. form_data = aiohttp.FormData()
  261. form_data.add_field('return_middle_json', str(return_middle_json).lower())
  262. form_data.add_field('return_model_output', str(return_model_output).lower())
  263. form_data.add_field('return_md', str(return_md).lower())
  264. form_data.add_field('return_images', str(return_images).lower())
  265. form_data.add_field('end_page_id', str(end_page_id))
  266. form_data.add_field('parse_method', parse_method)
  267. form_data.add_field('start_page_id', str(start_page_id))
  268. form_data.add_field('lang_list', language)
  269. form_data.add_field('output_dir', './output')
  270. form_data.add_field('server_url', server_url)
  271. form_data.add_field('return_content_list', str(return_content_list).lower())
  272. form_data.add_field('backend', backend)
  273. form_data.add_field('table_enable', str(table_enable).lower())
  274. form_data.add_field('response_format_zip', str(response_format_zip).lower())
  275. form_data.add_field('formula_enable', str(formula_enable).lower())
  276. form_data.add_field('files', file_obj, filename=upload_name, content_type=content_type)
  277. else:
  278. logger.error(f"API请求失败,已达到最大重试次数 ({max_retries})")
  279. raise last_error
  280. finally:
  281. # 关闭文件对象
  282. file_obj.close()
  283. # 解压zip文件
  284. logger.info("开始解压zip文件...")
  285. with zipfile.ZipFile(zip_path, 'r') as zip_ref:
  286. zip_ref.extractall(temp_dir)
  287. # 查找md文件
  288. md_files = list(Path(temp_dir).rglob("*.md"))
  289. if not md_files:
  290. logger.error("在zip文件中未找到md文件")
  291. return None
  292. logger.info(f"找到 {len(md_files)} 个md文件")
  293. # 读取所有md文件并合并
  294. markdown_parts = []
  295. for md_file in sorted(md_files):
  296. logger.info(f"读取md文件: {md_file}")
  297. async with aiofiles.open(md_file, 'r', encoding='utf-8') as f:
  298. content = await f.read()
  299. markdown_parts.append(content)
  300. # 合并所有页面内容
  301. original_content = "\n\n".join(markdown_parts)
  302. logger.info(f"合并后的markdown长度: {len(original_content)} 字符")
  303. # 准备输出目录
  304. local_md_dir = os.path.join(output_dir, file_name, "markdown")
  305. os.makedirs(local_md_dir, exist_ok=True)
  306. # 处理图片嵌入(如果需要)
  307. final_content = original_content
  308. if embed_images:
  309. # 查找图片文件
  310. image_files = list(Path(temp_dir).rglob("*.png")) + list(Path(temp_dir).rglob("*.jpg")) + list(Path(temp_dir).rglob("*.jpeg"))
  311. if image_files:
  312. local_image_dir = os.path.join(output_dir, file_name, "images")
  313. os.makedirs(local_image_dir, exist_ok=True)
  314. # 复制图片到输出目录
  315. for img_file in image_files:
  316. dst_path = os.path.join(local_image_dir, img_file.name)
  317. shutil.copy2(img_file, dst_path)
  318. logger.debug(f"复制图片: {img_file} -> {dst_path}")
  319. # 保存Markdown文件
  320. md_path = os.path.join(local_md_dir, f"{file_name}.md")
  321. async with aiofiles.open(md_path, 'w', encoding='utf-8') as f:
  322. await f.write(final_content)
  323. logger.info(f"Markdown文件已保存: {md_path}")
  324. # 生成输出文件路径(在output_dir根目录下也保存一份)
  325. output_md_path = os.path.join(output_dir, f"{file_name}.md")
  326. async with aiofiles.open(output_md_path, 'w', encoding='utf-8') as f:
  327. await f.write(final_content)
  328. logger.info(f"转换完成: {output_md_path}")
  329. # JSON转换(如果需要)
  330. json_data = None
  331. json_path = None
  332. if output_json:
  333. try:
  334. logger.info("开始转换为JSON格式...")
  335. # 复用v1的json解析逻辑
  336. # 注意:v2版本不涉及MinerU和PaddleOCR的具体调用,只进行JSON解析
  337. # first_page_image设为None,因为v2版本不处理PDF图片
  338. from ..parser.json_converter import parse_markdown_to_json
  339. # 构建完整的输出目录路径,包含文件名的子目录
  340. json_output_dir = os.path.join(output_dir, file_name) if file_name else output_dir
  341. json_data = parse_markdown_to_json(
  342. original_content,
  343. first_page_image=None,
  344. output_dir=json_output_dir,
  345. forced_document_type=forced_document_type,
  346. enable_paddleocr_fallback=True,
  347. input_file=input_file,
  348. )
  349. json_path = os.path.join(output_dir, f"{file_name}.json")
  350. async with aiofiles.open(json_path, 'w', encoding='utf-8') as f:
  351. await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
  352. logger.info(f"JSON文件已保存: {json_path}")
  353. logger.info(f"文档类型: {json_data.get('document_type', 'unknown')}")
  354. except Exception as e:
  355. logger.exception(f"JSON转换失败: {e}")
  356. json_data = None
  357. return {
  358. 'markdown_file': output_md_path,
  359. 'json_file': json_path,
  360. 'json_data': json_data,
  361. 'content': final_content,
  362. 'original_content': original_content
  363. }
  364. finally:
  365. # 清理临时目录
  366. try:
  367. shutil.rmtree(temp_dir)
  368. logger.debug(f"已清理临时目录: {temp_dir}")
  369. except Exception as e:
  370. logger.warning(f"清理临时目录失败: {e}")
  371. except Exception as e:
  372. logger.exception(f"转换过程出错: {e}")
  373. return None