- """
- 要素提取器:使用NER和LLM从文档中提取实体
- 支持分章节提取和实体去重。
- """
- import json
- import asyncio
- from typing import Dict, List, Any, Optional
- from loguru import logger
- from .ner_service import ner_service
class ElementExtractor:
    """
    Element extractor.

    Uses the NER service to recognize entities in a document, optionally
    augmented by LLM-based extraction. No element schema is predefined;
    dynamically recognized entities are returned as-is.
    """

    def __init__(self):
        # DeepSeek client is resolved lazily on first property access.
        self._deepseek_service = None

    @property
    def deepseek_service(self):
        """Lazily load the DeepSeek service; returns None when unavailable."""
        if self._deepseek_service is None:
            try:
                from .deepseek_service import deepseek_service
                self._deepseek_service = deepseek_service
            except ImportError:
                # NOTE(review): left as None, so the import (and this warning)
                # repeats on every access — confirm this retry is intended.
                logger.warning("DeepSeek服务未配置,LLM提取将跳过")
                self._deepseek_service = None
        return self._deepseek_service

    async def extract_from_text(
        self,
        text: str,
        attachment_id: int = 0,
        use_llm: bool = True
    ) -> Dict[str, Any]:
        """
        Extract entities from plain text (main entry point).

        Args:
            text: Plain text of the document.
            attachment_id: Attachment id (used for logging only).
            use_llm: Whether to also run LLM extraction.

        Returns:
            {
                "entities": [...],          # entities recognized by NER
                "llm_extractions": [...],   # LLM-extracted items (optional)
                "statistics": {...}
            }
        """
        logger.info(f"开始提取实体: attachment_id={attachment_id}, "
                    f"text_length={len(text)}, use_llm={use_llm}")

        # 1. Entity extraction via the NER service.
        ner_entities = await self._extract_by_ner(text)
        logger.info(f"NER提取完成: {len(ner_entities)} 个实体")

        # 2. Optional LLM-based extraction.
        llm_extractions = []
        if use_llm and self.deepseek_service:
            llm_extractions = await self._extract_by_llm(text)
            logger.info(f"LLM提取完成: {len(llm_extractions)} 个内容")

        return {
            "entities": ner_entities,
            "llm_extractions": llm_extractions,
            "statistics": {
                "ner_entity_count": len(ner_entities),
                "llm_extraction_count": len(llm_extractions),
                "text_length": len(text)
            }
        }

    async def _extract_by_ner(self, text: str) -> List[Dict]:
        """
        Extract entities through the NER service.

        Returns a list of dicts, each with:
            - text: entity text
            - type: entity type (DATE, ORG, PERSON, NUMBER, CODE, ...)
            - label: entity label (same value as type)
            - confidence: confidence score
            - position: character start/end offsets
        """
        try:
            # ner_service returns a list of EntityInfo objects.
            entities = await ner_service.extract_entities(text)

            # EntityInfo is a Pydantic model, so use attribute access.
            result = []
            for entity in entities:
                result.append({
                    "text": entity.name,
                    "type": entity.type,
                    "label": entity.type,
                    "confidence": entity.confidence,
                    "position": {
                        "start": entity.position.char_start if entity.position else 0,
                        "end": entity.position.char_end if entity.position else 0
                    }
                })

            return result
        except Exception as e:
            # Best-effort: an NER failure must not break the whole pipeline.
            logger.error(f"NER提取失败: {e}")
            return []

    async def _extract_by_llm(self, text: str) -> List[Dict]:
        """
        Extract key information with the LLM.

        The LLM decides what is important; no schema is imposed upfront.
        Returns a flat list of {"name", "value", ...} dicts tagged with
        source="llm", or [] on any failure.
        """
        if not self.deepseek_service:
            return []

        try:
            # Analyze only the head of the document to bound prompt size.
            sample_text = text[:8000]

            prompt = f"""请分析以下文档,提取其中的关键信息。
要求:
1. 识别文档类型(如:报告、合同、通知等)
2. 提取关键实体(如:组织名称、日期、金额、编号等)
3. 提取关键数据(如:得分、级别、数量等)
4. 以JSON格式返回
返回格式:
{{
    "document_type": "文档类型",
    "key_entities": [
        {{"name": "实体名称", "type": "实体类型", "value": "实体值"}}
    ],
    "key_data": [
        {{"name": "数据名称", "value": "数据值", "unit": "单位"}}
    ],
    "summary": "文档摘要(50字以内)"
}}
文档内容:
{sample_text}
只返回JSON,不要其他内容。"""
            response = await self.deepseek_service.chat(prompt)

            if not response:
                return []

            try:
                # Strip a possible ```json fence before parsing.
                json_str = response.strip()
                if json_str.startswith("```"):
                    json_str = json_str.split("```")[1]
                    if json_str.startswith("json"):
                        json_str = json_str[4:]

                data = json.loads(json_str)

                extractions = []

                # Document type.
                if data.get("document_type"):
                    extractions.append({
                        "name": "文档类型",
                        "value": data["document_type"],
                        "source": "llm"
                    })

                # Key entities.
                for entity in data.get("key_entities", []):
                    extractions.append({
                        "name": entity.get("name", ""),
                        "type": entity.get("type", ""),
                        "value": entity.get("value", ""),
                        "source": "llm"
                    })

                # Key data points (value concatenated with unit, if any).
                for item in data.get("key_data", []):
                    value = item.get("value", "")
                    if item.get("unit"):
                        value = f"{value}{item['unit']}"
                    extractions.append({
                        "name": item.get("name", ""),
                        "value": value,
                        "source": "llm"
                    })

                # Summary.
                if data.get("summary"):
                    extractions.append({
                        "name": "文档摘要",
                        "value": data["summary"],
                        "source": "llm"
                    })

                return extractions

            except json.JSONDecodeError:
                logger.warning(f"LLM返回的不是有效JSON: {response[:200]}")
                return []

        except Exception as e:
            logger.error(f"LLM提取失败: {e}")
            return []

    async def extract_from_chapters(
        self,
        chapters: List[Dict],
        attachment_id: int = 0,
        use_llm: bool = True,
        parallel: bool = True
    ) -> Dict[str, Any]:
        """
        Extract entities chapter by chapter, then deduplicate and merge.

        Args:
            chapters: Chapter list; each item is {chapter_id, title, text}.
            attachment_id: Attachment id (used for logging only).
            use_llm: Whether to also run LLM extraction per chapter.
            parallel: Whether to process chapters concurrently.

        Returns:
            {
                "entities": [...],          # deduplicated entity list
                "chapter_entities": {...},  # per-chapter raw results
                "llm_extractions": [...],
                "statistics": {...}
            }
        """
        logger.info(f"开始分章节提取: {len(chapters)} 个章节, parallel={parallel}")

        chapter_results = {}
        all_entities = []
        all_llm_extractions = []

        if parallel and len(chapters) > 1:
            # Run chapters concurrently; gather failures instead of raising.
            tasks = [
                self._extract_chapter(chapter, attachment_id, use_llm)
                for chapter in chapters
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for chapter, result in zip(chapters, results):
                # .get for consistency with _extract_chapter (no KeyError).
                chapter_id = chapter.get('chapter_id', 'unknown')
                if isinstance(result, Exception):
                    logger.error(f"章节 {chapter_id} 提取失败: {result}")
                    continue
                chapter_results[chapter_id] = result
        else:
            # Sequential fallback.
            for chapter in chapters:
                chapter_id = chapter.get('chapter_id', 'unknown')
                try:
                    chapter_results[chapter_id] = await self._extract_chapter(
                        chapter, attachment_id, use_llm
                    )
                except Exception as e:
                    logger.error(f"章节 {chapter_id} 提取失败: {e}")

        # Merge every chapter's results into flat lists.
        for chapter_id, result in chapter_results.items():
            for entity in result.get('entities', []):
                entity['chapter_id'] = chapter_id
                all_entities.append(entity)
            all_llm_extractions.extend(result.get('llm_extractions', []))

        # Deduplicate.
        unique_entities = self._deduplicate_entities(all_entities)
        unique_llm = self._deduplicate_llm_extractions(all_llm_extractions)

        logger.info(f"分章节提取完成: 原始 {len(all_entities)} 个实体, 去重后 {len(unique_entities)} 个")

        return {
            "entities": unique_entities,
            "chapter_entities": chapter_results,
            "llm_extractions": unique_llm,
            "statistics": {
                "chapter_count": len(chapters),
                "total_entities_before_dedup": len(all_entities),
                "unique_entity_count": len(unique_entities),
                "llm_extraction_count": len(unique_llm)
            }
        }

    async def _extract_chapter(
        self,
        chapter: Dict,
        attachment_id: int,
        use_llm: bool
    ) -> Dict[str, Any]:
        """Extract entities for a single chapter dict."""
        chapter_id = chapter.get('chapter_id', 'unknown')
        title = chapter.get('title', '')
        text = chapter.get('text', '')

        # Skip empty / near-empty chapters.
        if not text or len(text.strip()) < 10:
            return {"entities": [], "llm_extractions": []}

        logger.debug(f"提取章节 {chapter_id}: {title[:30]}... (长度: {len(text)})")

        # NER extraction.
        entities = await self._extract_by_ner(text)

        # Tag every entity with its chapter.
        for entity in entities:
            entity['chapter_id'] = chapter_id
            entity['chapter_title'] = title

        # Optional LLM extraction, tagged the same way.
        llm_extractions = []
        if use_llm and self.deepseek_service:
            llm_extractions = await self._extract_by_llm(text)
            for item in llm_extractions:
                item['chapter_id'] = chapter_id
                item['chapter_title'] = title

        return {
            "entities": entities,
            "llm_extractions": llm_extractions
        }

    def _deduplicate_entities(self, entities: List[Dict]) -> List[Dict]:
        """
        Deduplicate entities.

        Rules:
        1. Same type + same text -> keep the highest-confidence occurrence.
        2. Containment -> keep the longer entity text
           (e.g. "中国电建集团" vs "中国电建集团成都勘测设计研究院有限公司").
        """
        if not entities:
            return []

        # Deduplicate on the (type, text) pair.
        seen = {}
        for entity in entities:
            key = (entity.get('type', ''), entity.get('text', ''))
            if key not in seen:
                seen[key] = entity
            else:
                # Keep the higher-confidence duplicate.
                if entity.get('confidence', 0) > seen[key].get('confidence', 0):
                    seen[key] = entity

        unique = list(seen.values())

        # Drop entities whose text is a proper substring of a longer one;
        # sorting by length desc guarantees the longer text is seen first.
        final = []
        texts = set()

        unique.sort(key=lambda x: len(x.get('text', '')), reverse=True)

        for entity in unique:
            text = entity.get('text', '')
            is_substring = False
            for existing_text in texts:
                if text in existing_text and text != existing_text:
                    is_substring = True
                    break

            if not is_substring:
                final.append(entity)
                texts.add(text)

        # Restore document order by character position.
        final.sort(key=lambda x: x.get('position', {}).get('start', 0))

        return final

    def _deduplicate_llm_extractions(self, extractions: List[Dict]) -> List[Dict]:
        """Deduplicate LLM extractions on the (name, value) pair, keeping first."""
        if not extractions:
            return []

        seen = {}
        for item in extractions:
            key = (item.get('name', ''), item.get('value', ''))
            if key not in seen:
                seen[key] = item

        return list(seen.values())
# Module-level singleton shared by importers.
element_extractor = ElementExtractor()
|