# Copyright (c) Opendatalab. All rights reserved.
"""Main PDF conversion module, v2 (uses the new API interface)."""
import asyncio
import json
import os
import time
import zipfile
import tempfile
import shutil
from pathlib import Path
from typing import Optional, Sequence

import aiohttp
import aiofiles
from PIL import Image

from ..utils.logging_config import get_logger
from ..utils.file_utils import safe_stem
from ..utils.pdf_splitter import get_pdf_page_count, split_pdf_by_pages
from ..utils.mineru_url_selector import get_next_mineru_api_url, get_mineru_api_url_list
from ..utils.paddleocr_fallback import (
    get_paddle_ocr_devices,
    get_paddle_ocr_device_args_for_index,
    _paddle_ocr_device_args,
    _get_paddleocr_subprocess_env,
)
from ..config import PADDLE_DOC_PARSER_CMD, VL_REC_BACKEND, VL_REC_SERVER_URL

logger = get_logger("pdf_converter_v2.processor")
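
# This module implements two conversion paths:
#   1. _convert_with_paddle() drives the local `paddleocr doc_parser` CLI,
#      splitting a PDF across multiple cards when more than one is available.
#   2. convert_to_markdown() uploads the file to a MinerU HTTP API instance and
#      unpacks the zip archive it returns.
# convert_pdf_to_markdown_only() selects between them and adds page-based chunking.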


async def _run_paddle_doc_parser(cmd: Sequence[str]) -> tuple[int, str, str]:
    """Run a `paddleocr doc_parser` command asynchronously."""
    logger.info(f"[Paddle] Running command: {' '.join(cmd)}")
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=_get_paddleocr_subprocess_env(),
    )
    stdout_bytes, stderr_bytes = await process.communicate()
    stdout = stdout_bytes.decode("utf-8", errors="ignore")
    stderr = stderr_bytes.decode("utf-8", errors="ignore")
    if stdout:
        logger.debug(f"[Paddle] stdout: {stdout[:2000]}")
    if stderr:
        logger.debug(f"[Paddle] stderr: {stderr[:2000]}")
    return process.returncode, stdout, stderr


def _paddle_base_cmd(input_path: str, save_path_base: str, device_args: list) -> list:
    """Build the PaddleOCR doc_parser command line (including device arguments)."""
    cmd = [
        PADDLE_DOC_PARSER_CMD,
        "doc_parser",
        "-i",
        input_path,
        "--precision",
        "fp32",
        "--use_doc_unwarping",
        "False",
        "--use_doc_orientation_classify",
        "True",
        "--use_chart_recognition",
        "True",
        "--save_path",
        save_path_base,
    ] + device_args

    # Append the VL recognition backend settings when configured.
    if VL_REC_BACKEND:
        cmd.extend(["--vl_rec_backend", VL_REC_BACKEND])
    if VL_REC_SERVER_URL:
        cmd.extend(["--vl_rec_server_url", VL_REC_SERVER_URL])

    return cmd
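
# Illustrative shape of a generated command line (an assumption for
# documentation only; the actual binary name comes from PADDLE_DOC_PARSER_CMD
# and the device flags from the *_device_args helpers):
#   paddleocr doc_parser -i chunk_0.pdf --precision fp32 \
#       --use_doc_unwarping False --use_doc_orientation_classify True \
#       --use_chart_recognition True --save_path out_0 --device gpu:0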


async def _convert_with_paddle(
    input_file: str,
    output_dir: str,
    embed_images: bool,
    output_json: bool,
    forced_document_type: Optional[str],
):
    """Convert operating-condition attachments directly with PaddleOCR doc_parser.

    With multiple cards available, the PDF is split by pages and the chunks run
    in parallel so that all cards are kept busy.
    """
    if not os.path.exists(input_file):
        logger.error(f"[Paddle] Input file does not exist: {input_file}")
        return None
    file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'
    os.makedirs(output_dir, exist_ok=True)
    temp_dir = tempfile.mkdtemp(prefix=f"pdf_converter_paddle_{file_name}_")
    logger.info(f"[Paddle] Created temporary directory: {temp_dir}")
    devices = get_paddle_ocr_devices()
    ext = (Path(input_file).suffix or "").lower()
    page_count = get_pdf_page_count(input_file) if ext == ".pdf" else 0
    use_multi_card = len(devices) > 1 and ext == ".pdf" and page_count > 1
    try:
        if use_multi_card:
            # Multi-card: split the PDF into N chunks, run doc_parser on each
            # chunk with its own card in parallel, then merge the results.
            chunk_size = (page_count + len(devices) - 1) // len(devices)  # ceiling division
            chunks_dir = os.path.join(temp_dir, "chunks")
            os.makedirs(chunks_dir, exist_ok=True)
            chunk_paths = split_pdf_by_pages(input_file, chunks_dir, chunk_size=chunk_size)
            if not chunk_paths:
                logger.error("[Paddle] Multi-card PDF split failed")
                return None
            logger.info(f"[Paddle] PDF has {page_count} pages; split into {len(chunk_paths)} chunks running on {len(devices)} cards")
            tasks = []
            for i, chunk_path in enumerate(chunk_paths):
                save_path_base_i = os.path.join(temp_dir, f"out_{i}", Path(chunk_path).stem)
                os.makedirs(save_path_base_i, exist_ok=True)
                cmd = _paddle_base_cmd(
                    chunk_path,
                    save_path_base_i,
                    get_paddle_ocr_device_args_for_index(i),
                )
                tasks.append(_run_paddle_doc_parser(cmd))
            results = await asyncio.gather(*tasks, return_exceptions=True)
            markdown_parts = []
            all_save_bases = [os.path.join(temp_dir, f"out_{i}", Path(chunk_paths[i]).stem) for i in range(len(chunk_paths))]
            for i, res in enumerate(results):
                if isinstance(res, Exception):
                    logger.warning(f"[Paddle] doc_parser raised on chunk {i + 1}: {res}")
                    continue
                ret_code, _, stderr = res
                if ret_code != 0:
                    logger.warning(f"[Paddle] doc_parser failed on chunk {i + 1}: {stderr}")
                    continue
                base = all_save_bases[i]
                md_files = sorted(Path(base).rglob("*.md"))
                for md_file in md_files:
                    async with aiofiles.open(md_file, "r", encoding="utf-8") as f:
                        markdown_parts.append(await f.read())
            final_content = "\n\n".join(markdown_parts) if markdown_parts else ""
        else:
            # Single card, or a non-PDF input: one doc_parser run.
            save_path_base = os.path.join(temp_dir, Path(input_file).stem)
            os.makedirs(save_path_base, exist_ok=True)
            cmd = _paddle_base_cmd(input_file, save_path_base, _paddle_ocr_device_args())
            return_code, _, stderr = await _run_paddle_doc_parser(cmd)
            if return_code != 0:
                logger.error(f"[Paddle] doc_parser exited with code={return_code}")
                if stderr:
                    logger.error(stderr)
                return None
            md_files = sorted(Path(save_path_base).rglob("*.md"))
            if not md_files:
                logger.error("[Paddle] No Markdown files found")
                return None
            markdown_parts = []
            for md_file in md_files:
                async with aiofiles.open(md_file, "r", encoding="utf-8") as f:
                    markdown_parts.append(await f.read())
            final_content = "\n\n".join(markdown_parts)
        if not final_content:
            logger.error("[Paddle] Merged output is empty")
            return None
        logger.info(f"[Paddle] Merged markdown length: {len(final_content)}")
        local_md_dir = os.path.join(output_dir, file_name, "markdown")
        os.makedirs(local_md_dir, exist_ok=True)
        md_path = os.path.join(local_md_dir, f"{file_name}.md")
        async with aiofiles.open(md_path, "w", encoding="utf-8") as f:
            await f.write(final_content)
        output_md_path = os.path.join(output_dir, f"{file_name}.md")
        async with aiofiles.open(output_md_path, "w", encoding="utf-8") as f:
            await f.write(final_content)
        if embed_images:
            local_image_dir = os.path.join(output_dir, file_name, "images")
            os.makedirs(local_image_dir, exist_ok=True)
            # Copy image assets from every doc_parser output directory.
            if use_multi_card:
                asset_bases = all_save_bases
            else:
                asset_bases = [save_path_base]
            for base in asset_bases:
                for asset in Path(base).rglob("*"):
                    if asset.is_file() and asset.suffix.lower() in {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}:
                        shutil.copy2(asset, os.path.join(local_image_dir, asset.name))
        json_data = None
        json_path = None
        if output_json:
            try:
                from ..parser.json_converter import parse_markdown_to_json
                json_output_dir = os.path.join(output_dir, file_name)
                json_data = parse_markdown_to_json(
                    final_content,
                    first_page_image=None,
                    output_dir=json_output_dir,
                    forced_document_type=forced_document_type,
                    enable_paddleocr_fallback=True,
                    input_file=input_file,
                )
                json_path = os.path.join(output_dir, f"{file_name}.json")
                async with aiofiles.open(json_path, "w", encoding="utf-8") as f:
                    await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
            except Exception as exc:
                logger.exception(f"[Paddle] JSON conversion failed: {exc}")
        return {
            "markdown_file": output_md_path,
            "json_file": json_path,
            "json_data": json_data,
            "content": final_content,
        }
    finally:
        try:
            shutil.rmtree(temp_dir)
        except Exception as exc:
            logger.warning(f"[Paddle] Failed to clean up temporary directory: {exc}")


async def convert_to_markdown(
    input_file: str,
    output_dir: str = "./output",
    max_pages: int = 10,
    is_ocr: bool = False,
    formula_enable: bool = True,
    table_enable: bool = True,
    language: str = "ch",
    backend: str = "vlm-vllm-async-engine",
    url: str = "http://127.0.0.1:5282",
    embed_images: bool = True,
    output_json: bool = False,
    start_page_id: int = 0,
    end_page_id: int = 99999,
    parse_method: str = "auto",
    server_url: str = "string",
    response_format_zip: bool = True,
    return_middle_json: bool = False,
    return_model_output: bool = True,
    return_md: bool = True,
    return_images: bool = True,  # enabled by default so the PaddleOCR fallback can use the images
    return_content_list: bool = False,
    forced_document_type: Optional[str] = None
):
    """Convert a PDF or image to Markdown (via the new MinerU API interface)."""

    if not os.path.exists(input_file):
        logger.error(f"Input file does not exist: {input_file}")
        return None
    url = url or get_next_mineru_api_url()
    # Build the output file name.
    file_name = f'{safe_stem(Path(input_file).stem)}_{time.strftime("%y%m%d_%H%M%S")}'

    try:
        os.makedirs(output_dir, exist_ok=True)

        # Build the API request URL.
        api_url = f"{url}/file_parse"
        logger.info(f"Calling API endpoint: {api_url}")

        # Create a temporary directory for unpacking the returned zip file.
        temp_dir = tempfile.mkdtemp(prefix=f"pdf_converter_v2_{file_name}_")
        logger.info(f"Created temporary directory: {temp_dir}")

        try:
            # aiohttp consumes a FormData on send, so the multipart fields are
            # built through a helper that can be called again on retry.
            def _build_form_data() -> aiohttp.FormData:
                form = aiohttp.FormData()
                form.add_field('return_middle_json', str(return_middle_json).lower())
                form.add_field('return_model_output', str(return_model_output).lower())
                form.add_field('return_md', str(return_md).lower())
                form.add_field('return_images', str(return_images).lower())
                form.add_field('end_page_id', str(end_page_id))
                form.add_field('parse_method', parse_method)
                form.add_field('start_page_id', str(start_page_id))
                form.add_field('lang_list', language)
                form.add_field('output_dir', './output')
                form.add_field('server_url', server_url)
                form.add_field('return_content_list', str(return_content_list).lower())
                form.add_field('backend', backend)
                form.add_field('table_enable', str(table_enable).lower())
                form.add_field('response_format_zip', str(response_format_zip).lower())
                form.add_field('formula_enable', str(formula_enable).lower())
                return form

            form_data = _build_form_data()

            # Log the call parameters.
            params_log = {
                "return_middle_json": str(return_middle_json).lower(),
                "return_model_output": str(return_model_output).lower(),
                "return_md": str(return_md).lower(),
                "return_images": str(return_images).lower(),
                "end_page_id": str(end_page_id),
                "parse_method": parse_method,
                "start_page_id": str(start_page_id),
                "lang_list": language,
                "output_dir": "./output",
                "server_url": server_url,
                "return_content_list": str(return_content_list).lower(),
                "backend": backend,
                "table_enable": str(table_enable).lower(),
                "response_format_zip": str(response_format_zip).lower(),
                "formula_enable": str(formula_enable).lower(),
            }
            logger.info(f"MinerU API call parameters:\n{json.dumps(params_log, indent=4, ensure_ascii=False)}")

            # Open the file and attach it to the form data (it is read when the
            # request is sent).
            file_obj = open(input_file, 'rb')
            try:
                # Pick the content type from the extension; default to
                # application/octet-stream.
                ext = (Path(input_file).suffix or "").lower()
                upload_content_type = 'application/octet-stream'
                if ext == '.pdf':
                    upload_content_type = 'application/pdf'
                elif ext == '.png':
                    upload_content_type = 'image/png'
                elif ext in {'.jpg', '.jpeg'}:
                    upload_content_type = 'image/jpeg'
                elif ext == '.bmp':
                    upload_content_type = 'image/bmp'
                elif ext in {'.tif', '.tiff'}:
                    upload_content_type = 'image/tiff'
                elif ext == '.webp':
                    upload_content_type = 'image/webp'
                # Do not use the original file name: a short fixed name keeps the
                # remote service from hitting "File name too long" when it builds
                # output paths from the upload name.
                upload_name = f"file{ext or '.pdf'}"
                form_data.add_field(
                    'files',
                    file_obj,
                    filename=upload_name,
                    content_type=upload_content_type
                )

                # Send the API request. Timeouts: 600 s total, 30 s connect,
                # 300 s per socket read.
                timeout = aiohttp.ClientTimeout(total=600, connect=30, sock_read=300)

                # Retry transient failures.
                max_retries = 3
                retry_count = 0
                last_error = None

                while retry_count < max_retries:
                    try:
                        async with aiohttp.ClientSession(timeout=timeout) as session:
                            if retry_count > 0:
                                logger.warning(f"Retry {retry_count}: uploading file {input_file}")
                            else:
                                logger.info(f"Uploading file: {input_file}")

                            async with session.post(api_url, data=form_data) as response:
                                if response.status != 200:
                                    error_text = await response.text()
                                    logger.error(f"API request failed, status: {response.status}, error: {error_text}")
                                    return None

                                # Check whether the response looks like a zip.
                                response_content_type = response.headers.get('Content-Type', '')
                                if 'zip' not in response_content_type:
                                    content_disposition = response.headers.get('Content-Disposition', '')
                                    if 'zip' not in content_disposition.lower():
                                        logger.warning(f"Response Content-Type may not be zip: {response_content_type}")

                                # Stream the zip body to disk.
                                zip_path = os.path.join(temp_dir, f"{file_name}.zip")
                                async with aiofiles.open(zip_path, 'wb') as f:
                                    async for chunk in response.content.iter_chunked(8192):
                                        await f.write(chunk)

                                logger.info(f"Zip file saved: {zip_path}")
                                # Success: leave the retry loop.
                                break

                    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                        last_error = e
                        retry_count += 1
                        error_type = type(e).__name__
                        logger.warning(f"API request failed ({error_type}): {e}, retry {retry_count}/{max_retries}")

                        if retry_count < max_retries:
                            # Exponential backoff before the next attempt.
                            wait_time = 2 ** retry_count
                            logger.info(f"Waiting {wait_time} s before retrying...")
                            await asyncio.sleep(wait_time)

                            # Re-open the file for the retry: aiohttp may have
                            # closed the original handle, in which case seek(0)
                            # would raise "seek of closed file".
                            try:
                                file_obj.close()
                            except Exception:
                                pass
                            file_obj = open(input_file, 'rb')
                            form_data = _build_form_data()
                            form_data.add_field('files', file_obj, filename=upload_name, content_type=upload_content_type)
                        else:
                            logger.error(f"API request failed after the maximum number of retries ({max_retries})")
                            raise last_error

            finally:
                # Close the upload file handle.
                file_obj.close()

            # Unpack the zip file.
            logger.info("Unpacking zip file...")
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)

            # Find the Markdown files.
            md_files = list(Path(temp_dir).rglob("*.md"))
            if not md_files:
                logger.error("No .md files found in the zip archive")
                return None

            logger.info(f"Found {len(md_files)} .md files")

            # Read and merge all Markdown files.
            markdown_parts = []
            for md_file in sorted(md_files):
                logger.info(f"Reading md file: {md_file}")
                async with aiofiles.open(md_file, 'r', encoding='utf-8') as f:
                    content = await f.read()
                markdown_parts.append(content)

            # Merge the per-page content.
            original_content = "\n\n".join(markdown_parts)
            logger.info(f"Merged markdown length: {len(original_content)} characters")

            # Prepare the output directory.
            local_md_dir = os.path.join(output_dir, file_name, "markdown")
            os.makedirs(local_md_dir, exist_ok=True)

            # Copy images next to the Markdown if requested.
            final_content = original_content
            if embed_images:
                image_files = list(Path(temp_dir).rglob("*.png")) + list(Path(temp_dir).rglob("*.jpg")) + list(Path(temp_dir).rglob("*.jpeg"))
                if image_files:
                    local_image_dir = os.path.join(output_dir, file_name, "images")
                    os.makedirs(local_image_dir, exist_ok=True)

                    # Copy the images into the output directory.
                    for img_file in image_files:
                        dst_path = os.path.join(local_image_dir, img_file.name)
                        shutil.copy2(img_file, dst_path)
                        logger.debug(f"Copied image: {img_file} -> {dst_path}")

            # Save the Markdown file.
            md_path = os.path.join(local_md_dir, f"{file_name}.md")
            async with aiofiles.open(md_path, 'w', encoding='utf-8') as f:
                await f.write(final_content)
            logger.info(f"Markdown file saved: {md_path}")

            # Also save a copy at the root of output_dir.
            output_md_path = os.path.join(output_dir, f"{file_name}.md")
            async with aiofiles.open(output_md_path, 'w', encoding='utf-8') as f:
                await f.write(final_content)

            logger.info(f"Conversion finished: {output_md_path}")

            # Convert to JSON if requested.
            json_data = None
            json_path = None
            if output_json:
                try:
                    logger.info("Converting to JSON...")
                    # Reuse the v1 JSON parsing logic. Note: v2 does not invoke
                    # MinerU or PaddleOCR here, it only parses the Markdown, and
                    # first_page_image is None because v2 does not render PDF
                    # page images.
                    from ..parser.json_converter import parse_markdown_to_json
                    # Build the full output directory path, including the
                    # per-file subdirectory.
                    json_output_dir = os.path.join(output_dir, file_name) if file_name else output_dir
                    json_data = parse_markdown_to_json(
                        original_content,
                        first_page_image=None,
                        output_dir=json_output_dir,
                        forced_document_type=forced_document_type,
                        enable_paddleocr_fallback=True,
                        input_file=input_file,
                    )
                    json_path = os.path.join(output_dir, f"{file_name}.json")
                    async with aiofiles.open(json_path, 'w', encoding='utf-8') as f:
                        await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
                    logger.info(f"JSON file saved: {json_path}")
                    logger.info(f"Document type: {json_data.get('document_type', 'unknown')}")
                except Exception as e:
                    logger.exception(f"JSON conversion failed: {e}")
                    json_data = None

            return {
                'markdown_file': output_md_path,
                'json_file': json_path,
                'json_data': json_data,
                'content': final_content,
                'original_content': original_content
            }

        finally:
            # Clean up the temporary directory.
            try:
                shutil.rmtree(temp_dir)
                logger.debug(f"Removed temporary directory: {temp_dir}")
            except Exception as e:
                logger.warning(f"Failed to clean up temporary directory: {e}")

    except Exception as e:
        logger.exception(f"Conversion failed: {e}")
        return None
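
# For reference, the retry loop above waits 2 s after the first failure and
# 4 s after the second (wait_time = 2 ** retry_count); the third failure
# re-raises the last error instead of sleeping again.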


# PDFs longer than this are split into chunks that are converted separately and
# then merged, keeping per-request memory low (avoids MinerU OOM).
PDF_CHUNK_PAGES = 50


async def convert_pdf_to_markdown_only(
    input_file: str,
    output_dir: str,
    backend: str = "mineru",
    url: Optional[str] = None,
    max_pages: int = 99999,
    formula_enable: bool = True,
    table_enable: bool = True,
    language: str = "ch",
    return_images: bool = False,
) -> Optional[dict]:
    """
    Convert a PDF or image to Markdown text only, without JSON parsing.
    Large PDFs (more than PDF_CHUNK_PAGES pages) are first split into
    PDF_CHUNK_PAGES-page chunks that are converted separately and merged,
    keeping per-request memory low.
    :param return_images: also fetch and save images (MinerU's return_images plus local embed_images)
    :return: {"markdown": str, "filename": str} or None
    """
    if not os.path.exists(input_file):
        logger.error(f"Input file does not exist: {input_file}")
        return None
    ext = (Path(input_file).suffix or "").lower()
    url = url or get_next_mineru_api_url()
    # Only PDFs are split by pages; images and short PDFs are converted in one call.
    if ext == ".pdf":
        page_count = get_pdf_page_count(input_file)
        if page_count <= 0:
            logger.error(f"Could not determine PDF page count: {input_file}")
            return None
        # Multi-card MinerU: split into N chunks, send each chunk to a different
        # API instance in parallel, then merge, so a single task can use all cards.
        url_list = get_mineru_api_url_list()
        if backend != "paddle" and len(url_list) > 1 and page_count > 1:
            chunk_size = (page_count + len(url_list) - 1) // len(url_list)  # ceiling division
            chunks_dir = tempfile.mkdtemp(prefix="pdf_multi_card_", dir=output_dir)
            try:
                chunk_paths = split_pdf_by_pages(input_file, chunks_dir, chunk_size=chunk_size)
                if not chunk_paths:
                    return None
                logger.info(f"PDF has {page_count} pages; split into {len(chunk_paths)} chunks for {len(url_list)} MinerU instances")
                tasks = []
                for i, chunk_path in enumerate(chunk_paths):
                    chunk_out = os.path.join(chunks_dir, f"out_{i}")
                    os.makedirs(chunk_out, exist_ok=True)
                    tasks.append(
                        convert_to_markdown(
                            input_file=chunk_path,
                            output_dir=chunk_out,
                            max_pages=max_pages,
                            output_json=False,
                            formula_enable=formula_enable,
                            table_enable=table_enable,
                            language=language,
                            url=url_list[i % len(url_list)],
                            embed_images=return_images,
                        )
                    )
                results = await asyncio.gather(*tasks, return_exceptions=True)
                parts = []
                for i, r in enumerate(results):
                    if isinstance(r, Exception):
                        logger.warning(f"MinerU conversion raised on chunk {i + 1}: {r}")
                        continue
                    if r and r.get("content"):
                        parts.append(r["content"])
                if not parts:
                    return None
                merged = "\n\n".join(parts)
                filename = Path(input_file).stem + ".md"
                return {"markdown": merged, "filename": filename}
            finally:
                try:
                    shutil.rmtree(chunks_dir, ignore_errors=True)
                except Exception as e:
                    logger.debug(f"Failed to clean up multi-card temp directory: {e}")
        if page_count > PDF_CHUNK_PAGES:
            chunks_dir = tempfile.mkdtemp(prefix="pdf_chunks_", dir=output_dir)
            try:
                chunk_paths = split_pdf_by_pages(input_file, chunks_dir, chunk_size=PDF_CHUNK_PAGES)
                if not chunk_paths:
                    return None
                logger.info(f"PDF has {page_count} pages; split into {len(chunk_paths)} chunks of up to {PDF_CHUNK_PAGES} pages, converted separately and merged")
                parts = []
                for i, chunk_path in enumerate(chunk_paths):
                    chunk_out = os.path.join(output_dir, f"chunk_{i}")
                    os.makedirs(chunk_out, exist_ok=True)
                    if backend == "paddle":
                        r = await _convert_with_paddle(
                            input_file=chunk_path,
                            output_dir=chunk_out,
                            embed_images=return_images,
                            output_json=False,
                            forced_document_type=None,
                        )
                    else:
                        r = await convert_to_markdown(
                            input_file=chunk_path,
                            output_dir=chunk_out,
                            max_pages=max_pages,
                            output_json=False,
                            formula_enable=formula_enable,
                            table_enable=table_enable,
                            language=language,
                            url=url,
                            embed_images=return_images,
                        )
                    if r and r.get("content"):
                        parts.append(r["content"])
                    else:
                        logger.warning(f"Chunk {i + 1} produced no content; skipping")
                if not parts:
                    return None
                merged = "\n\n".join(parts)
                filename = Path(input_file).stem + ".md"
                return {"markdown": merged, "filename": filename}
            finally:
                try:
                    shutil.rmtree(chunks_dir, ignore_errors=True)
                except Exception as e:
                    logger.debug(f"Failed to clean up chunk temp directory: {e}")
    # Single-shot conversion (non-PDF input, or PDF with page count <= PDF_CHUNK_PAGES).
    result = None
    if backend == "paddle":
        result = await _convert_with_paddle(
            input_file=input_file,
            output_dir=output_dir,
            embed_images=return_images,
            output_json=False,
            forced_document_type=None,
        )
    else:
        result = await convert_to_markdown(
            input_file=input_file,
            output_dir=output_dir,
            max_pages=max_pages,
            output_json=False,
            formula_enable=formula_enable,
            table_enable=table_enable,
            language=language,
            url=url,
            embed_images=return_images,
        )
    if not result or not result.get("content"):
        return None
    md_path = result.get("markdown_file") or ""
    filename = Path(md_path).name if md_path else Path(input_file).stem + ".md"
    return {"markdown": result["content"], "filename": filename}
|