| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- # Copyright (c) Opendatalab. All rights reserved.
- """PDF转换主函数模块 v2 - 使用新的API接口"""
- import asyncio
- import json
- import os
- import time
- import zipfile
- import tempfile
- import shutil
- from pathlib import Path
- from typing import Optional, Sequence
- import aiohttp
- import aiofiles
- from PIL import Image
- from ..utils.logging_config import get_logger
- from ..utils.file_utils import safe_stem
- from ..utils.paddleocr_fallback import _get_paddleocr_subprocess_env
- from .config import PADDLE_DOC_PARSER_CMD, VL_REC_BACKEND, VL_REC_SERVER_URL
# Module-level logger obtained from the project's logging helper.
logger = get_logger("pdf_converter_v2.processor")
# Alias for the configured PaddleOCR command; it is used as the first element
# of the doc_parser argument vector.
PADDLE_CMD = PADDLE_DOC_PARSER_CMD
async def _run_paddle_doc_parser(cmd: Sequence[str]) -> tuple[int, str, str]:
    """Run a PaddleOCR ``doc_parser`` command asynchronously.

    The command is started without a shell, using the environment returned by
    ``_get_paddleocr_subprocess_env()``, and both output streams are captured.

    Args:
        cmd: Full argument vector; the first item is the executable.

    Returns:
        ``(return_code, stdout, stderr)``. Both streams are decoded as UTF-8
        with undecodable bytes dropped.
    """
    logger.info(f"[Paddle] 执行命令: {' '.join(cmd)}")

    pipe = asyncio.subprocess.PIPE
    child = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=pipe,
        stderr=pipe,
        env=_get_paddleocr_subprocess_env(),
    )

    # communicate() waits for the process to exit and returns both streams.
    out_text, err_text = (
        blob.decode("utf-8", errors="ignore") for blob in await child.communicate()
    )

    # Only the first 2000 characters of each non-empty stream are logged.
    if out_text:
        logger.debug(f"[Paddle] stdout: {out_text[:2000]}")
    if err_text:
        logger.debug(f"[Paddle] stderr: {err_text[:2000]}")

    return child.returncode, out_text, err_text
async def _convert_with_paddle(
    input_file: str,
    output_dir: str,
    embed_images: bool,
    output_json: bool,
    forced_document_type: Optional[str],
) -> Optional[dict]:
    """Convert a document to Markdown with the PaddleOCR ``doc_parser`` CLI.

    The CLI writes its results into a temporary directory. Every ``*.md`` file
    found there is concatenated (sorted by path, separated by a blank line)
    and written both to ``<output_dir>/<name>/markdown/<name>.md`` and to
    ``<output_dir>/<name>.md``. The temporary directory is always removed.

    Args:
        input_file: Path of the document to convert.
        output_dir: Directory that receives the results (created if missing).
        embed_images: When true, image files produced by the CLI are copied to
            ``<output_dir>/<name>/images``.
        output_json: When true, the Markdown is additionally passed to
            ``parse_markdown_to_json`` and saved as ``<output_dir>/<name>.json``.
        forced_document_type: Forwarded unchanged to ``parse_markdown_to_json``.

    Returns:
        A dict with ``markdown_file``, ``json_file``, ``json_data``,
        ``content`` and ``original_content``, or ``None`` when the input file
        is missing, the CLI exits with a non-zero code, or no Markdown was
        produced. ``json_file`` and ``json_data`` are ``None`` when JSON output
        was not requested or failed.

    Raises:
        Exceptions raised while starting the CLI or while writing the Markdown
        output are not caught here and propagate to the caller.
    """
    if not os.path.exists(input_file):
        logger.error(f"[Paddle] 输入文件不存在: {input_file}")
        return None

    file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'
    os.makedirs(output_dir, exist_ok=True)

    temp_dir = tempfile.mkdtemp(prefix=f"pdf_converter_paddle_{file_name}_")
    logger.info(f"[Paddle] 创建临时目录: {temp_dir}")

    try:
        # Created inside the try block so that a failure here still triggers
        # the temp-directory cleanup below.
        save_path_base = os.path.join(temp_dir, Path(input_file).stem)
        os.makedirs(save_path_base, exist_ok=True)

        cmd = [
            PADDLE_CMD,
            "doc_parser",
            "-i",
            input_file,
            "--precision",
            "fp32",
            "--use_doc_unwarping",
            "False",
            "--use_doc_orientation_classify",
            "True",
            "--use_chart_recognition",
            "True",
            "--save_path",
            save_path_base,
        ]

        # Optional VL recognition backend settings, only passed when configured.
        if VL_REC_BACKEND:
            cmd.extend(["--vl_rec_backend", VL_REC_BACKEND])
        if VL_REC_SERVER_URL:
            cmd.extend(["--vl_rec_server_url", VL_REC_SERVER_URL])

        return_code, _, stderr = await _run_paddle_doc_parser(cmd)
        if return_code != 0:
            logger.error(f"[Paddle] doc_parser 执行失败 code={return_code}")
            if stderr:
                logger.error(stderr)
            return None

        md_files = sorted(Path(save_path_base).rglob("*.md"))
        if not md_files:
            logger.error("[Paddle] 未找到Markdown文件")
            return None

        markdown_parts = []
        for md_file in md_files:
            async with aiofiles.open(md_file, "r", encoding="utf-8") as f:
                markdown_parts.append(await f.read())
        final_content = "\n\n".join(markdown_parts)
        logger.info(f"[Paddle] 合并后的markdown长度: {len(final_content)}")

        local_md_dir = os.path.join(output_dir, file_name, "markdown")
        os.makedirs(local_md_dir, exist_ok=True)
        md_path = os.path.join(local_md_dir, f"{file_name}.md")
        async with aiofiles.open(md_path, "w", encoding="utf-8") as f:
            await f.write(final_content)

        # A second copy is kept directly under output_dir.
        output_md_path = os.path.join(output_dir, f"{file_name}.md")
        async with aiofiles.open(output_md_path, "w", encoding="utf-8") as f:
            await f.write(final_content)

        if embed_images:
            image_suffixes = {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}
            local_image_dir = os.path.join(output_dir, file_name, "images")
            os.makedirs(local_image_dir, exist_ok=True)
            for asset in Path(save_path_base).rglob("*"):
                if asset.is_file() and asset.suffix.lower() in image_suffixes:
                    # Files are flattened by base name; a later file with the
                    # same name overwrites an earlier one.
                    shutil.copy2(asset, os.path.join(local_image_dir, asset.name))

        json_data = None
        json_path = None
        if output_json:
            try:
                from ..parser.json_converter import parse_markdown_to_json

                json_output_dir = os.path.join(output_dir, file_name)
                json_data = parse_markdown_to_json(
                    final_content,
                    first_page_image=None,
                    output_dir=json_output_dir,
                    forced_document_type=forced_document_type,
                    enable_paddleocr_fallback=True,
                    input_file=input_file,
                )
                json_path = os.path.join(output_dir, f"{file_name}.json")
                async with aiofiles.open(json_path, "w", encoding="utf-8") as f:
                    await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
            except Exception as exc:
                logger.exception(f"[Paddle] JSON转换失败: {exc}")
                # Do not report a JSON file that may not have been written.
                json_data = None
                json_path = None

        return {
            "markdown_file": output_md_path,
            "json_file": json_path,
            "json_data": json_data,
            "content": final_content,
            # Same key that convert_to_markdown returns; no post-processing is
            # applied here, so it equals "content".
            "original_content": final_content,
        }
    finally:
        try:
            shutil.rmtree(temp_dir)
        except Exception as exc:
            logger.warning(f"[Paddle] 清理临时目录失败: {exc}")
# Upload MIME type keyed by lower-cased file suffix; any other suffix is sent
# as application/octet-stream.
_UPLOAD_CONTENT_TYPES = {
    ".pdf": "application/pdf",
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".bmp": "image/bmp",
    ".tif": "image/tiff",
    ".tiff": "image/tiff",
    ".webp": "image/webp",
}

# Labels appended to each form field name when the request parameters are
# logged (runtime log text, intentionally kept as-is).
_FILE_PARSE_FIELD_LABELS = {
    "return_middle_json": "返回中间JSON",
    "return_model_output": "返回模型输出",
    "return_md": "返回Markdown",
    "return_images": "返回图片",
    "end_page_id": "结束页码",
    "parse_method": "解析方法",
    "start_page_id": "起始页码",
    "lang_list": "语言列表",
    "output_dir": "输出目录",
    "server_url": "服务器URL",
    "return_content_list": "返回内容列表",
    "backend": "处理后端",
    "table_enable": "启用表格识别",
    "response_format_zip": "响应格式ZIP",
    "formula_enable": "启用公式识别",
}


def _build_file_parse_form(fields, file_obj, upload_name, upload_content_type):
    """Build the multipart form for one ``/file_parse`` request attempt."""
    form_data = aiohttp.FormData()
    for field_name, field_value in fields.items():
        form_data.add_field(field_name, field_value)
    form_data.add_field(
        "files",
        file_obj,
        filename=upload_name,
        content_type=upload_content_type,
    )
    return form_data


async def _post_file_parse(api_url, form_data, timeout, zip_path) -> bool:
    """POST the form and stream the response body to ``zip_path``.

    Returns ``False`` (after logging) when the server answers with a status
    other than 200, ``True`` once the body has been saved.
    """
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.post(api_url, data=form_data) as response:
            if response.status != 200:
                error_text = await response.text()
                logger.error(f"API请求失败,状态码: {response.status}, 错误: {error_text}")
                return False

            # Kept in its own variable so the response header can never
            # replace the MIME type used for the upload on a later retry.
            response_type = response.headers.get('Content-Type', '')
            if 'zip' not in response_type:
                content_disposition = response.headers.get('Content-Disposition', '')
                if 'zip' not in content_disposition.lower():
                    logger.warning(f"响应Content-Type可能不是zip: {response_type}")

            # Opening with 'wb' truncates any partial file left by a failed attempt.
            async with aiofiles.open(zip_path, 'wb') as f:
                async for chunk in response.content.iter_chunked(8192):
                    await f.write(chunk)

    logger.info(f"Zip文件已保存: {zip_path}")
    return True


async def convert_to_markdown(
    input_file: str,
    output_dir: str = "./output",
    max_pages: int = 10,
    is_ocr: bool = False,
    formula_enable: bool = True,
    table_enable: bool = True,
    language: str = "ch",
    backend: str = "vlm-vllm-async-engine",
    url: str = "http://127.0.0.1:5282",
    embed_images: bool = True,
    output_json: bool = False,
    start_page_id: int = 0,
    end_page_id: int = 99999,
    parse_method: str = "auto",
    server_url: str = "string",
    response_format_zip: bool = True,
    return_middle_json: bool = False,
    return_model_output: bool = True,
    return_md: bool = True,
    return_images: bool = True,  # enabled by default so the PaddleOCR fallback parser can use the images
    return_content_list: bool = False,
    forced_document_type: Optional[str] = None
) -> Optional[dict]:
    """Convert a PDF or image to Markdown through the ``/file_parse`` HTTP API.

    The file is uploaded as multipart form data to ``{url}/file_parse``. The
    response body is saved and always opened as a zip archive; every ``*.md``
    file in it is concatenated (sorted by path, separated by a blank line) and
    written to ``<output_dir>/<name>/markdown/<name>.md`` and
    ``<output_dir>/<name>.md``. Network errors and timeouts are retried up to
    three times with exponential back-off (2 s, then 4 s).

    Args:
        input_file: Path of the document to upload.
        output_dir: Local directory that receives the results.
        max_pages: Accepted for interface compatibility; not used here.
        is_ocr: Accepted for interface compatibility; not used here.
        formula_enable: Sent as the ``formula_enable`` form field.
        table_enable: Sent as the ``table_enable`` form field.
        language: Sent as the ``lang_list`` form field.
        backend: Sent as the ``backend`` form field.
        url: Base URL of the API service.
        embed_images: When true, ``*.png``/``*.jpg``/``*.jpeg`` files found in
            the archive are copied to ``<output_dir>/<name>/images``. The
            Markdown text itself is not modified.
        output_json: When true, the Markdown is passed to
            ``parse_markdown_to_json`` and saved as ``<output_dir>/<name>.json``.
        start_page_id: Sent as the ``start_page_id`` form field.
        end_page_id: Sent as the ``end_page_id`` form field.
        parse_method: Sent as the ``parse_method`` form field.
        server_url: Sent as the ``server_url`` form field.
        response_format_zip: Sent as the ``response_format_zip`` form field.
        return_middle_json: Sent as the ``return_middle_json`` form field.
        return_model_output: Sent as the ``return_model_output`` form field.
        return_md: Sent as the ``return_md`` form field.
        return_images: Sent as the ``return_images`` form field.
        return_content_list: Sent as the ``return_content_list`` form field.
        forced_document_type: Forwarded unchanged to ``parse_markdown_to_json``.

    Returns:
        A dict with ``markdown_file``, ``json_file``, ``json_data``,
        ``content`` and ``original_content``, or ``None`` on any failure
        (missing input, non-200 response, exhausted retries, no Markdown in
        the archive, or any other exception, which is logged).
    """
    if not os.path.exists(input_file):
        logger.error(f"输入文件不存在: {input_file}")
        return None

    file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'

    try:
        os.makedirs(output_dir, exist_ok=True)

        api_url = f"{url}/file_parse"
        logger.info(f"调用API接口: {api_url}")

        # Scratch directory for the downloaded archive and its extracted files.
        temp_dir = tempfile.mkdtemp(prefix=f"pdf_converter_v2_{file_name}_")
        logger.info(f"创建临时目录: {temp_dir}")

        try:
            # Built once and reused for every attempt; insertion order is the
            # order in which the fields are added to the form.
            request_fields = {
                "return_middle_json": str(return_middle_json).lower(),
                "return_model_output": str(return_model_output).lower(),
                "return_md": str(return_md).lower(),
                "return_images": str(return_images).lower(),
                "end_page_id": str(end_page_id),
                "parse_method": parse_method,
                "start_page_id": str(start_page_id),
                "lang_list": language,
                "output_dir": "./output",
                "server_url": server_url,
                "return_content_list": str(return_content_list).lower(),
                "backend": backend,
                "table_enable": str(table_enable).lower(),
                "response_format_zip": str(response_format_zip).lower(),
                "formula_enable": str(formula_enable).lower(),
            }
            params_log = {
                f"{field_name} ({_FILE_PARSE_FIELD_LABELS[field_name]})": field_value
                for field_name, field_value in request_fields.items()
            }
            logger.info(f"MinerU API 调用参数:\n{json.dumps(params_log, indent=4, ensure_ascii=False)}")

            suffix = Path(input_file).suffix
            upload_content_type = _UPLOAD_CONTENT_TYPES.get(
                suffix.lower(), "application/octet-stream"
            )
            # A fixed short upload name is used instead of the original file
            # name so the remote service does not hit "File name too long"
            # when it derives output paths from it.
            upload_suffix = suffix or ".pdf"
            upload_name = f"file{upload_suffix}"

            # Total 600 s, connect 30 s, socket read 300 s.
            timeout = aiohttp.ClientTimeout(total=600, connect=30, sock_read=300)
            zip_path = os.path.join(temp_dir, f"{file_name}.zip")
            max_retries = 3

            for attempt in range(1, max_retries + 1):
                try:
                    # The file is reopened and the form rebuilt for every
                    # attempt: a FormData cannot be sent twice.
                    # NOTE(review): some aiohttp releases also close a file
                    # payload once it has been written, which would break a
                    # seek(0)-based retry — confirm against the installed version.
                    with open(input_file, "rb") as file_obj:
                        form_data = _build_file_parse_form(
                            request_fields, file_obj, upload_name, upload_content_type
                        )
                        if attempt > 1:
                            logger.warning(f"重试第 {attempt - 1} 次上传文件: {input_file}")
                        else:
                            logger.info(f"开始上传文件: {input_file}")

                        if not await _post_file_parse(api_url, form_data, timeout, zip_path):
                            # Non-200 responses are not retried.
                            return None
                    break
                except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                    error_type = type(exc).__name__
                    logger.warning(f"API请求失败 ({error_type}): {exc}, 重试 {attempt}/{max_retries}")
                    if attempt >= max_retries:
                        logger.error(f"API请求失败,已达到最大重试次数 ({max_retries})")
                        raise
                    # Exponential back-off before the next attempt.
                    wait_time = 2 ** attempt
                    logger.info(f"等待 {wait_time} 秒后重试...")
                    await asyncio.sleep(wait_time)

            logger.info("开始解压zip文件...")
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)

            md_files = sorted(Path(temp_dir).rglob("*.md"))
            if not md_files:
                logger.error("在zip文件中未找到md文件")
                return None

            logger.info(f"找到 {len(md_files)} 个md文件")

            markdown_parts = []
            for md_file in md_files:
                logger.info(f"读取md文件: {md_file}")
                async with aiofiles.open(md_file, 'r', encoding='utf-8') as f:
                    markdown_parts.append(await f.read())

            original_content = "\n\n".join(markdown_parts)
            logger.info(f"合并后的markdown长度: {len(original_content)} 字符")

            local_md_dir = os.path.join(output_dir, file_name, "markdown")
            os.makedirs(local_md_dir, exist_ok=True)

            # No rewriting is applied, so the final text equals the merged text.
            final_content = original_content
            if embed_images:
                image_files = [
                    image_path
                    for pattern in ("*.png", "*.jpg", "*.jpeg")
                    for image_path in Path(temp_dir).rglob(pattern)
                ]
                if image_files:
                    local_image_dir = os.path.join(output_dir, file_name, "images")
                    os.makedirs(local_image_dir, exist_ok=True)
                    for img_file in image_files:
                        dst_path = os.path.join(local_image_dir, img_file.name)
                        shutil.copy2(img_file, dst_path)
                        logger.debug(f"复制图片: {img_file} -> {dst_path}")

            md_path = os.path.join(local_md_dir, f"{file_name}.md")
            async with aiofiles.open(md_path, 'w', encoding='utf-8') as f:
                await f.write(final_content)
            logger.info(f"Markdown文件已保存: {md_path}")

            # A second copy is kept directly under output_dir.
            output_md_path = os.path.join(output_dir, f"{file_name}.md")
            async with aiofiles.open(output_md_path, 'w', encoding='utf-8') as f:
                await f.write(final_content)

            logger.info(f"转换完成: {output_md_path}")

            json_data = None
            json_path = None
            if output_json:
                try:
                    logger.info("开始转换为JSON格式...")
                    # Reuses the shared Markdown-to-JSON parser; no first-page
                    # image is supplied.
                    from ..parser.json_converter import parse_markdown_to_json

                    json_output_dir = os.path.join(output_dir, file_name)
                    json_data = parse_markdown_to_json(
                        original_content,
                        first_page_image=None,
                        output_dir=json_output_dir,
                        forced_document_type=forced_document_type,
                        enable_paddleocr_fallback=True,
                        input_file=input_file,
                    )
                    json_path = os.path.join(output_dir, f"{file_name}.json")
                    async with aiofiles.open(json_path, 'w', encoding='utf-8') as f:
                        await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
                    logger.info(f"JSON文件已保存: {json_path}")
                    logger.info(f"文档类型: {json_data.get('document_type', 'unknown')}")
                except Exception as e:
                    logger.exception(f"JSON转换失败: {e}")
                    # Do not report a JSON file that may not have been written.
                    json_data = None
                    json_path = None

            return {
                'markdown_file': output_md_path,
                'json_file': json_path,
                'json_data': json_data,
                'content': final_content,
                'original_content': original_content
            }

        finally:
            # Always remove the scratch directory.
            try:
                shutil.rmtree(temp_dir)
                logger.debug(f"已清理临时目录: {temp_dir}")
            except Exception as e:
                logger.warning(f"清理临时目录失败: {e}")

    except Exception as e:
        logger.exception(f"转换过程出错: {e}")
        return None
|