| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- """
- 要素提取路由
- 统一使用Python进行DOCX解析和要素提取,支持异步处理大文档。
- """
- from fastapi import APIRouter, HTTPException, UploadFile, File, Form, BackgroundTasks
- from pydantic import BaseModel
- from typing import Dict, Any, Optional, List
- import time
- import uuid
- import asyncio
- from concurrent.futures import ThreadPoolExecutor
- from loguru import logger
- from ..services.docx_parser import parse_docx_file, blocks_to_text
- from ..services.element_extractor import element_extractor
# Router that the extraction endpoints below are registered on.
router = APIRouter()

# In-memory task registry keyed by task id (production should use Redis).
_tasks: Dict[str, Dict[str, Any]] = {}

# Thread pool used to run the CPU-heavy DOCX parsing off the event loop.
_executor = ThreadPoolExecutor(max_workers=4)
class ExtractFromTextRequest(BaseModel):
    """Request body for extracting elements from plain text."""

    # Text to run the extraction on.
    text: str
    # Attachment identifier passed through to the extractor; defaults to 0.
    attachment_id: int = 0
    # Passed through to the extractor as its ``use_llm`` flag; defaults to True.
    use_llm: bool = True
class ExtractResponse(BaseModel):
    """Response body returned by the extraction endpoints."""

    # Whether the request was processed without error.
    success: bool
    # Parsed document content, when the endpoint produced one.
    doc_content: Optional[Dict[str, Any]] = None
    # Extracted elements, as reported by the extractor.
    elements: List[Dict] = []
    # Extracted values, as reported by the extractor.
    values: List[Dict] = []
    # Extraction statistics, as reported by the extractor.
    statistics: Dict[str, Any] = {}
    # Elapsed handling time in milliseconds.
    processing_time_ms: int = 0
    # Error description when ``success`` is False.
    error: Optional[str] = None
class TaskResponse(BaseModel):
    """Response body describing an asynchronous extraction task."""

    # Identifier used to look the task up later.
    task_id: str
    # One of: pending, processing, completed, failed.
    status: str
    # Progress indicator; the background worker in this module sets 0-100.
    progress: int = 0
    # Human-readable status message.
    message: str = ""
    # Extraction result payload, when one has been stored for the task.
    result: Optional[Dict[str, Any]] = None
- # ============================================================
- # 同步接口(小文档,<50页)
- # ============================================================
@router.post("/from-docx", response_model=ExtractResponse)
async def extract_from_docx(
    file: UploadFile = File(...),
    attachment_id: int = Form(default=0),
    use_llm: bool = Form(default=True)
):
    """
    Synchronously process an uploaded DOCX: parse it, then extract elements.

    Intended for small documents (<50 pages, <5MB). Larger documents should
    use the asynchronous endpoint.

    Args:
        file: Uploaded file; its name must end with ``.docx``.
        attachment_id: Identifier passed through to the element extractor.
        use_llm: Passed through to the element extractor.

    Returns:
        ExtractResponse with the parsed document, elements, values and
        statistics on success, or ``success=False`` with ``error`` set when
        parsing or extraction raises.

    Raises:
        HTTPException: 400 when the upload is not a ``.docx`` file (including
            a missing filename) or is larger than 5MB.
    """
    start_time = time.time()

    # The filename can be absent on an upload; reject it with a 400 instead of
    # crashing on ``None.lower()``.
    if not file.filename or not file.filename.lower().endswith('.docx'):
        raise HTTPException(status_code=400, detail="仅支持.docx文件")

    # Enforce the 5MB limit of the synchronous endpoint.
    content = await file.read()
    if len(content) > 5 * 1024 * 1024:
        raise HTTPException(
            status_code=400,
            detail="文件过大,请使用异步接口 /extract/async/from-docx"
        )

    try:
        logger.info(f"同步处理文件: {file.filename}, size={len(content)}")

        # 1. Parse the DOCX in the thread pool so the event loop is not blocked.
        #    Inside a coroutine the running loop is the one to use.
        loop = asyncio.get_running_loop()
        doc_content = await loop.run_in_executor(_executor, parse_docx_file, content)
        logger.info(f"DOCX解析完成: {doc_content['totalBlocks']} 个块")

        # 2. Flatten the parsed blocks to plain text.
        text = blocks_to_text(doc_content['blocks'])

        # 3. Extract elements from the text.
        result = await element_extractor.extract_from_text(
            text=text,
            attachment_id=attachment_id,
            use_llm=use_llm
        )

        processing_time = int((time.time() - start_time) * 1000)
        logger.info(f"处理完成: {result['statistics']}, 耗时: {processing_time}ms")

        return ExtractResponse(
            success=True,
            doc_content=doc_content,
            elements=result['elements'],
            values=result['values'],
            statistics=result['statistics'],
            processing_time_ms=processing_time
        )
    except Exception as e:
        # loguru has no ``exc_info`` keyword: it would be treated as a format
        # argument and no traceback would be recorded. ``logger.exception``
        # logs at ERROR level with the traceback, and passing ``e`` as an
        # argument keeps braces in the error text from breaking formatting.
        logger.exception("处理失败: {}", e)
        return ExtractResponse(
            success=False,
            error=str(e),
            processing_time_ms=int((time.time() - start_time) * 1000)
        )
- # ============================================================
- # 异步接口(大文档)
- # ============================================================
@router.post("/async/from-docx", response_model=TaskResponse)
async def extract_from_docx_async(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    attachment_id: int = Form(default=0),
    use_llm: bool = Form(default=True)
):
    """
    Asynchronously process an uploaded DOCX.

    Registers a task, schedules parsing and extraction in the background and
    returns the task id immediately. Intended for large documents (>50 pages);
    progress and results are queried via /extract/task/{task_id}.

    Args:
        background_tasks: FastAPI background task queue for this request.
        file: Uploaded file; its name must end with ``.docx``.
        attachment_id: Identifier passed through to the element extractor.
        use_llm: Passed through to the element extractor.

    Returns:
        TaskResponse in ``pending`` state carrying the new task id.

    Raises:
        HTTPException: 400 when the upload is not a ``.docx`` file (including
            a missing filename).
    """
    # The filename can be absent on an upload; reject it with a 400 instead of
    # crashing on ``None.lower()``.
    if not file.filename or not file.filename.lower().endswith('.docx'):
        raise HTTPException(status_code=400, detail="仅支持.docx文件")

    # Read the payload now; the background task receives the raw bytes.
    content = await file.read()

    # Register the task before scheduling it so status queries can find it.
    task_id = str(uuid.uuid4())
    _tasks[task_id] = {
        "status": "pending",
        "progress": 0,
        "message": "任务已创建,等待处理",
        "result": None,
        "created_at": time.time()
    }

    logger.info(f"创建异步任务: task_id={task_id}, file={file.filename}, size={len(content)}")

    # Schedule the actual work to run in the background.
    background_tasks.add_task(
        _process_docx_task,
        task_id,
        content,
        attachment_id,
        use_llm
    )

    return TaskResponse(
        task_id=task_id,
        status="pending",
        progress=0,
        message="任务已创建,正在后台处理"
    )
async def _process_docx_task(
    task_id: str,
    content: bytes,
    attachment_id: int,
    use_llm: bool
):
    """
    Background worker: parse a DOCX payload and extract elements from it.

    Progress, status and the final result are written into the ``_tasks``
    entry for ``task_id``. Exceptions raised during processing are logged and
    recorded on the task (status ``failed``, message set to the error text)
    rather than propagated.

    Args:
        task_id: Key of the already-registered entry in ``_tasks``.
        content: Raw bytes of the uploaded DOCX file.
        attachment_id: Identifier passed through to the element extractor.
        use_llm: Passed through to the element extractor.
    """
    # Single lookup; every update below mutates this same dict in place.
    task = _tasks[task_id]
    try:
        task["status"] = "processing"
        task["progress"] = 10
        task["message"] = "正在解析DOCX文档..."

        start_time = time.time()

        # 1. Parse the DOCX in the thread pool. Inside a coroutine the running
        #    loop is the one to use.
        loop = asyncio.get_running_loop()
        doc_content = await loop.run_in_executor(_executor, parse_docx_file, content)

        task["progress"] = 40
        task["message"] = f"解析完成,共{doc_content['totalBlocks']}个块,正在提取要素..."

        # 2. Flatten the parsed blocks to plain text.
        text = blocks_to_text(doc_content['blocks'])

        task["progress"] = 50

        # 3. Extract elements from the text.
        result = await element_extractor.extract_from_text(
            text=text,
            attachment_id=attachment_id,
            use_llm=use_llm
        )

        processing_time = int((time.time() - start_time) * 1000)

        # 4. Store the result on the task.
        task["status"] = "completed"
        task["progress"] = 100
        task["message"] = "处理完成"
        task["result"] = {
            "doc_content": doc_content,
            "elements": result['elements'],
            "values": result['values'],
            "statistics": result['statistics'],
            "processing_time_ms": processing_time
        }

        logger.info(f"异步任务完成: task_id={task_id}, 耗时={processing_time}ms")

    except Exception as e:
        # loguru has no ``exc_info`` keyword: it would be treated as a format
        # argument and no traceback would be recorded. ``logger.exception``
        # logs at ERROR level with the traceback, and passing values as
        # arguments keeps braces in the error text from breaking formatting.
        logger.exception("异步任务失败: task_id={}, error={}", task_id, e)
        task["status"] = "failed"
        task["message"] = str(e)
@router.get("/task/{task_id}", response_model=TaskResponse)
async def get_task_status(task_id: str):
    """
    Return the status and, if stored, the result of an asynchronous task.

    Raises:
        HTTPException: 404 when no task with ``task_id`` is registered.
    """
    # Registered entries are always dicts, so None can only mean "unknown id".
    entry = _tasks.get(task_id)
    if entry is None:
        raise HTTPException(status_code=404, detail="任务不存在")

    payload = {
        "task_id": task_id,
        "status": entry["status"],
        "progress": entry["progress"],
        "message": entry["message"],
        "result": entry.get("result"),
    }
    return TaskResponse(**payload)
- # ============================================================
- # 纯文本接口(兼容Java后端解析的场景)
- # ============================================================
@router.post("/from-text", response_model=ExtractResponse)
async def extract_from_text(request: ExtractFromTextRequest):
    """
    Extract elements from plain text.

    For callers that already hold parsed text (e.g. parsed by the Java
    backend).

    Args:
        request: Text to process plus the ``attachment_id`` and ``use_llm``
            options passed through to the element extractor.

    Returns:
        ExtractResponse with elements, values and statistics on success, or
        ``success=False`` with ``error`` set when the text is empty or shorter
        than 10 characters after stripping, or when extraction raises.
    """
    start_time = time.time()

    # Reject empty or near-empty input without calling the extractor.
    if not request.text or len(request.text.strip()) < 10:
        return ExtractResponse(
            success=False,
            error="文本内容为空或过短",
            processing_time_ms=0
        )

    try:
        result = await element_extractor.extract_from_text(
            text=request.text,
            attachment_id=request.attachment_id,
            use_llm=request.use_llm
        )

        processing_time = int((time.time() - start_time) * 1000)

        return ExtractResponse(
            success=True,
            elements=result['elements'],
            values=result['values'],
            statistics=result['statistics'],
            processing_time_ms=processing_time
        )
    except Exception as e:
        # loguru has no ``exc_info`` keyword: it would be treated as a format
        # argument and no traceback would be recorded. ``logger.exception``
        # logs at ERROR level with the traceback, and passing ``e`` as an
        # argument keeps braces in the error text from breaking formatting.
        logger.exception("要素提取失败: {}", e)
        return ExtractResponse(
            success=False,
            error=str(e),
            processing_time_ms=int((time.time() - start_time) * 1000)
        )
|