| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- PDF Converter API 测试脚本
- 测试新增的投资类型:
- - fsApproval: 可研批复
- - fsReview: 可研评审
- - pdApproval: 初设批复
- - safetyFsApproval: 安评可研批复
- 以及现有类型:
- - settlementReport: 结算报告
- - designReview: 初设评审
- """
- import os
- import sys
- import json
- import time
- import base64
- import requests
- from pathlib import Path
- from typing import Optional, Dict, Any, List
# Base URL of the PDF converter service. The PDF_CONVERTER_API_URL environment
# variable overrides it; otherwise the hard-coded host below (port 4214) is used.
API_BASE_URL = os.environ.get("PDF_CONVERTER_API_URL", "http://47.108.80.98:4214")

# Directory holding the sample documents, located next to this script.
TEST_DIR = Path(__file__).parent.joinpath("test")

# /convert test cases: file name -> configuration. Accepted value shapes:
#   ("type", remove_watermark, table_only)  full form
#   ("type", remove_watermark)              table_only defaults to True
#   "type"                                  remove_watermark False, table_only True
TEST_CASES = {
    # Newly added investment document types.
    # This entry enables watermark removal and passes table_only=False.
    "鄂电司发展〔2024〕124号 国网湖北省电力有限公司关于襄阳连云220千伏输变电工程可行性研究报告的批复.pdf": (
        "safetyFsApproval",
        True,
        False,
    ),
    "2-(可研批复)晋电发展〔2017〕831号+国网山西省电力公司关于临汾古县、晋城周村220kV输变电等工程可行性研究报告的批复.pdf.pdf": "fsApproval",
    "1-(可研评审)晋电经研规划〔2017〕187号(盖章)国网山西经研院关于山西晋城周村220kV输变电工程可行性研究报告的评审意见.pdf": "fsReview",
    "5-(初设批复)晋电建设〔2019〕566号 国网山西省电力公司关于晋城周村220kV输变电工程初步设计的批复 .pdf": "pdApproval",
    # Previously supported types.
    "9-(结算报告)山西晋城周村220kV输变电工程结算审计报告.pdf": "settlementReport",
    "4-(初设评审)中电联电力建设技术经济咨询中心技经〔2019〕201号关于山西周村220kV输变电工程初步设计的评审意见.pdf": "designReview",
    # Final account report.
    "10-(决算报告)盖章页-山西晋城周村220kV输变电工程竣工决算审核报告(中瑞诚鉴字(2021)第002040号).pdf": "finalAccount",
}

# /pdf_to_markdown test cases: only the first file listed in TEST_CASES.
PDF2MD_TEST_CASES = [next(iter(TEST_CASES))]

# OCR test cases: PDF paths relative to TEST_DIR; each page is rendered to an
# image and sent to /ocr.
OCR_TEST_CASES = [
    "007/3、附件2:核准批复.pdf",
    "007/5、附件7:检测报告.pdf",
]
def print_header(title: str):
    """Print *title* between two 60-character ``=`` rules, after a blank line."""
    rule = "=" * 60
    print(f"\n{rule}")
    print(" {}".format(title))
    print(rule)
def print_result(success: bool, message: str):
    """Print *message* prefixed with a success or failure marker."""
    if success:
        label = "✅ 成功"
    else:
        label = "❌ 失败"
    print(" {}: {}".format(label, message))
def check_health() -> bool:
    """Query ``{API_BASE_URL}/health`` and report whether the API is reachable.

    Returns:
        ``True`` only when the endpoint answers with HTTP 200; ``False`` for
        any other status code or when ``requests`` raises a
        ``RequestException`` (10 second timeout).
    """
    print_header("检查 API 健康状态")
    try:
        reply = requests.get(API_BASE_URL + "/health", timeout=10)
        if reply.status_code != 200:
            print_result(False, f"状态码: {reply.status_code}")
            return False
        print_result(True, f"API 正常运行 - {reply.json()}")
        return True
    except requests.exceptions.RequestException as exc:
        print_result(False, f"连接失败: {exc}")
        return False
def upload_file(file_path: Path, document_type: str, remove_watermark: bool = False, table_only: bool = True) -> Optional[str]:
    """Submit a document to the ``/convert`` endpoint and return its task id.

    Args:
        file_path: Document to upload; sent with MIME type ``application/pdf``.
        document_type: Value of the ``type`` form field.
        remove_watermark: When true, ``remove_watermark`` plus the two fixed
            watermark thresholds ("200" / "30") are added to the form.
        table_only: Sent as the ``table_only`` form field ("true"/"false").

    Returns:
        The ``task_id`` value of the JSON reply, or ``None`` when the service
        answers with a non-200 status or any exception is raised.
    """
    print(f"\n 📤 上传文件: {file_path.name}")
    print(f" 类型: {document_type}")
    if remove_watermark:
        print(" 去水印: 是")
    if table_only:
        print(" 只保留表格: 是")

    try:
        with open(file_path, "rb") as handle:
            # Form fields go in ``data``; the type field is named "type",
            # not "document_type".
            form = {"type": document_type}
            if remove_watermark:
                form.update(
                    remove_watermark="true",
                    watermark_light_threshold="200",
                    watermark_saturation_threshold="30",
                )
            form["table_only"] = "true" if table_only else "false"

            reply = requests.post(
                f"{API_BASE_URL}/convert",
                files={"file": (file_path.name, handle, "application/pdf")},
                data=form,
                timeout=60,
            )

        if reply.status_code != 200:
            print_result(False, f"上传失败: {reply.status_code} - {reply.text}")
            return None

        task_id = reply.json().get("task_id")
        print(f" 任务 ID: {task_id}")
        return task_id
    except Exception as exc:
        print_result(False, f"上传异常: {exc}")
        return None
def poll_task_status(task_id: str, max_wait: int = 300) -> Optional[Dict[str, Any]]:
    """Poll ``/task/{task_id}`` every 5 seconds until the task finishes.

    Args:
        task_id: Identifier returned by an upload call.
        max_wait: Stop polling once this many seconds have elapsed.

    Returns:
        The status payload once its ``status`` is ``"completed"``. ``None``
        when the status is ``"failed"``, the query returns a non-200 code,
        any exception occurs, or *max_wait* is exceeded.
    """
    print(" ⏳ 等待任务完成...")

    began = time.time()
    pause = 5  # seconds between two status queries

    while time.time() - began < max_wait:
        try:
            reply = requests.get(f"{API_BASE_URL}/task/{task_id}", timeout=10)
            if reply.status_code != 200:
                print_result(False, f"查询状态失败: {reply.status_code}")
                return None

            payload = reply.json()
            state = payload.get("status")

            if state == "completed":
                took = time.time() - began
                print(f" 完成! 耗时: {took:.1f}s")
                return payload

            if state == "failed":
                reason = payload.get("error", "未知错误")
                print_result(False, f"任务失败: {reason}")
                return None

            # Any other status is treated as "still running".
            took = time.time() - began
            print(f" 处理中... ({took:.0f}s)", end="\r")
        except Exception as exc:
            print_result(False, f"查询异常: {exc}")
            return None

        time.sleep(pause)

    print_result(False, f"超时: 超过 {max_wait} 秒")
    return None
def get_json_result(task_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the parsed body of ``/task/{task_id}/json``.

    Returns:
        The decoded JSON on HTTP 200; ``None`` on any other status code or
        when an exception is raised (30 second timeout).
    """
    try:
        reply = requests.get(f"{API_BASE_URL}/task/{task_id}/json", timeout=30)
        if reply.status_code != 200:
            print_result(False, f"获取 JSON 失败: {reply.status_code}")
            return None
        return reply.json()
    except Exception as exc:
        print_result(False, f"获取 JSON 异常: {exc}")
        return None
def validate_result(result: Dict[str, Any], expected_type: str) -> bool:
    """Check a task JSON payload against the expected document type.

    The payload must carry ``document_type`` equal to *expected_type* and a
    non-empty ``data`` value. For the investment types (``fsApproval``,
    ``fsReview``, ``pdApproval``, ``safetyFsApproval``) ``data`` must be a
    non-empty list of dicts, optionally wrapped as
    ``{"projectInfo": {...}, "data": [...]}``, whose first entry has the keys
    ``name``, ``Level``, ``staticInvestment``, ``dynamicInvestment`` and
    ``items``. For ``settlementReport`` and ``designReview`` only a record
    count is printed; any other type passes after the two generic checks.

    Args:
        result: Decoded JSON payload of the task.
        expected_type: Document type the payload is expected to declare.

    Returns:
        ``True`` when every check passes, ``False`` otherwise (the reason is
        printed through ``print_result``).
    """
    document_type = result.get("document_type")
    data = result.get("data")

    # The declared type must match the requested one.
    if document_type != expected_type:
        print_result(False, f"文档类型不匹配: 期望 {expected_type}, 实际 {document_type}")
        return False

    # An absent or empty payload is always a failure.
    if not data:
        print_result(False, "数据为空")
        return False

    if expected_type in ("fsApproval", "fsReview", "pdApproval", "safetyFsApproval"):
        # Newer payloads wrap the list: {"projectInfo": {...}, "data": [...]}.
        project_info = None
        if isinstance(data, dict) and "data" in data:
            project_info = data.get("projectInfo")
            data = data["data"]

        # Only a dict can be queried with .get(); anything else is skipped
        # instead of raising AttributeError.
        if isinstance(project_info, dict) and project_info:
            print("\n 📋 项目信息:")
            print(f" 工程名称: {project_info.get('projectName', '')}")
            print(f" 项目单位: {project_info.get('projectUnit', '')}")
            print(f" 设计单位: {project_info.get('designUnit', '')}")

        if not isinstance(data, list):
            print_result(False, f"数据格式错误: 期望 list, 实际 {type(data).__name__}")
            return False

        if not data:
            print_result(False, "投资数据列表为空")
            return False

        # Every entry is later read with .get(); report a malformed entry as
        # a validation failure rather than crashing on it.
        bad_index = next(
            (i for i, entry in enumerate(data) if not isinstance(entry, dict)),
            None,
        )
        if bad_index is not None:
            actual = type(data[bad_index]).__name__
            print_result(False, f"数据项格式错误: 第 {bad_index + 1} 项期望 dict, 实际 {actual}")
            return False

        # Structural check is done on the first entry only.
        required_fields = ["name", "Level", "staticInvestment", "dynamicInvestment", "items"]
        missing_fields = [field for field in required_fields if field not in data[0]]
        if missing_fields:
            print_result(False, f"缺少字段: {missing_fields}")
            return False

        print_result(True, f"解析到 {len(data)} 个大类")

        # One summary line per top-level category.
        for entry in data:
            name = entry.get("name", "")
            static = entry.get("staticInvestment", 0)
            dynamic = entry.get("dynamicInvestment", 0)
            children = entry.get("items", [])
            # "items" may be null or a scalar in a malformed entry.
            sub_items = len(children) if hasattr(children, "__len__") else 0
            print(f" - {name}: 静态={static}, 动态={dynamic}, 子项={sub_items}")

    elif expected_type in ("settlementReport", "designReview"):
        # Both report types are only summarised by their record count.
        if isinstance(data, list):
            print_result(True, f"解析到 {len(data)} 条记录")
        else:
            print_result(True, "解析完成")

    return True
def test_single_file(file_path: Path, document_type: str, remove_watermark: bool = False, table_only: bool = True) -> bool:
    """Run the full /convert round trip for one file.

    Steps: upload, poll until done, fetch the JSON result, validate it, and
    save it to ``test_results/<document_type>_result.json`` next to this script.

    Args:
        file_path: Document to convert.
        document_type: Expected document type; also sent to the API.
        remove_watermark: Forwarded to ``upload_file``.
        table_only: Forwarded to ``upload_file``.

    Returns:
        The outcome of ``validate_result``; ``False`` if the upload, polling
        or result download step yields nothing.
    """
    print_header(f"测试: {document_type}")
    print(f" 文件: {file_path.name}")
    if remove_watermark:
        print(" 去水印: 是")
    if table_only:
        print(" 只保留表格: 是")

    # Step 1: upload.
    task_id = upload_file(file_path, document_type, remove_watermark, table_only)
    if not task_id:
        return False

    # Step 2: wait for completion.
    if not poll_task_status(task_id):
        return False

    # Step 3: download the JSON result.
    json_result = get_json_result(task_id)
    if not json_result:
        return False

    # Step 4: validate.
    verdict = validate_result(json_result, document_type)

    # Step 5: save the raw result whether or not validation passed.
    results_dir = Path(__file__).parent / "test_results"
    results_dir.mkdir(exist_ok=True)
    target = results_dir / f"{document_type}_result.json"
    with open(target, "w", encoding="utf-8") as sink:
        json.dump(json_result, sink, ensure_ascii=False, indent=2)
    print(f" 💾 结果已保存: {target}")

    return verdict
def run_all_tests():
    """Run every entry of ``TEST_CASES`` against /convert and print a summary.

    Stops early when the test directory is missing or the health check
    fails. Files that do not exist are counted as skipped; an exception
    raised while testing a file counts as a failure.
    """
    print_header("PDF Converter API 测试")
    print(f" API 地址: {API_BASE_URL}")
    print(f" 测试目录: {TEST_DIR}")

    if not TEST_DIR.exists():
        print_result(False, f"测试目录不存在: {TEST_DIR}")
        return

    if not check_health():
        print("\n❌ API 不可用,终止测试")
        return

    tally = {"total": 0, "passed": 0, "failed": 0, "skipped": 0}

    for filename, config in TEST_CASES.items():
        # Normalise the three accepted config shapes into a tuple; missing
        # positions fall back to remove_watermark=False, table_only=True.
        fields = config if isinstance(config, tuple) else (config,)
        document_type = fields[0]
        remove_watermark = fields[1] if len(fields) > 1 else False
        table_only = fields[2] if len(fields) > 2 else True

        file_path = TEST_DIR / filename
        if not file_path.exists():
            print_header(f"跳过: {document_type}")
            print_result(False, f"文件不存在: {filename}")
            tally["skipped"] += 1
            continue

        tally["total"] += 1
        try:
            succeeded = test_single_file(file_path, document_type, remove_watermark, table_only)
        except Exception as exc:
            print_result(False, f"测试异常: {exc}")
            succeeded = False
        tally["passed" if succeeded else "failed"] += 1

    failed = tally["failed"]
    print_header("测试总结")
    print(f" 总计: {tally['total']}")
    print(f" ✅ 通过: {tally['passed']}")
    print(f" ❌ 失败: {failed}")
    print(f" ⏭️ 跳过: {tally['skipped']}")

    if failed > 0:
        print(f"\n⚠️ 有 {failed} 个测试失败")
    elif tally["skipped"] == 0:
        print("\n🎉 所有测试通过!")
def test_single(document_type: str):
    """Run the /convert test for the first ``TEST_CASES`` entry of a given type.

    Args:
        document_type: Document type to look up in ``TEST_CASES``.
    """
    print_header(f"单项测试: {document_type}")

    if not check_health():
        print("\n❌ API 不可用")
        return

    for filename, config in TEST_CASES.items():
        # Normalise the three accepted config shapes into a tuple; missing
        # positions fall back to remove_watermark=False, table_only=True.
        fields = config if isinstance(config, tuple) else (config,)
        dtype = fields[0]
        remove_watermark = fields[1] if len(fields) > 1 else False
        table_only = fields[2] if len(fields) > 2 else True

        if dtype != document_type:
            continue

        # Only the first matching entry is considered.
        file_path = TEST_DIR / filename
        if file_path.exists():
            test_single_file(file_path, document_type, remove_watermark, table_only)
        else:
            print_result(False, f"文件不存在: {filename}")
        return

    print_result(False, f"未找到类型 {document_type} 的测试文件")
def test_ocr(
    image_path: Optional[str] = None,
    remove_watermark: bool = False,
    light_threshold: int = 200,
    saturation_threshold: int = 30,
    crop_header_footer: bool = False,
    header_ratio: float = 0.05,
    footer_ratio: float = 0.05,
    auto_detect_header_footer: bool = False
) -> bool:
    """Send one image to the ``/ocr`` endpoint and print what it recognised.

    Args:
        image_path: Image file, or a ``.txt`` file holding base64 image data
            (optionally with a ``data:image/xxx;base64,`` prefix). Defaults
            to ``image.png`` inside ``TEST_DIR``.
        remove_watermark: Ask the service to remove watermarks.
        light_threshold: Watermark brightness threshold sent with the request.
        saturation_threshold: Watermark saturation threshold sent with the
            request.
        crop_header_footer: Ask the service to crop header and footer.
        header_ratio: Header crop ratio, sent in fixed-ratio mode.
        footer_ratio: Footer crop ratio, sent in fixed-ratio mode.
        auto_detect_header_footer: With *crop_header_footer*, request
            automatic boundary detection instead of fixed ratios.

    Returns:
        ``True`` when the endpoint answers with HTTP 200 and the result was
        saved to ``test_results/ocr_result.json``; ``False`` otherwise.
    """
    print_header("测试 OCR 接口")

    if not check_health():
        print("\n❌ API 不可用")
        return False

    source = TEST_DIR / "image.png" if image_path is None else Path(image_path)
    print(f" 📷 文件路径: {source}")

    if not source.exists():
        print_result(False, f"文件不存在: {source}")
        return False

    suffix = source.suffix.lower()

    if suffix != ".txt":
        # Regular image file: read the bytes and base64-encode them.
        print(" 📄 文件类型: 图片文件")
        try:
            with open(source, "rb") as handle:
                raw = handle.read()
            image_base64 = base64.b64encode(raw).decode("utf-8")
            print(f" 📦 图片大小: {len(raw)} bytes")
            print(f" 🔤 Base64长度: {len(image_base64)} 字符")
        except Exception as exc:
            print_result(False, f"读取图片失败: {exc}")
            return False

        # Unknown extensions are declared as png.
        known_formats = {".png": "png", ".jpg": "jpeg", ".jpeg": "jpeg"}
        image_format = known_formats.get(suffix, "png")
        print(f" 🖼️ 图片格式: {image_format}")
    else:
        # Text file holding base64 data, possibly as a data URI.
        print(" 📄 文件类型: txt (base64 数据)")
        try:
            with open(source, "r", encoding="utf-8") as handle:
                image_base64 = handle.read().strip()

            if image_base64.startswith("data:") and "," in image_base64:
                # data:image/png;base64,xxxxx -> header before the comma,
                # payload after it.
                header, _, image_base64 = image_base64.partition(",")
                is_jpeg = "image/png" not in header and (
                    "image/jpeg" in header or "image/jpg" in header
                )
                image_format = "jpeg" if is_jpeg else "png"
                print(f" 🖼️ 图片格式 (从data URI解析): {image_format}")
            else:
                image_format = "png"
                print(f" 🖼️ 图片格式 (默认): {image_format}")

            print(f" 🔤 Base64长度: {len(image_base64)} 字符")
        except Exception as exc:
            print_result(False, f"读取txt文件失败: {exc}")
            return False

    print("\n 📤 调用 OCR 接口...")
    payload = {
        "image_base64": image_base64,
        "image_format": image_format
    }

    if crop_header_footer:
        payload["crop_header_footer"] = True
        if auto_detect_header_footer:
            payload["auto_detect_header_footer"] = True
            print(" ✂️ 裁剪页眉页脚: 自动检测模式")
        else:
            payload["header_ratio"] = header_ratio
            payload["footer_ratio"] = footer_ratio
            print(f" ✂️ 裁剪页眉页脚: 是 (顶部={header_ratio*100:.0f}%, 底部={footer_ratio*100:.0f}%)")

    if remove_watermark:
        payload["remove_watermark"] = True
        payload["watermark_light_threshold"] = light_threshold
        payload["watermark_saturation_threshold"] = saturation_threshold
        print(f" 🔧 去水印: 是 (亮度阈值={light_threshold}, 饱和度阈值={saturation_threshold})")

    try:
        began = time.time()
        reply = requests.post(
            f"{API_BASE_URL}/ocr",
            json=payload,
            timeout=120
        )
        elapsed = time.time() - began

        if reply.status_code != 200:
            print_result(False, f"OCR 失败: {reply.status_code} - {reply.text}")
            return False

        result = reply.json()
        print_result(True, f"OCR 识别成功 (耗时: {elapsed:.2f}s)")

        # Two reply shapes are accepted:
        #   {"texts": [...], "gpu_info": {...}}
        #   {"code": 0, "data": {"texts": [...]}, "gpu_info": {...}}
        nested = "data" in result and isinstance(result.get("data"), dict)
        holder = result.get("data", {}) if nested else result
        texts: List[str] = holder.get("texts", [])
        gpu_info = result.get("gpu_info", {})

        print(f"\n 📝 识别结果 ({len(texts)} 个文本块):")
        for number, text in enumerate(texts[:10], start=1):  # first 10 only
            # Long entries are cut at 50 characters.
            shown = text[:50] + "..." if len(text) > 50 else text
            print(f" [{number}] {shown}")

        if len(texts) > 10:
            print(f" ... 还有 {len(texts) - 10} 个文本块")

        if gpu_info:
            print("\n 💻 GPU 监控信息:")
            gpu_util = gpu_info.get('gpu_utilization', gpu_info.get('gpu_util_avg', 'N/A'))
            if isinstance(gpu_util, float):
                gpu_util = f"{gpu_util:.1f}"
            print(f" GPU利用率: {gpu_util}%")

            mem_used = gpu_info.get('gpu_memory_used_max', gpu_info.get('memory_used_max', 'N/A'))
            if isinstance(mem_used, (int, float)):
                mem_used = f"{mem_used / (1024**2):.0f}"  # divided by 1024**2 for the MB label
            print(f" 显存使用峰值: {mem_used} MB")

            gpu_name = gpu_info.get('gpu_name', 'N/A')
            print(f" GPU型号: {gpu_name}")

        # Keep the full reply on disk.
        results_dir = Path(__file__).parent / "test_results"
        results_dir.mkdir(exist_ok=True)
        target = results_dir / "ocr_result.json"
        with open(target, "w", encoding="utf-8") as sink:
            json.dump(result, sink, ensure_ascii=False, indent=2)
        print(f"\n 💾 结果已保存: {target}")

        return True

    except requests.exceptions.Timeout:
        print_result(False, "OCR 请求超时")
        return False
    except Exception as exc:
        print_result(False, f"OCR 异常: {exc}")
        return False
- # ---------------------------------------------------------------------------
- # PDF 文件 OCR 测试(提取每页为图片后调用 /ocr)
- # ---------------------------------------------------------------------------
def test_ocr_pdf(
    pdf_path: str,
    remove_watermark: bool = False,
    light_threshold: int = 200,
    saturation_threshold: int = 30,
    crop_header_footer: bool = False,
    header_ratio: float = 0.05,
    footer_ratio: float = 0.05,
    auto_detect_header_footer: bool = False,
    max_pages: int = 0,
) -> bool:
    """OCR a PDF by rendering each page to PNG and posting it to ``/ocr``.

    Requires PyMuPDF (``fitz``), imported lazily so the rest of the script
    works without it. The collected texts are written to
    ``test_results/ocr_pdf_<stem>.json`` next to this script.

    Args:
        pdf_path: Path of the PDF file.
        remove_watermark: Ask the service to remove watermarks.
        light_threshold: Watermark brightness threshold sent with the request.
        saturation_threshold: Watermark saturation threshold sent with the
            request.
        crop_header_footer: Ask the service to crop header and footer.
        header_ratio: Header crop ratio, sent in fixed-ratio mode.
        footer_ratio: Footer crop ratio, sent in fixed-ratio mode.
        auto_detect_header_footer: With *crop_header_footer*, request
            automatic boundary detection instead of fixed ratios.
        max_pages: Maximum number of pages to process; ``0`` (or any
            non-positive value) means all pages.

    Returns:
        ``True`` when every processed page was recognised with HTTP 200;
        ``False`` on any page failure, a missing file, a missing PyMuPDF
        installation or an unexpected exception.
    """
    try:
        import fitz  # PyMuPDF
    except ImportError:
        print_result(False, "PyMuPDF 未安装,无法提取 PDF 页面。请安装: pip install pymupdf")
        return False

    fp = Path(pdf_path)
    if not fp.exists():
        print_result(False, f"文件不存在: {fp}")
        return False

    print(f" 📄 PDF 文件: {fp.name}")

    # These options are identical for every page, so build them once.
    shared_options: Dict[str, Any] = {}
    if crop_header_footer:
        shared_options["crop_header_footer"] = True
        if auto_detect_header_footer:
            shared_options["auto_detect_header_footer"] = True
        else:
            shared_options["header_ratio"] = header_ratio
            shared_options["footer_ratio"] = footer_ratio
    if remove_watermark:
        shared_options["remove_watermark"] = True
        shared_options["watermark_light_threshold"] = light_threshold
        shared_options["watermark_saturation_threshold"] = saturation_threshold

    try:
        doc = fitz.open(str(fp))
        # try/finally guarantees the document is closed even when rendering
        # a page raises and control jumps to the outer handler.
        try:
            total_pages = len(doc)
            print(f" 📃 总页数: {total_pages}")

            # A non-positive limit means "no limit"; a negative value used to
            # yield a negative page count and a spurious failure.
            pages_to_process = total_pages if max_pages <= 0 else min(max_pages, total_pages)
            print(f" 🔄 处理页数: {pages_to_process}")

            all_texts = []
            success_count = 0
            # Scale factor 150/72 renders pages at 150 DPI.
            zoom = fitz.Matrix(150 / 72, 150 / 72)

            for page_idx in range(pages_to_process):
                pix = doc[page_idx].get_pixmap(matrix=zoom)
                image_base64 = base64.b64encode(pix.tobytes("png")).decode("utf-8")

                print(f"\n 📄 页 {page_idx + 1}/{pages_to_process}")

                request_data = {
                    "image_base64": image_base64,
                    "image_format": "png",
                    **shared_options,
                }

                # A failing page is reported and skipped; the remaining pages
                # are still processed.
                try:
                    start_time = time.time()
                    response = requests.post(
                        f"{API_BASE_URL}/ocr",
                        json=request_data,
                        timeout=120
                    )
                    elapsed = time.time() - start_time

                    if response.status_code != 200:
                        print(f" ❌ OCR 失败: {response.status_code}")
                        continue

                    result = response.json()
                    # Texts are either at the top level or nested under "data".
                    if "data" in result and isinstance(result.get("data"), dict):
                        texts = result.get("data", {}).get("texts", [])
                    else:
                        texts = result.get("texts", [])

                    all_texts.extend(texts)
                    success_count += 1
                    print(f" ✅ OCR 成功 ({elapsed:.2f}s), 识别 {len(texts)} 个文本块")

                    # Preview of the first three text blocks, cut at 40 chars.
                    for i, text in enumerate(texts[:3]):
                        display = text[:40] + "..." if len(text) > 40 else text
                        print(f" [{i+1}] {display}")
                    if len(texts) > 3:
                        print(f" ... 还有 {len(texts) - 3} 个")
                except Exception as e:
                    print(f" ❌ OCR 异常: {e}")
        finally:
            doc.close()

        # Save whatever was recognised.
        output_dir = Path(__file__).parent / "test_results"
        output_dir.mkdir(exist_ok=True)
        output_file = output_dir / f"ocr_pdf_{fp.stem}.json"

        with open(output_file, "w", encoding="utf-8") as f:
            json.dump({"file": str(fp), "pages": pages_to_process, "texts": all_texts}, f, ensure_ascii=False, indent=2)

        print(f"\n 💾 结果已保存: {output_file}")
        print(f" 📊 汇总: {success_count}/{pages_to_process} 页成功, 共 {len(all_texts)} 个文本块")

        return success_count == pages_to_process

    except Exception as e:
        print_result(False, f"处理 PDF 异常: {e}")
        return False
def run_ocr_tests(
    remove_watermark: bool = False,
    crop_header_footer: bool = False,
    max_pages: int = 0,
) -> bool:
    """Run ``test_ocr_pdf`` on every PDF listed in ``OCR_TEST_CASES``.

    Args:
        remove_watermark: Forwarded to ``test_ocr_pdf``.
        crop_header_footer: Forwarded to ``test_ocr_pdf``.
        max_pages: Forwarded to ``test_ocr_pdf``.

    Returns:
        ``True`` when no file failed; a missing file counts as a failure.
        ``False`` also when the health check fails.
    """
    print_header("测试 OCR 接口 (PDF 文件)")

    if not check_health():
        print("\n❌ API 不可用")
        return False

    total = len(OCR_TEST_CASES)
    passed = failed = 0
    rule = "=" * 60

    for position, rel_path in enumerate(OCR_TEST_CASES, start=1):
        pdf_file = TEST_DIR / rel_path
        print(f"\n{rule}")
        print(f" [{position}/{total}] {rel_path}")
        print(rule)

        if pdf_file.exists():
            succeeded = test_ocr_pdf(
                str(pdf_file),
                remove_watermark=remove_watermark,
                crop_header_footer=crop_header_footer,
                max_pages=max_pages,
            )
        else:
            print_result(False, f"文件不存在: {pdf_file}")
            succeeded = False

        if succeeded:
            passed += 1
        else:
            failed += 1

    print_header("OCR 测试汇总")
    print(f" 总计: {total}")
    print(f" ✅ 通过: {passed}")
    print(f" ❌ 失败: {failed}")
    all_ok = failed == 0
    if all_ok:
        print("\n🎉 所有 OCR 测试通过!")
    return all_ok
- # ---------------------------------------------------------------------------
- # /pdf_to_markdown 接口测试
- # ---------------------------------------------------------------------------
def upload_pdf_to_markdown(
    file_path: Path,
    backend: Optional[str] = None,
    remove_watermark: bool = False,
    crop_header_footer: bool = False,
    return_images: bool = False,
) -> Optional[str]:
    """Upload a file to ``/pdf_to_markdown`` and return the task id.

    Args:
        file_path: File to upload; declared as ``application/pdf`` when the
            suffix is ``.pdf``, otherwise as ``image/*``.
        backend: Sent as the ``backend`` form field when non-empty.
        remove_watermark: Adds ``remove_watermark=true`` to the form.
        crop_header_footer: Adds ``crop_header_footer=true`` to the form.
        return_images: Adds ``return_images=true`` to the form.

    Returns:
        The ``task_id`` value of the JSON reply, or ``None`` on a non-200
        status or any exception.
    """
    print(f"\n 📤 上传文件: {file_path.name}")
    try:
        with open(file_path, "rb") as handle:
            if file_path.suffix.lower() == ".pdf":
                mime = "application/pdf"
            else:
                mime = "image/*"

            form: Dict[str, Any] = {}
            if backend:
                form["backend"] = backend
            # Boolean switches are only sent when enabled.
            switches = (
                ("remove_watermark", remove_watermark),
                ("crop_header_footer", crop_header_footer),
                ("return_images", return_images),
            )
            for field, enabled in switches:
                if enabled:
                    form[field] = "true"

            reply = requests.post(
                f"{API_BASE_URL}/pdf_to_markdown",
                files={"file": (file_path.name, handle, mime)},
                data=form,
                timeout=60,
            )

        if reply.status_code != 200:
            print_result(False, f"上传失败: {reply.status_code} - {reply.text[:300]}")
            return None

        task_id = reply.json().get("task_id")
        print(f" 任务 ID: {task_id}")
        return task_id
    except Exception as exc:
        print_result(False, f"上传异常: {exc}")
        return None
def download_markdown(task_id: str) -> Optional[str]:
    """Return the ``markdown`` field of ``/task/{task_id}/json``.

    Returns:
        The markdown text (``""`` when the key is absent) on HTTP 200;
        ``None`` on any other status code or when an exception is raised.
    """
    try:
        reply = requests.get(f"{API_BASE_URL}/task/{task_id}/json", timeout=30)
        if reply.status_code != 200:
            print_result(False, f"获取 Markdown 失败: {reply.status_code}")
            return None
        return reply.json().get("markdown", "")
    except Exception as exc:
        print_result(False, f"获取 Markdown 异常: {exc}")
        return None
def download_markdown_file(task_id: str, save_path: Path) -> bool:
    """Download ``/download/{task_id}/markdown`` and store it at *save_path*.

    Parent directories of *save_path* are created when missing.

    Returns:
        ``True`` when the file was written; ``False`` on a non-200 status or
        any exception.
    """
    try:
        reply = requests.get(f"{API_BASE_URL}/download/{task_id}/markdown", timeout=30)
        if reply.status_code != 200:
            print_result(False, f"下载 Markdown 文件失败: {reply.status_code}")
            return False
        save_path.parent.mkdir(parents=True, exist_ok=True)
        save_path.write_bytes(reply.content)
        print(f" 💾 Markdown 文件已保存: {save_path}")
        return True
    except Exception as exc:
        print_result(False, f"下载 Markdown 文件异常: {exc}")
        return False
def test_pdf_to_markdown(
    file_path: Optional[str] = None,
    backend: Optional[str] = None,
    remove_watermark: bool = False,
    crop_header_footer: bool = False,
    return_images: bool = False,
    max_wait: int = 600,
) -> bool:
    """Exercise the ``/pdf_to_markdown`` endpoint end to end.

    For each file: upload, poll, fetch the markdown text, download the
    ``.md`` file (and the ZIP when *return_images* is set) into
    ``test_results`` next to this script, then print a short preview.

    Args:
        file_path: File to test; when empty, the existing files of
            ``PDF2MD_TEST_CASES`` under ``TEST_DIR`` are used.
        backend: Backend name forwarded to the upload; empty leaves the
            field out of the request.
        remove_watermark: Forwarded to the upload.
        crop_header_footer: Forwarded to the upload.
        return_images: Forwarded to the upload; also triggers the ZIP download.
        max_wait: Maximum number of seconds to poll each task.

    Returns:
        ``True`` when no file failed. A file passes only if non-empty
        markdown text was retrieved.
    """
    print_header("测试 /pdf_to_markdown 接口")

    if not check_health():
        print("\n❌ API 不可用")
        return False

    # Work out which files to run.
    if file_path:
        candidates = [Path(file_path)]
    else:
        candidates = []
        for fname in PDF2MD_TEST_CASES:
            default_file = TEST_DIR / fname
            if not default_file.exists():
                print(f" ⚠️ 跳过不存在的文件: {fname}")
                continue
            candidates.append(default_file)
        if not candidates:
            print_result(False, f"TEST_DIR ({TEST_DIR}) 中没有可用的测试文件")
            return False

    total = len(candidates)
    passed = failed = 0
    rule = "=" * 60

    for position, fp in enumerate(candidates, start=1):
        print(f"\n{rule}")
        print(f" [{position}/{total}] {fp.name}")
        print(rule)

        if not fp.exists():
            print_result(False, f"文件不存在: {fp}")
            failed += 1
            continue

        print(f" 📄 文件: {fp.name} ({fp.stat().st_size / 1024:.1f} KB)")
        if backend:
            print(f" 🔧 Backend: {backend}")
        if remove_watermark:
            print(" 🔧 去水印: 是")
        if crop_header_footer:
            print(" 🔧 裁剪页眉页脚: 是")
        if return_images:
            print(" 🔧 返回图片: 是")

        # 1. Upload.
        task_id = upload_pdf_to_markdown(fp, backend, remove_watermark, crop_header_footer, return_images)
        if not task_id:
            failed += 1
            continue

        # 2. Poll until the task finishes.
        if not poll_task_status(task_id, max_wait=max_wait):
            failed += 1
            continue

        # 3. Markdown text from the JSON endpoint.
        md_text = download_markdown(task_id)

        # 4. The .md file itself; its outcome does not affect the verdict.
        results_dir = Path(__file__).parent / "test_results"
        results_dir.mkdir(exist_ok=True)
        download_markdown_file(task_id, results_dir / f"pdf2md_{fp.stem}.md")

        # 5. ZIP archive, only when images were requested.
        if return_images:
            try:
                zip_reply = requests.get(f"{API_BASE_URL}/download/{task_id}/zip", timeout=60)
                if zip_reply.status_code != 200:
                    print_result(False, f"下载 ZIP 失败: {zip_reply.status_code}")
                else:
                    zip_target = results_dir / f"pdf2md_{fp.stem}.zip"
                    zip_target.write_bytes(zip_reply.content)
                    print(f" 💾 ZIP 文件已保存: {zip_target} ({len(zip_reply.content) / 1024:.1f} KB)")
            except Exception as exc:
                print_result(False, f"下载 ZIP 异常: {exc}")

        # 6. Preview and verdict.
        if not md_text:
            print_result(False, "未获取到 Markdown 内容")
            failed += 1
            continue

        rows = md_text.strip().split("\n")
        print(f"\n 📝 Markdown 结果: {len(md_text)} 字符, {len(rows)} 行")
        print(" --- 前 10 行 ---")
        for row in rows[:10]:
            shown = row[:80] + "..." if len(row) > 80 else row
            print(f" {shown}")
        if len(rows) > 10:
            print(f" ... 还有 {len(rows) - 10} 行")
        print_result(True, "PDF 转 Markdown 成功")
        passed += 1

    print_header("pdf_to_markdown 测试汇总")
    print(f" 总计: {total}")
    print(f" ✅ 通过: {passed}")
    print(f" ❌ 失败: {failed}")
    all_ok = failed == 0
    if all_ok:
        print("\n🎉 所有 pdf_to_markdown 测试通过!")
    return all_ok
# Command-line entry point.
#   (no argument)        run every /convert test case
#   --help / -h          print usage
#   ocr / ocrpdf / pdf2md  run the matching endpoint test with its options
#   anything else        treated as a document type for a single /convert test
# Every "--name=value" option is split on the first "=" only, so a malformed
# value such as "--light=1=2" is rejected with a warning instead of being
# silently truncated to "1".
if __name__ == "__main__":
    if len(sys.argv) > 1:
        doc_type = sys.argv[1]
        if doc_type in ["--help", "-h"]:
            print("用法:")
            print(" python test_api.py # 运行所有 /convert 测试")
            print(" python test_api.py <type> # 测试指定文档类型")
            print(" python test_api.py ocr # 测试 OCR 接口(图片)")
            print(" python test_api.py ocr <path> [--nowm] [--crop]")
            print(" python test_api.py ocrpdf # 测试 OCR 接口(PDF 文件,遍历 OCR_TEST_CASES)")
            print(" python test_api.py ocrpdf <path> [--nowm] [--crop] [--pages=N]")
            print(" python test_api.py pdf2md # 测试 /pdf_to_markdown(默认文件)")
            print(" python test_api.py pdf2md <path> [--backend=X] [--nowm] [--crop] [--images]")
            print("\n可用类型:")
            # Sorted so the listing is identical on every run; iterating a
            # set of strings is not order-stable across interpreter runs.
            for dtype in sorted({v if isinstance(v, str) else v[0] for v in TEST_CASES.values()}):
                print(f" - {dtype}")
            print(" - ocr (OCR 图片识别)")
            print(" - ocrpdf (OCR PDF 文件,提取每页调用 /ocr)")
            print(" - pdf2md (PDF/图片转 Markdown)")
            print("\nOCR 去水印参数:")
            print(" --nowm 启用去水印")
            print(" --light=N 亮度阈值(0-255,默认200)")
            print(" --sat=N 饱和度阈值(0-255,默认30)")
            print("\nOCR 裁剪页眉页脚参数:")
            print(" --crop 启用裁剪页眉页脚(固定比例模式)")
            print(" --crop-auto 启用裁剪页眉页脚(自动检测模式)")
            print(" --header=N 页眉裁剪比例(0-1,默认0.05表示5%)")
            print(" --footer=N 页脚裁剪比例(0-1,默认0.05表示5%)")
            print("\npdf2md 参数:")
            print(" --backend=X 指定 MinerU backend")
            print(" --nowm 启用去水印")
            print(" --crop 启用裁剪页眉页脚")
            print(" --images 返回图片(可下载 ZIP)")
            print(" --wait=N 最大等待秒数(默认600)")
            print("\nocrpdf 参数:")
            print(" --nowm 启用去水印")
            print(" --crop 启用裁剪页眉页脚")
            print(" --pages=N 最大处理页数(0=全部)")
        elif doc_type == "ocr":
            # Options of the single-image OCR test.
            image_path = None
            remove_watermark = False
            light_threshold = 200
            saturation_threshold = 30
            crop_header_footer = False
            header_ratio = 0.05
            footer_ratio = 0.05
            auto_detect_header_footer = False

            for arg in sys.argv[2:]:
                if arg == "--nowm":
                    remove_watermark = True
                elif arg == "--crop":
                    crop_header_footer = True
                elif arg == "--crop-auto":
                    crop_header_footer = True
                    auto_detect_header_footer = True
                elif arg.startswith("--light="):
                    try:
                        light_threshold = int(arg.split("=", 1)[1])
                    except ValueError:
                        print(f"警告: 无效的亮度阈值 {arg},使用默认值 200")
                elif arg.startswith("--sat="):
                    try:
                        saturation_threshold = int(arg.split("=", 1)[1])
                    except ValueError:
                        print(f"警告: 无效的饱和度阈值 {arg},使用默认值 30")
                elif arg.startswith("--header="):
                    try:
                        header_ratio = float(arg.split("=", 1)[1])
                    except ValueError:
                        print(f"警告: 无效的页眉比例 {arg},使用默认值 0.05")
                elif arg.startswith("--footer="):
                    try:
                        footer_ratio = float(arg.split("=", 1)[1])
                    except ValueError:
                        print(f"警告: 无效的页脚比例 {arg},使用默认值 0.05")
                elif not arg.startswith("--"):
                    # First non-option argument is the image path (last wins).
                    image_path = arg

            test_ocr(
                image_path,
                remove_watermark,
                light_threshold,
                saturation_threshold,
                crop_header_footer,
                header_ratio,
                footer_ratio,
                auto_detect_header_footer
            )
        elif doc_type == "ocrpdf":
            # Options of the PDF OCR test.
            ocrpdf_file = None
            ocrpdf_nowm = False
            ocrpdf_crop = False
            ocrpdf_pages = 0
            for arg in sys.argv[2:]:
                if arg == "--nowm":
                    ocrpdf_nowm = True
                elif arg == "--crop":
                    ocrpdf_crop = True
                elif arg.startswith("--pages="):
                    try:
                        ocrpdf_pages = int(arg.split("=", 1)[1])
                    except ValueError:
                        print(f"警告: 无效的页数 {arg},使用默认值 0(全部)")
                elif not arg.startswith("--"):
                    ocrpdf_file = arg
            if ocrpdf_file:
                # A single PDF given on the command line.
                print_header("测试 OCR 接口 (PDF 文件)")
                if not check_health():
                    print("\n❌ API 不可用")
                else:
                    test_ocr_pdf(
                        ocrpdf_file,
                        remove_watermark=ocrpdf_nowm,
                        crop_header_footer=ocrpdf_crop,
                        max_pages=ocrpdf_pages,
                    )
            else:
                # No path: run every entry of OCR_TEST_CASES.
                run_ocr_tests(
                    remove_watermark=ocrpdf_nowm,
                    crop_header_footer=ocrpdf_crop,
                    max_pages=ocrpdf_pages,
                )
        elif doc_type == "pdf2md":
            # Options of the /pdf_to_markdown test.
            pdf2md_file = None
            pdf2md_backend = None
            pdf2md_nowm = False
            pdf2md_crop = False
            pdf2md_images = False
            pdf2md_wait = 600
            for arg in sys.argv[2:]:
                if arg == "--nowm":
                    pdf2md_nowm = True
                elif arg == "--crop":
                    pdf2md_crop = True
                elif arg == "--images":
                    pdf2md_images = True
                elif arg.startswith("--backend="):
                    pdf2md_backend = arg.split("=", 1)[1]
                elif arg.startswith("--wait="):
                    try:
                        pdf2md_wait = int(arg.split("=", 1)[1])
                    except ValueError:
                        print(f"警告: 无效的等待时间 {arg},使用默认值 600")
                elif not arg.startswith("--"):
                    pdf2md_file = arg
            test_pdf_to_markdown(
                file_path=pdf2md_file,
                backend=pdf2md_backend,
                remove_watermark=pdf2md_nowm,
                crop_header_footer=pdf2md_crop,
                return_images=pdf2md_images,
                max_wait=pdf2md_wait,
            )
        else:
            # Anything else is a document type name.
            test_single(doc_type)
    else:
        # No argument: run the whole /convert suite.
        run_all_tests()
|