|
|
@@ -24,10 +24,10 @@ from pathlib import Path
|
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
|
|
# API configuration (defaults to http://47.108.80.98:4214; override with the PDF_CONVERTER_API_URL environment variable)
|
|
|
-API_BASE_URL = os.getenv("PDF_CONVERTER_API_URL", "http://localhost:4214")
|
|
|
+API_BASE_URL = os.getenv("PDF_CONVERTER_API_URL", "http://47.108.80.98:4214")
|
|
|
|
|
|
# 测试文件配置
|
|
|
-TEST_DIR = Path("/root/test/test")
|
|
|
+TEST_DIR = Path(__file__).parent / "test"
|
|
|
|
|
|
# 测试用例:文件名 -> (文档类型, 是否去水印, 是否只保留表格附件)
|
|
|
# 格式:
|
|
|
@@ -47,6 +47,15 @@ TEST_CASES = {
|
|
|
"10-(决算报告)盖章页-山西晋城周村220kV输变电工程竣工决算审核报告(中瑞诚鉴字(2021)第002040号).pdf": "finalAccount",
|
|
|
}
|
|
|
|
|
|
# Test cases for pdf_to_markdown: only the first file name listed in TEST_CASES.
PDF2MD_TEST_CASES = [list(TEST_CASES)[0]]

# OCR test cases: PDF paths joined onto TEST_DIR by run_ocr_tests; every page
# is rendered to an image and sent to /ocr.
OCR_TEST_CASES = [
    "007/3、附件2:核准批复.pdf",
    "007/5、附件7:检测报告.pdf",
]
|
|
|
+
|
|
|
|
|
|
def print_header(title: str):
|
|
|
"""打印标题"""
|
|
|
@@ -610,23 +619,415 @@ def test_ocr(
|
|
|
return False
|
|
|
|
|
|
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# PDF 文件 OCR 测试(提取每页为图片后调用 /ocr)
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
def test_ocr_pdf(
    pdf_path: str,
    remove_watermark: bool = False,
    light_threshold: int = 200,
    saturation_threshold: int = 30,
    crop_header_footer: bool = False,
    header_ratio: float = 0.05,
    footer_ratio: float = 0.05,
    auto_detect_header_footer: bool = False,
    max_pages: int = 0,
) -> bool:
    """
    OCR a PDF by rendering each page to a PNG and posting it to the /ocr endpoint.

    The recognised text blocks of all pages are collected and written to
    ``test_results/ocr_pdf_<stem>.json`` next to this script.

    Args:
        pdf_path: Path of the PDF file to process.
        remove_watermark: Ask the service to remove watermarks.
        light_threshold: Watermark lightness threshold (sent only when
            ``remove_watermark`` is set).
        saturation_threshold: Watermark saturation threshold (sent only when
            ``remove_watermark`` is set).
        crop_header_footer: Ask the service to crop page headers/footers.
        header_ratio: Header crop ratio (sent only when cropping without
            auto-detection).
        footer_ratio: Footer crop ratio (sent only when cropping without
            auto-detection).
        auto_detect_header_footer: Let the service detect the header/footer
            boundaries itself (only honoured together with
            ``crop_header_footer``).
        max_pages: Maximum number of pages to process; 0 (or any
            non-positive value) means all pages.

    Returns:
        True when every processed page got an HTTP 200 response that could be
        parsed, False otherwise (including missing PyMuPDF, a missing file or
        any error while handling the PDF).
    """
    try:
        import fitz  # PyMuPDF
    except ImportError:
        print_result(False, "PyMuPDF 未安装,无法提取 PDF 页面。请安装: pip install pymupdf")
        return False

    fp = Path(pdf_path)
    if not fp.exists():
        print_result(False, f"文件不存在: {fp}")
        return False

    print(f" 📄 PDF 文件: {fp.name}")

    # The request options do not depend on the page, so build them once.
    # NOTE(review): the source indentation was lost; the ratio branch is
    # assumed to be nested under crop_header_footer - confirm.
    options: Dict[str, Any] = {}
    if crop_header_footer:
        options["crop_header_footer"] = True
        if auto_detect_header_footer:
            options["auto_detect_header_footer"] = True
        else:
            options["header_ratio"] = header_ratio
            options["footer_ratio"] = footer_ratio
    if remove_watermark:
        options["remove_watermark"] = True
        options["watermark_light_threshold"] = light_threshold
        options["watermark_saturation_threshold"] = saturation_threshold

    try:
        doc = fitz.open(str(fp))
        try:
            total_pages = len(doc)
            print(f" 📃 总页数: {total_pages}")

            # Non-positive max_pages means "no limit"; a negative value used to
            # yield a negative page count and a spurious failure.
            pages_to_process = total_pages if max_pages <= 0 else min(max_pages, total_pages)
            print(f" 🔄 处理页数: {pages_to_process}")

            all_texts = []
            success_count = 0
            # Render at 150 DPI (PDF user space is 72 units per inch).
            mat = fitz.Matrix(150 / 72, 150 / 72)

            for page_idx in range(pages_to_process):
                pix = doc[page_idx].get_pixmap(matrix=mat)
                image_base64 = base64.b64encode(pix.tobytes("png")).decode("utf-8")

                print(f"\n 📄 页 {page_idx + 1}/{pages_to_process}")

                request_data = {
                    "image_base64": image_base64,
                    "image_format": "png",
                    **options,
                }

                # A failure on one page must not abort the remaining pages.
                try:
                    start_time = time.time()
                    response = requests.post(
                        f"{API_BASE_URL}/ocr",
                        json=request_data,
                        timeout=120,
                    )
                    elapsed = time.time() - start_time

                    if response.status_code != 200:
                        print(f" ❌ OCR 失败: {response.status_code}")
                        continue

                    result = response.json()
                    # The texts may be wrapped in a "data" object or sit at
                    # the top level of the response.
                    if "data" in result and isinstance(result.get("data"), dict):
                        texts = result["data"].get("texts", [])
                    else:
                        texts = result.get("texts", [])
                    # Tolerate an explicit null from the service.
                    texts = texts or []

                    all_texts.extend(texts)
                    success_count += 1
                    print(f" ✅ OCR 成功 ({elapsed:.2f}s), 识别 {len(texts)} 个文本块")

                    # Preview the first three text blocks.
                    for i, text in enumerate(texts[:3]):
                        display = text[:40] + "..." if len(text) > 40 else text
                        print(f" [{i+1}] {display}")
                    if len(texts) > 3:
                        print(f" ... 还有 {len(texts) - 3} 个")
                except Exception as e:
                    print(f" ❌ OCR 异常: {e}")
        finally:
            # Release the document even when rendering or a request blows up.
            doc.close()

        # Persist the collected texts.
        output_dir = Path(__file__).parent / "test_results"
        output_dir.mkdir(exist_ok=True)
        output_file = output_dir / f"ocr_pdf_{fp.stem}.json"

        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(
                {"file": str(fp), "pages": pages_to_process, "texts": all_texts},
                f,
                ensure_ascii=False,
                indent=2,
            )

        print(f"\n 💾 结果已保存: {output_file}")
        print(f" 📊 汇总: {success_count}/{pages_to_process} 页成功, 共 {len(all_texts)} 个文本块")

        return success_count == pages_to_process

    except Exception as e:
        print_result(False, f"处理 PDF 异常: {e}")
        return False
|
|
|
+
|
|
|
+
|
|
|
def run_ocr_tests(
    remove_watermark: bool = False,
    crop_header_footer: bool = False,
    max_pages: int = 0,
) -> bool:
    """Run test_ocr_pdf on every entry of OCR_TEST_CASES and print a summary.

    Args:
        remove_watermark: Forwarded to test_ocr_pdf.
        crop_header_footer: Forwarded to test_ocr_pdf.
        max_pages: Forwarded to test_ocr_pdf.

    Returns:
        True when no case failed; False when the API health check fails or
        at least one case failed (a missing file counts as a failure).
    """
    print_header("测试 OCR 接口 (PDF 文件)")

    # Nothing to do when the service is unreachable.
    if not check_health():
        print("\n❌ API 不可用")
        return False

    case_count = len(OCR_TEST_CASES)
    rule = "=" * 60
    outcomes = []

    for position, relative in enumerate(OCR_TEST_CASES, start=1):
        candidate = TEST_DIR / relative
        print(f"\n{rule}")
        print(f" [{position}/{case_count}] {relative}")
        print(rule)

        if candidate.exists():
            succeeded = test_ocr_pdf(
                str(candidate),
                remove_watermark=remove_watermark,
                crop_header_footer=crop_header_footer,
                max_pages=max_pages,
            )
            outcomes.append(bool(succeeded))
        else:
            print_result(False, f"文件不存在: {candidate}")
            outcomes.append(False)

    ok_count = sum(outcomes)
    bad_count = len(outcomes) - ok_count

    # Summary
    print_header("OCR 测试汇总")
    print(f" 总计: {case_count}")
    print(f" ✅ 通过: {ok_count}")
    print(f" ❌ 失败: {bad_count}")
    all_good = bad_count == 0
    if all_good:
        print("\n🎉 所有 OCR 测试通过!")
    return all_good
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# /pdf_to_markdown 接口测试
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
def upload_pdf_to_markdown(
    file_path: Path,
    backend: Optional[str] = None,
    remove_watermark: bool = False,
    crop_header_footer: bool = False,
    return_images: bool = False,
) -> Optional[str]:
    """Upload a file to /pdf_to_markdown and return the task id.

    Args:
        file_path: File to upload; sent as "application/pdf" when the suffix
            is ".pdf", otherwise as "image/*".
        backend: Optional backend name, sent only when truthy.
        remove_watermark: Adds the form field remove_watermark="true".
        crop_header_footer: Adds the form field crop_header_footer="true".
        return_images: Adds the form field return_images="true".

    Returns:
        The "task_id" value of the JSON reply on HTTP 200, otherwise None
        (non-200 status or any exception, both reported via print_result).
    """
    print(f"\n 📤 上传文件: {file_path.name}")
    try:
        with open(file_path, "rb") as handle:
            if file_path.suffix.lower() == ".pdf":
                content_type = "application/pdf"
            else:
                content_type = "image/*"

            # Only options that are switched on are sent as form fields.
            form: Dict[str, Any] = {"backend": backend} if backend else {}
            switches = (
                ("remove_watermark", remove_watermark),
                ("crop_header_footer", crop_header_footer),
                ("return_images", return_images),
            )
            form.update({key: "true" for key, enabled in switches if enabled})

            reply = requests.post(
                f"{API_BASE_URL}/pdf_to_markdown",
                files={"file": (file_path.name, handle, content_type)},
                data=form,
                timeout=60,
            )

        if reply.status_code != 200:
            print_result(False, f"上传失败: {reply.status_code} - {reply.text[:300]}")
            return None

        task_id = reply.json().get("task_id")
        print(f" 任务 ID: {task_id}")
        return task_id
    except Exception as exc:
        print_result(False, f"上传异常: {exc}")
        return None
|
|
|
+
|
|
|
+
|
|
|
def download_markdown(task_id: str) -> Optional[str]:
    """Fetch the markdown text of a task from /task/{task_id}/json.

    Returns:
        The "markdown" field of the JSON reply ("" when the field is absent),
        or None on a non-200 status or any exception.
    """
    try:
        reply = requests.get(f"{API_BASE_URL}/task/{task_id}/json", timeout=30)
        if reply.status_code != 200:
            print_result(False, f"获取 Markdown 失败: {reply.status_code}")
            return None
        payload = reply.json()
        return payload.get("markdown", "")
    except Exception as exc:
        print_result(False, f"获取 Markdown 异常: {exc}")
        return None
|
|
|
+
|
|
|
+
|
|
|
def download_markdown_file(task_id: str, save_path: Path) -> bool:
    """Download the .md file from /download/{task_id}/markdown to save_path.

    Parent directories of save_path are created as needed.

    Returns:
        True when the file was written, False on a non-200 status or any
        exception (both reported via print_result).
    """
    try:
        reply = requests.get(f"{API_BASE_URL}/download/{task_id}/markdown", timeout=30)
        if reply.status_code != 200:
            print_result(False, f"下载 Markdown 文件失败: {reply.status_code}")
            return False

        save_path.parent.mkdir(parents=True, exist_ok=True)
        save_path.write_bytes(reply.content)
        print(f" 💾 Markdown 文件已保存: {save_path}")
        return True
    except Exception as exc:
        print_result(False, f"下载 Markdown 文件异常: {exc}")
        return False
|
|
|
+
|
|
|
+
|
|
|
def test_pdf_to_markdown(
    file_path: Optional[str] = None,
    backend: Optional[str] = None,
    remove_watermark: bool = False,
    crop_header_footer: bool = False,
    return_images: bool = False,
    max_wait: int = 600,
) -> bool:
    """Exercise the /pdf_to_markdown endpoint end to end.

    For each file: upload, poll the task, fetch the markdown text, download
    the .md file (and the ZIP when return_images is set) into
    ``test_results/`` next to this script, then print a short preview.

    Args:
        file_path: File to test; when omitted, the existing files among
            PDF2MD_TEST_CASES (resolved against TEST_DIR) are used.
        backend: Backend name forwarded to the upload; omitted when falsy.
        remove_watermark: Ask the service to remove watermarks.
        crop_header_footer: Ask the service to crop headers/footers.
        return_images: Ask the service to return images and also download the ZIP.
        max_wait: Maximum number of seconds passed to poll_task_status.

    Returns:
        True when no file failed; False when the API is unavailable, no test
        file is available, or at least one file failed.
    """
    print_header("测试 /pdf_to_markdown 接口")

    # Bail out early when the service is unreachable.
    if not check_health():
        print("\n❌ API 不可用")
        return False

    # Work out which files to run.
    if file_path:
        candidates = [Path(file_path)]
    else:
        candidates = []
        for name in PDF2MD_TEST_CASES:
            resolved = TEST_DIR / name
            if not resolved.exists():
                print(f" ⚠️ 跳过不存在的文件: {name}")
                continue
            candidates.append(resolved)
        if not candidates:
            print_result(False, f"TEST_DIR ({TEST_DIR}) 中没有可用的测试文件")
            return False

    total = len(candidates)
    n_ok = 0
    n_bad = 0
    rule = "=" * 60

    for number, pdf in enumerate(candidates, start=1):
        print(f"\n{rule}")
        print(f" [{number}/{total}] {pdf.name}")
        print(rule)

        # An explicitly given path has not been checked yet.
        if not pdf.exists():
            print_result(False, f"文件不存在: {pdf}")
            n_bad += 1
            continue

        print(f" 📄 文件: {pdf.name} ({pdf.stat().st_size / 1024:.1f} KB)")
        active_options = (
            (backend, f" 🔧 Backend: {backend}"),
            (remove_watermark, " 🔧 去水印: 是"),
            (crop_header_footer, " 🔧 裁剪页眉页脚: 是"),
            (return_images, " 🔧 返回图片: 是"),
        )
        for enabled, note in active_options:
            if enabled:
                print(note)

        # 1. Upload
        task_id = upload_pdf_to_markdown(
            pdf,
            backend=backend,
            remove_watermark=remove_watermark,
            crop_header_footer=crop_header_footer,
            return_images=return_images,
        )
        if not task_id:
            n_bad += 1
            continue

        # 2. Poll until the task finishes
        if not poll_task_status(task_id, max_wait=max_wait):
            n_bad += 1
            continue

        # 3. Markdown text
        markdown = download_markdown(task_id)

        # 4. .md file (result intentionally not part of the verdict)
        results_dir = Path(__file__).parent / "test_results"
        results_dir.mkdir(exist_ok=True)
        download_markdown_file(task_id, results_dir / f"pdf2md_{pdf.stem}.md")

        # 5. ZIP archive, only when images were requested
        if return_images:
            try:
                archive = requests.get(f"{API_BASE_URL}/download/{task_id}/zip", timeout=60)
                if archive.status_code != 200:
                    print_result(False, f"下载 ZIP 失败: {archive.status_code}")
                else:
                    zip_target = results_dir / f"pdf2md_{pdf.stem}.zip"
                    zip_target.write_bytes(archive.content)
                    print(f" 💾 ZIP 文件已保存: {zip_target} ({len(archive.content) / 1024:.1f} KB)")
            except Exception as exc:
                print_result(False, f"下载 ZIP 异常: {exc}")

        # 6. Preview and verdict
        if not markdown:
            print_result(False, "未获取到 Markdown 内容")
            n_bad += 1
            continue

        rows = markdown.strip().split("\n")
        print(f"\n 📝 Markdown 结果: {len(markdown)} 字符, {len(rows)} 行")
        print(" --- 前 10 行 ---")
        for row in rows[:10]:
            shown = row if len(row) <= 80 else row[:80] + "..."
            print(f" {shown}")
        if len(rows) > 10:
            print(f" ... 还有 {len(rows) - 10} 行")
        print_result(True, "PDF 转 Markdown 成功")
        n_ok += 1

    # Summary
    print_header("pdf_to_markdown 测试汇总")
    print(f" 总计: {total}")
    print(f" ✅ 通过: {n_ok}")
    print(f" ❌ 失败: {n_bad}")
    everything_passed = n_bad == 0
    if everything_passed:
        print("\n🎉 所有 pdf_to_markdown 测试通过!")
    return everything_passed
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
|
if len(sys.argv) > 1:
|
|
|
# 测试指定类型
|
|
|
doc_type = sys.argv[1]
|
|
|
if doc_type in ["--help", "-h"]:
|
|
|
print("用法:")
|
|
|
- print(" python test_api.py # 运行所有测试")
|
|
|
- print(" python test_api.py <type> # 测试指定类型")
|
|
|
- print(" python test_api.py ocr # 测试 OCR 接口")
|
|
|
- print(" python test_api.py ocr <image_path> # 测试 OCR(指定图片或txt)")
|
|
|
- print(" python test_api.py ocr <image_path> --nowm # 测试 OCR 并去水印")
|
|
|
- print(" python test_api.py ocr <image_path> --crop # 测试 OCR 并裁剪页眉页脚")
|
|
|
- print(" python test_api.py ocr <image_path> --nowm --crop # 同时去水印和裁剪")
|
|
|
+ print(" python test_api.py # 运行所有 /convert 测试")
|
|
|
+ print(" python test_api.py <type> # 测试指定文档类型")
|
|
|
+ print(" python test_api.py ocr # 测试 OCR 接口(图片)")
|
|
|
+ print(" python test_api.py ocr <path> [--nowm] [--crop]")
|
|
|
+ print(" python test_api.py ocrpdf # 测试 OCR 接口(PDF 文件,遍历 OCR_TEST_CASES)")
|
|
|
+ print(" python test_api.py ocrpdf <path> [--nowm] [--crop] [--pages=N]")
|
|
|
+ print(" python test_api.py pdf2md # 测试 /pdf_to_markdown(默认文件)")
|
|
|
+ print(" python test_api.py pdf2md <path> [--backend=X] [--nowm] [--crop] [--images]")
|
|
|
print("\n可用类型:")
|
|
|
- for dtype in set(TEST_CASES.values()):
|
|
|
+ for dtype in set(v if isinstance(v, str) else v[0] for v in TEST_CASES.values()):
|
|
|
print(f" - {dtype}")
|
|
|
- print(" - ocr (OCR 图片识别)")
|
|
|
+ print(" - ocr (OCR 图片识别)")
|
|
|
+ print(" - ocrpdf (OCR PDF 文件,提取每页调用 /ocr)")
|
|
|
+ print(" - pdf2md (PDF/图片转 Markdown)")
|
|
|
print("\nOCR 去水印参数:")
|
|
|
print(" --nowm 启用去水印")
|
|
|
print(" --light=N 亮度阈值(0-255,默认200)")
|
|
|
@@ -636,6 +1037,16 @@ if __name__ == "__main__":
|
|
|
print(" --crop-auto 启用裁剪页眉页脚(自动检测模式)")
|
|
|
print(" --header=N 页眉裁剪比例(0-1,默认0.05表示5%)")
|
|
|
print(" --footer=N 页脚裁剪比例(0-1,默认0.05表示5%)")
|
|
|
+ print("\npdf2md 参数:")
|
|
|
+ print(" --backend=X 指定 MinerU backend")
|
|
|
+ print(" --nowm 启用去水印")
|
|
|
+ print(" --crop 启用裁剪页眉页脚")
|
|
|
+ print(" --images 返回图片(可下载 ZIP)")
|
|
|
+ print(" --wait=N 最大等待秒数(默认600)")
|
|
|
+ print("\nocrpdf 参数:")
|
|
|
+ print(" --nowm 启用去水印")
|
|
|
+ print(" --crop 启用裁剪页眉页脚")
|
|
|
+ print(" --pages=N 最大处理页数(0=全部)")
|
|
|
elif doc_type == "ocr":
|
|
|
# 解析 OCR 参数
|
|
|
image_path = None
|
|
|
@@ -688,6 +1099,79 @@ if __name__ == "__main__":
|
|
|
footer_ratio,
|
|
|
auto_detect_header_footer
|
|
|
)
|
|
|
+ elif doc_type == "ocrpdf":
|
|
|
+ # 解析 ocrpdf 参数
|
|
|
+ ocrpdf_file = None
|
|
|
+ ocrpdf_nowm = False
|
|
|
+ ocrpdf_crop = False
|
|
|
+ ocrpdf_pages = 0
|
|
|
+
|
|
|
+ for arg in sys.argv[2:]:
|
|
|
+ if arg == "--nowm":
|
|
|
+ ocrpdf_nowm = True
|
|
|
+ elif arg == "--crop":
|
|
|
+ ocrpdf_crop = True
|
|
|
+ elif arg.startswith("--pages="):
|
|
|
+ try:
|
|
|
+ ocrpdf_pages = int(arg.split("=", 1)[1])
|
|
|
+ except ValueError:
|
|
|
+ print(f"警告: 无效的页数 {arg},使用默认值 0(全部)")
|
|
|
+ elif not arg.startswith("--"):
|
|
|
+ ocrpdf_file = arg
|
|
|
+
|
|
|
+ if ocrpdf_file:
|
|
|
+ # 测试单个 PDF 文件
|
|
|
+ print_header("测试 OCR 接口 (PDF 文件)")
|
|
|
+ if not check_health():
|
|
|
+ print("\n❌ API 不可用")
|
|
|
+ else:
|
|
|
+ test_ocr_pdf(
|
|
|
+ ocrpdf_file,
|
|
|
+ remove_watermark=ocrpdf_nowm,
|
|
|
+ crop_header_footer=ocrpdf_crop,
|
|
|
+ max_pages=ocrpdf_pages,
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # 遍历 OCR_TEST_CASES
|
|
|
+ run_ocr_tests(
|
|
|
+ remove_watermark=ocrpdf_nowm,
|
|
|
+ crop_header_footer=ocrpdf_crop,
|
|
|
+ max_pages=ocrpdf_pages,
|
|
|
+ )
|
|
|
+ elif doc_type == "pdf2md":
|
|
|
+ # 解析 pdf2md 参数
|
|
|
+ pdf2md_file = None
|
|
|
+ pdf2md_backend = None
|
|
|
+ pdf2md_nowm = False
|
|
|
+ pdf2md_crop = False
|
|
|
+ pdf2md_images = False
|
|
|
+ pdf2md_wait = 600
|
|
|
+
|
|
|
+ for arg in sys.argv[2:]:
|
|
|
+ if arg == "--nowm":
|
|
|
+ pdf2md_nowm = True
|
|
|
+ elif arg == "--crop":
|
|
|
+ pdf2md_crop = True
|
|
|
+ elif arg == "--images":
|
|
|
+ pdf2md_images = True
|
|
|
+ elif arg.startswith("--backend="):
|
|
|
+ pdf2md_backend = arg.split("=", 1)[1]
|
|
|
+ elif arg.startswith("--wait="):
|
|
|
+ try:
|
|
|
+ pdf2md_wait = int(arg.split("=", 1)[1])
|
|
|
+ except ValueError:
|
|
|
+ print(f"警告: 无效的等待时间 {arg},使用默认值 600")
|
|
|
+ elif not arg.startswith("--"):
|
|
|
+ pdf2md_file = arg
|
|
|
+
|
|
|
+ test_pdf_to_markdown(
|
|
|
+ file_path=pdf2md_file,
|
|
|
+ backend=pdf2md_backend,
|
|
|
+ remove_watermark=pdf2md_nowm,
|
|
|
+ crop_header_footer=pdf2md_crop,
|
|
|
+ return_images=pdf2md_images,
|
|
|
+ max_wait=pdf2md_wait,
|
|
|
+ )
|
|
|
else:
|
|
|
test_single(doc_type)
|
|
|
else:
|