| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- """
- Ollama LLM 服务
- 用于调用本地 Ollama 模型进行 NER 提取
- """
- import json
- import re
- import uuid
- import httpx
- from typing import List, Optional, Dict, Any
- from loguru import logger
- from ..config import settings
- from ..models import EntityInfo, PositionInfo
class OllamaService:
    """Client for a local Ollama server, used to run LLM-based NER extraction.

    Long inputs are split into overlapping chunks, each chunk is sent to the
    model with an NER prompt, and the JSON answers are parsed into
    ``EntityInfo`` objects whose offsets are relative to the full input text.
    """

    # Preferred split points, tried in this order, so that a chunk ends on a
    # paragraph/sentence boundary instead of cutting a sentence in half.
    _SEPARATORS = ('\n\n', '\n', '。', ';', '!', '?', '.')

    def __init__(self):
        """Read the connection and chunking parameters from ``settings``."""
        self.base_url = settings.ollama_url
        self.model = settings.ollama_model
        self.timeout = settings.ollama_timeout
        self.chunk_size = settings.chunk_size
        self.chunk_overlap = settings.chunk_overlap
        logger.info(f"初始化 Ollama 服务: url={self.base_url}, model={self.model}")

    @staticmethod
    def _chunk_bounds(text: str, chunk_size: int, chunk_overlap: int) -> List[tuple]:
        """Compute ``(start, end)`` offsets of consecutive chunks of ``text``.

        Each chunk is at most ``chunk_size`` characters long. When a chunk
        does not reach the end of the text, its end is moved back to just
        after the last separator found in the second half of the window.
        The next chunk starts ``chunk_overlap`` characters before the end of
        the previous one.

        Args:
            text: Text to split.
            chunk_size: Maximum chunk length; values below 1 are treated as 1.
            chunk_overlap: Characters shared by adjacent chunks; negative
                values are treated as 0.

        Returns:
            List of ``(start, end)`` tuples covering the whole text in order.
        """
        total = len(text)
        size = max(1, chunk_size)
        overlap = max(0, chunk_overlap)

        bounds = []
        start = 0
        while start < total:
            end = min(start + size, total)

            if end < total:
                # Only look for a separator in the second half of the window
                # so that chunks do not become arbitrarily short.
                search_from = start + size // 2
                for sep in OllamaService._SEPARATORS:
                    sep_pos = text.rfind(sep, search_from, end)
                    if sep_pos > start:
                        end = sep_pos + len(sep)
                        break

            bounds.append((start, end))
            if end >= total:
                break

            next_start = end - overlap
            if next_start <= start:
                # The overlap is at least as long as the chunk just emitted;
                # stepping back would repeat the same chunk forever, so the
                # overlap is dropped for this step to guarantee progress.
                next_start = end
            start = next_start

        return bounds

    def _split_text(self, text: str) -> List[Dict[str, Any]]:
        """Split a long text into chunks.

        Args:
            text: Original text.

        Returns:
            List of chunks; each chunk is a dict with the keys ``text``,
            ``start_pos`` and ``end_pos`` (offsets into ``text``). A text no
            longer than ``self.chunk_size`` is returned as a single chunk.
        """
        if len(text) <= self.chunk_size:
            return [{"text": text, "start_pos": 0, "end_pos": len(text)}]

        chunks = [
            {"text": text[start:end], "start_pos": start, "end_pos": end}
            for start, end in self._chunk_bounds(text, self.chunk_size, self.chunk_overlap)
        ]

        logger.info(f"文本分割完成: 总长度={len(text)}, 分块数={len(chunks)}")
        return chunks

    def _build_ner_prompt(self, text: str, entity_types: Optional[List[str]] = None) -> str:
        """Build the NER extraction prompt for one piece of text.

        Args:
            text: Text the model should extract entities from.
            entity_types: Entity type names to request; falls back to
                ``settings.entity_types`` when empty or ``None``.

        Returns:
            The complete prompt string.
        """
        types = entity_types or settings.entity_types
        types_desc = ", ".join(types)

        prompt = f"""你是一个专业的命名实体识别(NER)系统。请从以下文本中提取实体。
## 任务要求
1. 识别以下类型的实体: {types_desc}
2. 每个实体需要包含: 名称(name)、类型(type)、在文本中的起始位置(charStart)和结束位置(charEnd)
3. 只提取明确的、有意义的实体,避免提取过于泛化的词汇
4. 严格按照 JSON 格式输出
## 实体类型说明
- PERSON: 人名(如:张三、李经理)
- ORG: 机构/组织/公司(如:成都检测公司、环保局)
- LOC: 地点/地址(如:成都市、高新区)
- DATE: 日期时间(如:2024年5月15日、2024-05-15)
- NUMBER: 带单位的数值(如:50分贝、100万元)
- DEVICE: 设备仪器(如:噪音检测仪、分析仪器)
- PROJECT: 项目/工程(如:环境监测项目、XX工程)
- METHOD: 方法/标准(如:GB/T 12345、检测方法)
## 输出格式
请严格按以下 JSON 格式输出,不要包含其他内容:
```json
{{
"entities": [
{{"name": "实体名称", "type": "实体类型", "charStart": 起始位置, "charEnd": 结束位置}}
]
}}
```
## 待处理文本
{text}
## 提取结果
"""
        return prompt

    async def _call_ollama(self, prompt: str) -> Optional[str]:
        """Send ``prompt`` to the Ollama ``/api/generate`` endpoint.

        Args:
            prompt: Prompt text to send.

        Returns:
            The ``response`` field of the JSON reply (``""`` when the field is
            missing), or ``None`` when the request times out or fails; the
            failure is logged rather than raised.
        """
        # Strip a trailing slash so the request path never becomes "//api/...".
        url = f"{self.base_url.rstrip('/')}/api/generate"
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": 0.1,  # low temperature for more deterministic output
                "num_predict": 4096,  # maximum number of output tokens
            }
        }

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(url, json=payload)
                response.raise_for_status()
                result = response.json()
                return result.get("response", "")
        except httpx.TimeoutException:
            logger.error(f"Ollama 请求超时: timeout={self.timeout}s")
            return None
        except Exception as e:
            logger.error(f"Ollama 请求失败: {e}")
            return None

    @staticmethod
    def _extract_json_object(response: str) -> Optional[Dict[str, Any]]:
        """Find the JSON object carrying the entity list inside a model reply.

        Every ``{`` in the reply is tried as the start of a JSON object, so
        surrounding prose, code fences or stray braces do not break parsing.
        The first object that has an ``entities`` key wins.

        Args:
            response: Raw text returned by the model.

        Returns:
            The decoded object; if no decodable object has an ``entities``
            key and nothing failed to decode, the first decoded object; or
            ``None`` when the reply contains no ``{`` at all.

        Raises:
            json.JSONDecodeError: When no object with an ``entities`` key was
                found and at least one candidate failed to decode (the first
                such error is re-raised).
        """
        decoder = json.JSONDecoder()
        first_error = None
        fallback = None

        idx = response.find("{")
        while idx != -1:
            try:
                obj, _ = decoder.raw_decode(response, idx)
            except json.JSONDecodeError as exc:
                if first_error is None:
                    first_error = exc
            else:
                if isinstance(obj, dict):
                    if "entities" in obj:
                        return obj
                    if fallback is None:
                        fallback = obj
            idx = response.find("{", idx + 1)

        if first_error is not None:
            raise first_error
        return fallback

    @staticmethod
    def _coerce_offset(value: Any) -> int:
        """Convert a model-reported offset to ``int``; unusable values become 0."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def _locate_in_text(name: str, text: str, hint_start: int, hint_end: int) -> tuple:
        """Re-anchor an entity on the real text instead of trusting model offsets.

        Args:
            name: Entity surface string.
            text: Text the offsets refer to.
            hint_start: Start offset reported by the model.
            hint_end: End offset reported by the model.

        Returns:
            ``(start, end)``: the hint itself when it already spans ``name``;
            otherwise the occurrence of ``name`` closest to ``hint_start``
            (the earliest one on a tie); otherwise the unchanged hint when
            ``name`` does not occur in ``text``.
        """
        if 0 <= hint_start < hint_end <= len(text) and text[hint_start:hint_end] == name:
            return hint_start, hint_end

        best = -1
        pos = text.find(name)
        while pos != -1:
            if best == -1 or abs(pos - hint_start) < abs(best - hint_start):
                best = pos
            pos = text.find(name, pos + 1)

        if best == -1:
            return hint_start, hint_end
        return best, best + len(name)

    def _parse_llm_response(
        self,
        response: str,
        chunk_start_pos: int = 0,
        chunk_text: Optional[str] = None
    ) -> List["EntityInfo"]:
        """Parse the JSON result returned by the LLM into entities.

        Malformed items are skipped individually, so one bad item does not
        discard the remaining entities.

        Args:
            response: Text returned by the LLM.
            chunk_start_pos: Start offset of the current chunk in the original
                text; added to every entity offset.
            chunk_text: Optional text of the chunk. When given, each entity's
                offsets are checked against it and corrected to a real
                occurrence of the entity name where one exists.

        Returns:
            List of parsed entities; empty when no usable JSON was found.
        """
        entities = []

        try:
            data = self._extract_json_object(response)
        except json.JSONDecodeError as e:
            logger.warning(f"JSON 解析失败: {e}, response={response[:200]}...")
            return entities

        if data is None:
            logger.warning("LLM 响应中未找到 JSON")
            return entities

        raw_items = data.get("entities") or []
        if not isinstance(raw_items, list):
            logger.warning(f"entities 字段不是列表: {type(raw_items).__name__}")
            return entities

        for item in raw_items:
            if not isinstance(item, dict):
                logger.warning(f"跳过格式错误的实体: {item!r}")
                continue

            # ``or ""`` also covers an explicit JSON null.
            name = str(item.get("name") or "").strip()
            entity_type = str(item.get("type") or "").upper()

            # Names shorter than two characters are ignored.
            if len(name) < 2:
                continue

            char_start = self._coerce_offset(item.get("charStart"))
            char_end = self._coerce_offset(item.get("charEnd"))
            if chunk_text is not None:
                char_start, char_end = self._locate_in_text(
                    name, chunk_text, char_start, char_end
                )

            try:
                entity = EntityInfo(
                    name=name,
                    type=entity_type,
                    value=name,
                    position=PositionInfo(
                        # Shift chunk-relative offsets to full-text offsets.
                        char_start=char_start + chunk_start_pos,
                        char_end=char_end + chunk_start_pos,
                        line=1  # line numbers are not computed in LLM mode
                    ),
                    confidence=0.9,  # fixed default confidence for LLM mode
                    temp_id=str(uuid.uuid4())[:8]
                )
            except Exception as e:
                logger.error(f"解析 LLM 响应失败: {e}")
                continue

            entities.append(entity)

        return entities

    async def extract_entities(
        self,
        text: str,
        entity_types: Optional[List[str]] = None
    ) -> List["EntityInfo"]:
        """Extract entities from ``text`` with the Ollama LLM.

        Long texts are split into chunks that are processed one after another.
        Entities are de-duplicated on ``type:name``; the first occurrence is
        kept. Chunks whose request fails or returns nothing are skipped.

        Args:
            text: Text to analyse.
            entity_types: Entity types to request; see ``_build_ner_prompt``.

        Returns:
            De-duplicated list of entities; empty for blank input.
        """
        if not text or not text.strip():
            return []

        chunks = self._split_text(text)

        all_entities = []
        seen_entities = set()  # "type:name" keys already emitted

        for i, chunk in enumerate(chunks):
            logger.info(f"处理分块 {i+1}/{len(chunks)}: 长度={len(chunk['text'])}")

            prompt = self._build_ner_prompt(chunk["text"], entity_types)
            response = await self._call_ollama(prompt)

            if not response:
                logger.warning(f"分块 {i+1} Ollama 返回为空")
                continue

            entities = self._parse_llm_response(
                response, chunk["start_pos"], chunk["text"]
            )

            for entity in entities:
                entity_key = f"{entity.type}:{entity.name}"
                if entity_key not in seen_entities:
                    seen_entities.add(entity_key)
                    all_entities.append(entity)

            logger.info(f"分块 {i+1} 提取实体: {len(entities)} 个")

        logger.info(f"Ollama NER 提取完成: 总实体数={len(all_entities)}")
        return all_entities

    async def check_health(self) -> bool:
        """Check whether the Ollama service is reachable.

        Returns:
            ``True`` when ``GET /api/tags`` answers with status 200 within
            5 seconds, ``False`` on any other status or any error.
        """
        try:
            async with httpx.AsyncClient(timeout=5) as client:
                response = await client.get(f"{self.base_url.rstrip('/')}/api/tags")
                return response.status_code == 200
        except Exception:
            # Best-effort probe: any failure simply means "not healthy".
            return False
# Module-level singleton: created once at import time (which runs
# OllamaService.__init__ and therefore reads `settings`).
ollama_service = OllamaService()
|