extract.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. """
  2. 要素提取路由
  3. 统一使用Python进行DOCX解析和要素提取,支持异步处理大文档。
  4. """
  5. from fastapi import APIRouter, HTTPException, UploadFile, File, Form, BackgroundTasks
  6. from pydantic import BaseModel
  7. from typing import Dict, Any, Optional, List
  8. import time
  9. import uuid
  10. import asyncio
  11. from concurrent.futures import ThreadPoolExecutor
  12. from loguru import logger
  13. from ..services.docx_parser import parse_docx_file, blocks_to_text
  14. from ..services.element_extractor import element_extractor
router = APIRouter()
# In-memory task store keyed by task id (a production deployment should use Redis).
_tasks: Dict[str, Dict] = {}
# Thread pool used to run the CPU-intensive DOCX parsing.
_executor = ThreadPoolExecutor(max_workers=4)
class ExtractFromTextRequest(BaseModel):
    """Request body for extracting elements from plain text."""
    # Text to extract from; the /from-text handler rejects input whose
    # stripped length is under 10 characters.
    text: str
    # Passed through unchanged to element_extractor.extract_from_text.
    attachment_id: int = 0
    # Passed through unchanged to element_extractor.extract_from_text.
    use_llm: bool = True
class ExtractResponse(BaseModel):
    """Extraction response."""
    # False when the handler caught an exception or rejected the input.
    success: bool
    # Parsed document; only populated by the /from-docx handler.
    doc_content: Optional[Dict[str, Any]] = None
    # Copied from the extractor result's 'elements' entry.
    elements: List[Dict] = []
    # Copied from the extractor result's 'values' entry.
    values: List[Dict] = []
    # Copied from the extractor result's 'statistics' entry.
    statistics: Dict[str, Any] = {}
    # Wall-clock handling time in milliseconds.
    processing_time_ms: int = 0
    # str() of the caught exception, or a validation message, on failure.
    error: Optional[str] = None
class TaskResponse(BaseModel):
    """Asynchronous task response."""
    task_id: str
    status: str  # pending, processing, completed, failed
    # Coarse progress indicator; the background worker sets it from 0 up to 100.
    progress: int = 0
    # Human-readable status text (the exception text when the task failed).
    message: str = ""
    # Extraction payload stored by the background worker once it completes.
    result: Optional[Dict[str, Any]] = None
  41. # ============================================================
  42. # 同步接口(小文档,<50页)
  43. # ============================================================
  44. @router.post("/from-docx", response_model=ExtractResponse)
  45. async def extract_from_docx(
  46. file: UploadFile = File(...),
  47. attachment_id: int = Form(default=0),
  48. use_llm: bool = Form(default=True)
  49. ):
  50. """
  51. 同步处理:上传DOCX → 解析 → 提取要素
  52. 适用于小文档(<50页,<5MB)。大文档请使用异步接口。
  53. """
  54. start_time = time.time()
  55. if not file.filename.lower().endswith('.docx'):
  56. raise HTTPException(status_code=400, detail="仅支持.docx文件")
  57. # 检查文件大小(限制5MB)
  58. content = await file.read()
  59. if len(content) > 5 * 1024 * 1024:
  60. raise HTTPException(
  61. status_code=400,
  62. detail="文件过大,请使用异步接口 /extract/async/from-docx"
  63. )
  64. try:
  65. logger.info(f"同步处理文件: {file.filename}, size={len(content)}")
  66. # 1. 解析DOCX(在线程池中执行,避免阻塞)
  67. loop = asyncio.get_event_loop()
  68. doc_content = await loop.run_in_executor(_executor, parse_docx_file, content)
  69. logger.info(f"DOCX解析完成: {doc_content['totalBlocks']} 个块")
  70. # 2. 转为纯文本
  71. text = blocks_to_text(doc_content['blocks'])
  72. # 3. 提取要素
  73. result = await element_extractor.extract_from_text(
  74. text=text,
  75. attachment_id=attachment_id,
  76. use_llm=use_llm
  77. )
  78. processing_time = int((time.time() - start_time) * 1000)
  79. logger.info(f"处理完成: {result['statistics']}, 耗时: {processing_time}ms")
  80. return ExtractResponse(
  81. success=True,
  82. doc_content=doc_content,
  83. elements=result['elements'],
  84. values=result['values'],
  85. statistics=result['statistics'],
  86. processing_time_ms=processing_time
  87. )
  88. except Exception as e:
  89. logger.error(f"处理失败: {e}", exc_info=True)
  90. return ExtractResponse(
  91. success=False,
  92. error=str(e),
  93. processing_time_ms=int((time.time() - start_time) * 1000)
  94. )
  95. # ============================================================
  96. # 异步接口(大文档)
  97. # ============================================================
  98. @router.post("/async/from-docx", response_model=TaskResponse)
  99. async def extract_from_docx_async(
  100. background_tasks: BackgroundTasks,
  101. file: UploadFile = File(...),
  102. attachment_id: int = Form(default=0),
  103. use_llm: bool = Form(default=True)
  104. ):
  105. """
  106. 异步处理:上传DOCX → 返回任务ID → 后台解析和提取
  107. 适用于大文档(>50页)。通过 /extract/task/{task_id} 查询进度和结果。
  108. """
  109. if not file.filename.lower().endswith('.docx'):
  110. raise HTTPException(status_code=400, detail="仅支持.docx文件")
  111. # 读取文件内容
  112. content = await file.read()
  113. # 创建任务
  114. task_id = str(uuid.uuid4())
  115. _tasks[task_id] = {
  116. "status": "pending",
  117. "progress": 0,
  118. "message": "任务已创建,等待处理",
  119. "result": None,
  120. "created_at": time.time()
  121. }
  122. logger.info(f"创建异步任务: task_id={task_id}, file={file.filename}, size={len(content)}")
  123. # 添加后台任务
  124. background_tasks.add_task(
  125. _process_docx_task,
  126. task_id,
  127. content,
  128. attachment_id,
  129. use_llm
  130. )
  131. return TaskResponse(
  132. task_id=task_id,
  133. status="pending",
  134. progress=0,
  135. message="任务已创建,正在后台处理"
  136. )
  137. async def _process_docx_task(
  138. task_id: str,
  139. content: bytes,
  140. attachment_id: int,
  141. use_llm: bool
  142. ):
  143. """后台处理DOCX任务"""
  144. try:
  145. _tasks[task_id]["status"] = "processing"
  146. _tasks[task_id]["progress"] = 10
  147. _tasks[task_id]["message"] = "正在解析DOCX文档..."
  148. start_time = time.time()
  149. # 1. 解析DOCX
  150. loop = asyncio.get_event_loop()
  151. doc_content = await loop.run_in_executor(_executor, parse_docx_file, content)
  152. _tasks[task_id]["progress"] = 40
  153. _tasks[task_id]["message"] = f"解析完成,共{doc_content['totalBlocks']}个块,正在提取要素..."
  154. # 2. 转为纯文本
  155. text = blocks_to_text(doc_content['blocks'])
  156. _tasks[task_id]["progress"] = 50
  157. # 3. 提取要素
  158. result = await element_extractor.extract_from_text(
  159. text=text,
  160. attachment_id=attachment_id,
  161. use_llm=use_llm
  162. )
  163. processing_time = int((time.time() - start_time) * 1000)
  164. # 4. 保存结果
  165. _tasks[task_id]["status"] = "completed"
  166. _tasks[task_id]["progress"] = 100
  167. _tasks[task_id]["message"] = "处理完成"
  168. _tasks[task_id]["result"] = {
  169. "doc_content": doc_content,
  170. "elements": result['elements'],
  171. "values": result['values'],
  172. "statistics": result['statistics'],
  173. "processing_time_ms": processing_time
  174. }
  175. logger.info(f"异步任务完成: task_id={task_id}, 耗时={processing_time}ms")
  176. except Exception as e:
  177. logger.error(f"异步任务失败: task_id={task_id}, error={e}", exc_info=True)
  178. _tasks[task_id]["status"] = "failed"
  179. _tasks[task_id]["message"] = str(e)
  180. @router.get("/task/{task_id}", response_model=TaskResponse)
  181. async def get_task_status(task_id: str):
  182. """查询异步任务状态和结果"""
  183. if task_id not in _tasks:
  184. raise HTTPException(status_code=404, detail="任务不存在")
  185. task = _tasks[task_id]
  186. return TaskResponse(
  187. task_id=task_id,
  188. status=task["status"],
  189. progress=task["progress"],
  190. message=task["message"],
  191. result=task.get("result")
  192. )
  193. # ============================================================
  194. # 纯文本接口(兼容Java后端解析的场景)
  195. # ============================================================
  196. @router.post("/from-text", response_model=ExtractResponse)
  197. async def extract_from_text(request: ExtractFromTextRequest):
  198. """
  199. 从纯文本中提取要素
  200. 适用于已有解析文本的场景(如Java后端已解析)。
  201. """
  202. start_time = time.time()
  203. if not request.text or len(request.text.strip()) < 10:
  204. return ExtractResponse(
  205. success=False,
  206. error="文本内容为空或过短",
  207. processing_time_ms=0
  208. )
  209. try:
  210. result = await element_extractor.extract_from_text(
  211. text=request.text,
  212. attachment_id=request.attachment_id,
  213. use_llm=request.use_llm
  214. )
  215. processing_time = int((time.time() - start_time) * 1000)
  216. return ExtractResponse(
  217. success=True,
  218. elements=result['elements'],
  219. values=result['values'],
  220. statistics=result['statistics'],
  221. processing_time_ms=processing_time
  222. )
  223. except Exception as e:
  224. logger.error(f"要素提取失败: {e}", exc_info=True)
  225. return ExtractResponse(
  226. success=False,
  227. error=str(e),
  228. processing_time_ms=int((time.time() - start_time) * 1000)
  229. )