""" 要素提取器:使用NER和LLM从文档中提取实体 支持分章节提取和实体去重。 """ import json import asyncio from typing import Dict, List, Any, Optional from loguru import logger from .ner_service import ner_service class ElementExtractor: """ 要素提取器 使用NER服务识别文档中的实体,可选使用LLM进行智能提取。 不预定义要素结构,返回动态识别的实体。 """ def __init__(self): self._deepseek_service = None @property def deepseek_service(self): """延迟加载deepseek服务""" if self._deepseek_service is None: try: from .deepseek_service import deepseek_service self._deepseek_service = deepseek_service except ImportError: logger.warning("DeepSeek服务未配置,LLM提取将跳过") self._deepseek_service = None return self._deepseek_service async def extract_from_text( self, text: str, attachment_id: int = 0, use_llm: bool = True ) -> Dict[str, Any]: """ 从纯文本中提取实体(主接口) Args: text: 文档纯文本 attachment_id: 附件ID use_llm: 是否使用LLM提取 Returns: { "entities": [...], # NER识别的实体列表 "llm_extractions": [...], # LLM提取的内容(可选) "statistics": {...} } """ logger.info(f"开始提取实体: attachment_id={attachment_id}, " f"text_length={len(text)}, use_llm={use_llm}") # 1. 使用NER服务提取实体 ner_entities = await self._extract_by_ner(text) logger.info(f"NER提取完成: {len(ner_entities)} 个实体") # 2. LLM智能提取(可选) llm_extractions = [] if use_llm and self.deepseek_service: llm_extractions = await self._extract_by_llm(text) logger.info(f"LLM提取完成: {len(llm_extractions)} 个内容") return { "entities": ner_entities, "llm_extractions": llm_extractions, "statistics": { "ner_entity_count": len(ner_entities), "llm_extraction_count": len(llm_extractions), "text_length": len(text) } } async def _extract_by_ner(self, text: str) -> List[Dict]: """ 使用NER服务提取实体 返回实体列表,每个实体包含: - text: 实体文本 - type: 实体类型(DATE, ORG, PERSON, NUMBER, CODE等) - label: 实体标签 - confidence: 置信度 - position: 位置信息 """ try: # 调用现有的NER服务,返回EntityInfo对象列表 entities = await ner_service.extract_entities(text) # 格式化输出(EntityInfo是Pydantic模型,使用属性访问) result = [] for entity in entities: result.append({ "text": entity.name, "type": entity.type, "label": entity.type, "confidence": entity.confidence, "position": { "start": entity.position.char_start if entity.position else 0, "end": entity.position.char_end if entity.position else 0 } }) return result except Exception as e: logger.error(f"NER提取失败: {e}") return [] async def _extract_by_llm(self, text: str) -> List[Dict]: """ 使用LLM智能提取关键信息 让LLM自动识别文档中的重要信息,不预设要提取什么。 """ if not self.deepseek_service: return [] try: # 截取文档前部分进行分析 sample_text = text[:8000] if len(text) > 8000 else text prompt = f"""请分析以下文档,提取其中的关键信息。 要求: 1. 识别文档类型(如:报告、合同、通知等) 2. 提取关键实体(如:组织名称、日期、金额、编号等) 3. 提取关键数据(如:得分、级别、数量等) 4. 以JSON格式返回 返回格式: {{ "document_type": "文档类型", "key_entities": [ {{"name": "实体名称", "type": "实体类型", "value": "实体值"}} ], "key_data": [ {{"name": "数据名称", "value": "数据值", "unit": "单位"}} ], "summary": "文档摘要(50字以内)" }} 文档内容: {sample_text} 只返回JSON,不要其他内容。""" response = await self.deepseek_service.chat(prompt) if response: # 尝试解析JSON try: # 清理响应,提取JSON部分 json_str = response.strip() if json_str.startswith("```"): json_str = json_str.split("```")[1] if json_str.startswith("json"): json_str = json_str[4:] data = json.loads(json_str) extractions = [] # 文档类型 if data.get("document_type"): extractions.append({ "name": "文档类型", "value": data["document_type"], "source": "llm" }) # 关键实体 for entity in data.get("key_entities", []): extractions.append({ "name": entity.get("name", ""), "type": entity.get("type", ""), "value": entity.get("value", ""), "source": "llm" }) # 关键数据 for item in data.get("key_data", []): value = item.get("value", "") if item.get("unit"): value = f"{value}{item['unit']}" extractions.append({ "name": item.get("name", ""), "value": value, "source": "llm" }) # 摘要 if data.get("summary"): extractions.append({ "name": "文档摘要", "value": data["summary"], "source": "llm" }) return extractions except json.JSONDecodeError: logger.warning(f"LLM返回的不是有效JSON: {response[:200]}") return [] return [] except Exception as e: logger.error(f"LLM提取失败: {e}") return [] async def extract_from_chapters( self, chapters: List[Dict], attachment_id: int = 0, use_llm: bool = True, parallel: bool = True ) -> Dict[str, Any]: """ 分章节提取实体,最后去重合并 Args: chapters: 章节列表,每个章节包含 {chapter_id, title, text} attachment_id: 附件ID use_llm: 是否使用LLM提取 parallel: 是否并行处理章节 Returns: { "entities": [...], # 去重后的实体列表 "chapter_entities": {...}, # 按章节分组的实体 "llm_extractions": [...], "statistics": {...} } """ logger.info(f"开始分章节提取: {len(chapters)} 个章节, parallel={parallel}") chapter_results = {} all_entities = [] all_llm_extractions = [] if parallel and len(chapters) > 1: # 并行处理章节 tasks = [] for chapter in chapters: task = self._extract_chapter(chapter, attachment_id, use_llm) tasks.append(task) results = await asyncio.gather(*tasks, return_exceptions=True) for chapter, result in zip(chapters, results): if isinstance(result, Exception): logger.error(f"章节 {chapter['chapter_id']} 提取失败: {result}") continue chapter_results[chapter['chapter_id']] = result else: # 串行处理章节 for chapter in chapters: try: result = await self._extract_chapter(chapter, attachment_id, use_llm) chapter_results[chapter['chapter_id']] = result except Exception as e: logger.error(f"章节 {chapter['chapter_id']} 提取失败: {e}") # 合并所有章节的实体 for chapter_id, result in chapter_results.items(): for entity in result.get('entities', []): entity['chapter_id'] = chapter_id all_entities.append(entity) all_llm_extractions.extend(result.get('llm_extractions', [])) # 去重 unique_entities = self._deduplicate_entities(all_entities) unique_llm = self._deduplicate_llm_extractions(all_llm_extractions) logger.info(f"分章节提取完成: 原始 {len(all_entities)} 个实体, 去重后 {len(unique_entities)} 个") return { "entities": unique_entities, "chapter_entities": chapter_results, "llm_extractions": unique_llm, "statistics": { "chapter_count": len(chapters), "total_entities_before_dedup": len(all_entities), "unique_entity_count": len(unique_entities), "llm_extraction_count": len(unique_llm) } } async def _extract_chapter( self, chapter: Dict, attachment_id: int, use_llm: bool ) -> Dict[str, Any]: """提取单个章节的实体""" chapter_id = chapter.get('chapter_id', 'unknown') title = chapter.get('title', '') text = chapter.get('text', '') if not text or len(text.strip()) < 10: return {"entities": [], "llm_extractions": []} logger.debug(f"提取章节 {chapter_id}: {title[:30]}... (长度: {len(text)})") # NER提取 entities = await self._extract_by_ner(text) # 为每个实体添加章节信息 for entity in entities: entity['chapter_id'] = chapter_id entity['chapter_title'] = title # LLM提取(可选) llm_extractions = [] if use_llm and self.deepseek_service: llm_extractions = await self._extract_by_llm(text) for item in llm_extractions: item['chapter_id'] = chapter_id item['chapter_title'] = title return { "entities": entities, "llm_extractions": llm_extractions } def _deduplicate_entities(self, entities: List[Dict]) -> List[Dict]: """ 实体去重 去重规则: 1. 相同类型+相同文本 -> 保留第一个出现的 2. 包含关系 -> 保留更长的实体 """ if not entities: return [] # 按 (type, text) 去重 seen = {} for entity in entities: key = (entity.get('type', ''), entity.get('text', '')) if key not in seen: seen[key] = entity else: # 保留置信度更高的 if entity.get('confidence', 0) > seen[key].get('confidence', 0): seen[key] = entity unique = list(seen.values()) # 处理包含关系(可选,较复杂) # 例如:"中国电建集团" 和 "中国电建集团成都勘测设计研究院有限公司" # 保留更长的 final = [] texts = set() # 按文本长度降序排序 unique.sort(key=lambda x: len(x.get('text', '')), reverse=True) for entity in unique: text = entity.get('text', '') # 检查是否被更长的实体包含 is_substring = False for existing_text in texts: if text in existing_text and text != existing_text: is_substring = True break if not is_substring: final.append(entity) texts.add(text) # 恢复原始顺序(按位置) final.sort(key=lambda x: x.get('position', {}).get('start', 0)) return final def _deduplicate_llm_extractions(self, extractions: List[Dict]) -> List[Dict]: """LLM提取结果去重""" if not extractions: return [] seen = {} for item in extractions: key = (item.get('name', ''), item.get('value', '')) if key not in seen: seen[key] = item return list(seen.values()) # 创建单例 element_extractor = ElementExtractor()