"""
Element-extraction routes.

DOCX parsing and element extraction are both performed in Python. Large
documents can be handed to a background task and polled for their result.
"""

from fastapi import APIRouter, HTTPException, UploadFile, File, Form, BackgroundTasks
from pydantic import BaseModel
from typing import Dict, Any, Optional, List
import time
import uuid
import asyncio
from concurrent.futures import ThreadPoolExecutor
from loguru import logger

from ..services.docx_parser import parse_docx_file, blocks_to_text
from ..services.element_extractor import element_extractor

router = APIRouter()

# Task store (production deployments should use Redis instead).
_tasks: Dict[str, Dict] = {}

# Thread pool used for the CPU-heavy DOCX parsing.
_executor = ThreadPoolExecutor(max_workers=4)

# Upload size limit of the synchronous endpoint (5 MB).
_SYNC_MAX_BYTES = 5 * 1024 * 1024


class ExtractFromTextRequest(BaseModel):
    """Request body for extracting elements from plain text."""
    text: str
    attachment_id: int = 0
    use_llm: bool = True


class ExtractResponse(BaseModel):
    """Result of a synchronous extraction."""
    success: bool
    doc_content: Optional[Dict[str, Any]] = None
    elements: List[Dict] = []
    values: List[Dict] = []
    statistics: Dict[str, Any] = {}
    processing_time_ms: int = 0
    error: Optional[str] = None


class TaskResponse(BaseModel):
    """State of an asynchronous extraction task."""
    task_id: str
    status: str  # pending, processing, completed, failed
    progress: int = 0
    message: str = ""
    result: Optional[Dict[str, Any]] = None


def _ensure_docx(file: UploadFile) -> None:
    """Raise HTTP 400 unless the uploaded file's name ends with ``.docx``."""
    # ``filename`` may be None when the client omits it; treat that as a
    # non-.docx upload rather than failing with AttributeError (HTTP 500).
    filename = file.filename or ""
    if not filename.lower().endswith('.docx'):
        raise HTTPException(status_code=400, detail="仅支持.docx文件")


async def _parse_docx(content: bytes) -> Dict[str, Any]:
    """Run ``parse_docx_file`` in the thread pool so the event loop stays free."""
    # get_running_loop() is the supported way to obtain the loop from inside
    # a coroutine; get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, parse_docx_file, content)


def _elapsed_ms(start_time: float) -> int:
    """Return the whole milliseconds elapsed since ``start_time``."""
    return int((time.time() - start_time) * 1000)


# ============================================================
# Synchronous endpoint (small documents, <50 pages)
# ============================================================

@router.post("/from-docx", response_model=ExtractResponse)
async def extract_from_docx(
    file: UploadFile = File(...),
    attachment_id: int = Form(default=0),
    use_llm: bool = Form(default=True)
):
    """
    Synchronous processing: upload DOCX -> parse -> extract elements.

    Intended for small documents (<50 pages, <5MB); use the asynchronous
    endpoint for larger ones.

    Raises:
        HTTPException: 400 when the file is not a ``.docx`` or exceeds 5 MB.

    Returns:
        ExtractResponse with ``success=True`` and the extraction result, or
        ``success=False`` and ``error`` set when parsing/extraction fails.
    """
    start_time = time.time()

    _ensure_docx(file)

    # Size check (5 MB limit). Read at most one byte past the limit: that is
    # enough to detect an oversized upload without copying all of it.
    content = await file.read(_SYNC_MAX_BYTES + 1)
    if len(content) > _SYNC_MAX_BYTES:
        raise HTTPException(
            status_code=400,
            detail="文件过大,请使用异步接口 /extract/async/from-docx"
        )

    try:
        logger.info(f"同步处理文件: {file.filename}, size={len(content)}")

        # 1. Parse the DOCX (in the thread pool, to avoid blocking).
        doc_content = await _parse_docx(content)
        logger.info(f"DOCX解析完成: {doc_content['totalBlocks']} 个块")

        # 2. Convert to plain text.
        text = blocks_to_text(doc_content['blocks'])

        # 3. Extract elements.
        result = await element_extractor.extract_from_text(
            text=text,
            attachment_id=attachment_id,
            use_llm=use_llm
        )

        processing_time = _elapsed_ms(start_time)
        logger.info(f"处理完成: {result['statistics']}, 耗时: {processing_time}ms")

        return ExtractResponse(
            success=True,
            doc_content=doc_content,
            elements=result['elements'],
            values=result['values'],
            statistics=result['statistics'],
            processing_time_ms=processing_time
        )

    except Exception as e:
        # loguru has no ``exc_info`` keyword: any kwarg triggers str.format on
        # the message (which raises when the text contains braces) and no
        # traceback is attached. ``logger.exception`` logs at ERROR level
        # together with the traceback.
        logger.exception(f"处理失败: {e}")
        return ExtractResponse(
            success=False,
            error=str(e),
            processing_time_ms=_elapsed_ms(start_time)
        )


# ============================================================
# Asynchronous endpoint (large documents)
# ============================================================

@router.post("/async/from-docx", response_model=TaskResponse)
async def extract_from_docx_async(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    attachment_id: int = Form(default=0),
    use_llm: bool = Form(default=True)
):
    """
    Asynchronous processing: upload DOCX -> return task id -> parse and
    extract in the background.

    Intended for large documents (>50 pages). Poll /extract/task/{task_id}
    for progress and the result.

    Raises:
        HTTPException: 400 when the file is not a ``.docx``.

    Returns:
        TaskResponse describing the newly created, still pending task.
    """
    _ensure_docx(file)

    # Read the upload now; the background task receives the raw bytes.
    content = await file.read()

    # Register the task.
    task_id = str(uuid.uuid4())
    _tasks[task_id] = {
        "status": "pending",
        "progress": 0,
        "message": "任务已创建,等待处理",
        "result": None,
        "created_at": time.time()
    }

    logger.info(f"创建异步任务: task_id={task_id}, file={file.filename}, size={len(content)}")

    # Schedule the background work.
    background_tasks.add_task(
        _process_docx_task,
        task_id,
        content,
        attachment_id,
        use_llm
    )

    return TaskResponse(
        task_id=task_id,
        status="pending",
        progress=0,
        message="任务已创建,正在后台处理"
    )


async def _process_docx_task(
    task_id: str,
    content: bytes,
    attachment_id: int,
    use_llm: bool
):
    """
    Background worker for one DOCX task.

    Parses ``content``, extracts elements and records progress, the result
    or the failure message in ``_tasks[task_id]``. Exceptions are logged and
    stored on the task instead of being propagated.
    """
    try:
        task = _tasks[task_id]
        task["status"] = "processing"
        task["progress"] = 10
        task["message"] = "正在解析DOCX文档..."

        start_time = time.time()

        # 1. Parse the DOCX.
        doc_content = await _parse_docx(content)

        task["progress"] = 40
        task["message"] = f"解析完成,共{doc_content['totalBlocks']}个块,正在提取要素..."

        # 2. Convert to plain text.
        text = blocks_to_text(doc_content['blocks'])

        task["progress"] = 50

        # 3. Extract elements.
        result = await element_extractor.extract_from_text(
            text=text,
            attachment_id=attachment_id,
            use_llm=use_llm
        )

        processing_time = _elapsed_ms(start_time)

        # 4. Store the result.
        task["status"] = "completed"
        task["progress"] = 100
        task["message"] = "处理完成"
        task["result"] = {
            "doc_content": doc_content,
            "elements": result['elements'],
            "values": result['values'],
            "statistics": result['statistics'],
            "processing_time_ms": processing_time
        }

        logger.info(f"异步任务完成: task_id={task_id}, 耗时={processing_time}ms")

    except Exception as e:
        # See extract_from_docx: loguru needs logger.exception (not
        # exc_info=True) to record the traceback.
        logger.exception(f"异步任务失败: task_id={task_id}, error={e}")
        _tasks[task_id]["status"] = "failed"
        _tasks[task_id]["message"] = str(e)


@router.get("/task/{task_id}", response_model=TaskResponse)
async def get_task_status(task_id: str):
    """
    Return the status and, once available, the result of an async task.

    Raises:
        HTTPException: 404 when ``task_id`` is unknown.
    """
    task = _tasks.get(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="任务不存在")

    return TaskResponse(
        task_id=task_id,
        status=task["status"],
        progress=task["progress"],
        message=task["message"],
        result=task.get("result")
    )


# ============================================================
# Plain-text endpoint (for callers that parsed the document themselves,
# e.g. the Java backend)
# ============================================================

@router.post("/from-text", response_model=ExtractResponse)
async def extract_from_text(request: ExtractFromTextRequest):
    """
    Extract elements from plain text.

    For callers that already hold the parsed text (e.g. parsed by the Java
    backend).

    Returns:
        ExtractResponse; ``success=False`` with ``error`` set when the text
        is empty, shorter than 10 characters after stripping, or extraction
        fails.
    """
    start_time = time.time()

    if not request.text or len(request.text.strip()) < 10:
        return ExtractResponse(
            success=False,
            error="文本内容为空或过短",
            processing_time_ms=0
        )

    try:
        result = await element_extractor.extract_from_text(
            text=request.text,
            attachment_id=request.attachment_id,
            use_llm=request.use_llm
        )

        processing_time = _elapsed_ms(start_time)

        return ExtractResponse(
            success=True,
            elements=result['elements'],
            values=result['values'],
            statistics=result['statistics'],
            processing_time_ms=processing_time
        )

    except Exception as e:
        # See extract_from_docx: loguru needs logger.exception (not
        # exc_info=True) to record the traceback.
        logger.exception(f"要素提取失败: {e}")
        return ExtractResponse(
            success=False,
            error=str(e),
            processing_time_ms=_elapsed_ms(start_time)
        )