| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- """
- Ollama LLM 服务
- 用于调用本地 Ollama 模型进行 NER 提取
- """
- import json
- import re
- import uuid
- import httpx
- from typing import List, Optional, Dict, Any
- from loguru import logger
- from ..config import settings
- from ..models import EntityInfo, PositionInfo
class OllamaService:
    """Client for a local Ollama server, used for LLM-based NER extraction."""

    def __init__(self):
        # Connection and chunking parameters come from application settings.
        self.base_url = settings.ollama_url
        self.model = settings.ollama_model
        self.timeout = settings.ollama_timeout
        self.chunk_size = settings.chunk_size
        self.chunk_overlap = settings.chunk_overlap

        # UniversalNER models speak a different prompt/response protocol,
        # so the extraction strategy is switched on the model name.
        self.is_universal_ner = "universal-ner" in self.model.lower()

        logger.info(f"初始化 Ollama 服务: url={self.base_url}, model={self.model}, universal_ner={self.is_universal_ner}")
- def _split_text(self, text: str) -> List[Dict[str, Any]]:
- """
- 将长文本分割成多个块
-
- Args:
- text: 原始文本
-
- Returns:
- 分块列表,每个块包含 text, start_pos, end_pos
- """
- if len(text) <= self.chunk_size:
- return [{"text": text, "start_pos": 0, "end_pos": len(text)}]
-
- chunks = []
- start = 0
-
- while start < len(text):
- end = min(start + self.chunk_size, len(text))
-
- # 尝试在句号、换行处分割,避免截断句子
- if end < len(text):
- # 向前查找最近的分隔符
- for sep in ['\n\n', '\n', '。', ';', '!', '?', '.']:
- sep_pos = text.rfind(sep, start + self.chunk_size // 2, end)
- if sep_pos > start:
- end = sep_pos + len(sep)
- break
-
- chunk_text = text[start:end]
- chunks.append({
- "text": chunk_text,
- "start_pos": start,
- "end_pos": end
- })
-
- # 下一个块的起始位置(考虑重叠)
- start = end - self.chunk_overlap if end < len(text) else end
-
- logger.info(f"文本分割完成: 总长度={len(text)}, 分块数={len(chunks)}")
- return chunks
-
- def _build_ner_prompt(self, text: str, entity_types: Optional[List[str]] = None) -> str:
- """
- 构建 NER 提取的 Prompt
- """
- types = entity_types or settings.entity_types
- types_desc = ", ".join(types)
-
- # 示例帮助模型理解格式
- example = '{"entities": [{"name": "成都市", "type": "LOC", "charStart": 10, "charEnd": 13}, {"name": "2024年5月", "type": "DATE", "charStart": 0, "charEnd": 7}]}'
-
- # 简洁直接的 prompt,要求模型只输出 JSON
- prompt = f"""请从以下文本中提取命名实体,直接输出JSON,不要解释。
- 实体类型: {types_desc}
- 输出格式示例:
- {example}
- 文本:
- {text}
- 请直接输出JSON:"""
- return prompt
-
- async def _call_ollama(self, prompt: str, disable_thinking: bool = True) -> Optional[str]:
- """
- 调用 Ollama Chat API
-
- Args:
- prompt: 输入提示词
- disable_thinking: 是否禁用思考模式(适用于 Qwen3 等支持思考的模型)
- """
- # 使用 /api/chat 接口,think 参数仅在此接口生效
- url = f"{self.base_url}/api/chat"
- payload = {
- "model": self.model,
- "messages": [
- {
- "role": "user",
- "content": prompt
- }
- ],
- "stream": False,
- "options": {
- "temperature": 0.1, # 低温度,更确定性的输出
- }
- }
-
- # Qwen3 思考模式:禁用思考,直接输出 JSON 结果
- # think 参数仅在 /api/chat 接口中生效
- if disable_thinking:
- payload["think"] = False
-
- try:
- async with httpx.AsyncClient(timeout=self.timeout) as client:
- response = await client.post(url, json=payload)
- response.raise_for_status()
- result = response.json()
- # chat 接口返回格式: {"message": {"role": "assistant", "content": "..."}}
- message = result.get("message", {})
- return message.get("content", "")
- except httpx.TimeoutException:
- logger.error(f"Ollama 请求超时: timeout={self.timeout}s")
- return None
- except Exception as e:
- logger.error(f"Ollama 请求失败: {e}")
- return None
-
- def _parse_llm_response(self, response: str, chunk_start_pos: int = 0) -> List[EntityInfo]:
- """
- 解析 LLM 返回的 JSON 结果
-
- Args:
- response: LLM 返回的文本
- chunk_start_pos: 当前分块在原文中的起始位置(用于位置校正)
- """
- entities = []
-
- try:
- # Qwen3 思考模式处理:提取 </think> 之后的内容
- think_end = response.find('</think>')
- if think_end != -1:
- # 只保留思考结束后的内容
- response = response[think_end + len('</think>'):]
- logger.debug(f"提取思考后内容: {response[:200]}...")
- else:
- # 检查是否存在 <think> 但没有 </think>(思考未完成或被截断)
- think_start = response.find('<think>')
- if think_start != -1:
- # 尝试从 <think> 之前的内容或整个响应中查找 JSON
- # 有些情况下 JSON 可能在思考标签之前
- pre_think = response[:think_start].strip()
- if pre_think:
- response = pre_think
- logger.debug(f"使用思考前内容: {response[:200]}...")
- else:
- # 思考内容中可能包含 JSON,尝试直接从响应中提取
- logger.debug("检测到不完整的思考模式,尝试直接提取JSON")
-
- # 移除 markdown code block 标记
- response = re.sub(r'```json\s*', '', response)
- response = re.sub(r'```\s*', '', response)
- response = response.strip()
-
- # 方法1:直接尝试解析整个响应(如果是纯 JSON)
- data = None
- try:
- data = json.loads(response)
- except json.JSONDecodeError:
- pass
-
- # 方法2:查找包含 entities 的 JSON 对象(使用更宽松的匹配)
- if not data or "entities" not in data:
- # 匹配 {"entities": [...]} 格式,使用贪婪匹配以捕获完整的嵌套结构
- # 先尝试找到所有可能的 JSON 对象
- json_matches = re.findall(r'\{[^{}]*"entities"\s*:\s*\[[^\]]*\][^{}]*\}', response)
- for json_str in json_matches:
- try:
- data = json.loads(json_str)
- if "entities" in data:
- break
- except json.JSONDecodeError:
- continue
-
- # 方法3:尝试更宽松的正则匹配(处理多行和嵌套)
- if not data or "entities" not in data:
- # 匹配从 {"entities" 开始到最后一个 ]} 的内容
- json_match = re.search(r'\{\s*"entities"\s*:\s*\[[\s\S]*\]\s*\}', response)
- if json_match:
- try:
- data = json.loads(json_match.group())
- except json.JSONDecodeError:
- pass
-
- if not data or "entities" not in data:
- logger.warning(f"未找到有效的 entities JSON, response={response[:300]}...")
- return entities
-
- entity_list = data.get("entities", [])
-
- for item in entity_list:
- name = item.get("name", "").strip()
- entity_type = item.get("type", "").upper()
- char_start = item.get("charStart", 0)
- char_end = item.get("charEnd", 0)
-
- if not name or len(name) < 2:
- continue
-
- # 校正位置(加上分块的起始位置)
- adjusted_start = char_start + chunk_start_pos
- adjusted_end = char_end + chunk_start_pos
-
- entity = EntityInfo(
- name=name,
- type=entity_type,
- value=name,
- position=PositionInfo(
- char_start=adjusted_start,
- char_end=adjusted_end,
- line=1 # LLM 模式不计算行号
- ),
- confidence=0.9, # LLM 模式默认较高置信度
- temp_id=str(uuid.uuid4())[:8]
- )
- entities.append(entity)
-
- except json.JSONDecodeError as e:
- logger.warning(f"JSON 解析失败: {e}, response={response[:200]}...")
- except Exception as e:
- logger.error(f"解析 LLM 响应失败: {e}")
-
- return entities
-
- async def extract_entities(
- self,
- text: str,
- entity_types: Optional[List[str]] = None
- ) -> List[EntityInfo]:
- """
- 使用 Ollama LLM 提取实体
-
- 支持长文本自动分块处理
- 自动检测是否使用 UniversalNER 并切换提取策略
- """
- if not text or not text.strip():
- return []
-
- # 根据模型类型选择提取策略
- if self.is_universal_ner:
- return await self._extract_with_universal_ner(text, entity_types)
- else:
- return await self._extract_with_general_llm(text, entity_types)
-
- async def _extract_with_general_llm(
- self,
- text: str,
- entity_types: Optional[List[str]] = None
- ) -> List[EntityInfo]:
- """
- 使用通用 LLM(如 Qwen)提取实体
- """
- # 分割长文本
- chunks = self._split_text(text)
-
- all_entities = []
- seen_entities = set() # 用于去重
-
- for i, chunk in enumerate(chunks):
- logger.info(f"处理分块 {i+1}/{len(chunks)}: 长度={len(chunk['text'])}")
-
- # 构建 prompt
- prompt = self._build_ner_prompt(chunk["text"], entity_types)
-
- # 调用 Ollama
- response = await self._call_ollama(prompt)
-
- if not response:
- logger.warning(f"分块 {i+1} Ollama 返回为空")
- continue
-
- # 打印完整响应用于调试
- logger.debug(f"分块 {i+1} LLM 完整响应:\n{response}\n{'='*50}")
-
- # 解析结果
- entities = self._parse_llm_response(response, chunk["start_pos"])
-
- # 去重
- for entity in entities:
- entity_key = f"{entity.type}:{entity.name}"
- if entity_key not in seen_entities:
- seen_entities.add(entity_key)
- all_entities.append(entity)
-
- logger.info(f"分块 {i+1} 提取实体: {len(entities)} 个")
-
- logger.info(f"通用 LLM NER 提取完成: 总实体数={len(all_entities)}")
- return all_entities
-
- async def _extract_with_universal_ner(
- self,
- text: str,
- entity_types: Optional[List[str]] = None
- ) -> List[EntityInfo]:
- """
- 使用 UniversalNER 模型提取实体
-
- UniversalNER 的 Prompt 格式: "文本内容. 实体类型英文名"
- 返回格式: ["实体1", "实体2", ...]
- """
- # 实体类型映射(中文类型 -> UniversalNER 英文类型)
- type_mapping = {
- "PERSON": ["person", "people", "human"],
- "ORG": ["organization", "company", "institution"],
- "LOC": ["location", "place", "address"],
- "DATE": ["date", "time"],
- "NUMBER": ["number", "quantity", "measurement"],
- "DEVICE": ["device", "equipment", "instrument"],
- "PROJECT": ["project", "program"],
- "METHOD": ["method", "standard", "specification"],
- }
-
- types_to_extract = entity_types or list(type_mapping.keys())
-
- # 分割长文本
- chunks = self._split_text(text)
-
- all_entities = []
- seen_entities = set() # 用于去重
-
- for i, chunk in enumerate(chunks):
- chunk_text = chunk["text"]
- chunk_start = chunk["start_pos"]
-
- logger.info(f"UniversalNER 处理分块 {i+1}/{len(chunks)}: 长度={len(chunk_text)}")
-
- # 对每种实体类型分别提取
- for entity_type in types_to_extract:
- if entity_type not in type_mapping:
- continue
-
- # 使用第一个英文类型名
- english_type = type_mapping[entity_type][0]
-
- # UniversalNER 的 Prompt 格式
- prompt = f"{chunk_text} {english_type}"
-
- # 调用 Ollama
- response = await self._call_ollama(prompt)
-
- if not response:
- continue
-
- # 解析 UniversalNER 响应(返回格式如: ["实体1", "实体2"])
- entities = self._parse_universal_ner_response(
- response, entity_type, chunk_text, chunk_start
- )
-
- # 去重
- for entity in entities:
- entity_key = f"{entity.type}:{entity.name}"
- if entity_key not in seen_entities:
- seen_entities.add(entity_key)
- all_entities.append(entity)
-
- logger.info(f"分块 {i+1} UniversalNER 提取实体: {len([e for e in all_entities if e not in seen_entities])} 个")
-
- logger.info(f"UniversalNER 提取完成: 总实体数={len(all_entities)}")
- return all_entities
-
- def _parse_universal_ner_response(
- self,
- response: str,
- entity_type: str,
- original_text: str,
- chunk_start_pos: int = 0
- ) -> List[EntityInfo]:
- """
- 解析 UniversalNER 的响应
-
- UniversalNER 返回格式: ["实体1", "实体2", ...]
- """
- entities = []
-
- try:
- # 清理响应,提取 JSON 数组
- response = response.strip()
-
- # 尝试找到 JSON 数组
- json_match = re.search(r'\[[\s\S]*?\]', response)
- if not json_match:
- logger.debug(f"UniversalNER 响应中未找到数组: {response[:100]}")
- return entities
-
- json_str = json_match.group()
- entity_names = json.loads(json_str)
-
- if not isinstance(entity_names, list):
- return entities
-
- for name in entity_names:
- if not isinstance(name, str) or len(name) < 2:
- continue
-
- name = name.strip()
-
- # 在原文中查找位置
- pos = original_text.find(name)
- char_start = pos + chunk_start_pos if pos >= 0 else 0
- char_end = char_start + len(name) if pos >= 0 else 0
-
- entity = EntityInfo(
- name=name,
- type=entity_type,
- value=name,
- position=PositionInfo(
- char_start=char_start,
- char_end=char_end,
- line=1
- ),
- confidence=0.85, # UniversalNER 置信度
- temp_id=str(uuid.uuid4())[:8]
- )
- entities.append(entity)
-
- except json.JSONDecodeError as e:
- logger.debug(f"UniversalNER JSON 解析失败: {e}, response={response[:100]}")
- except Exception as e:
- logger.error(f"解析 UniversalNER 响应失败: {e}")
-
- return entities
-
- async def check_health(self) -> bool:
- """
- 检查 Ollama 服务是否可用
- """
- try:
- async with httpx.AsyncClient(timeout=5) as client:
- response = await client.get(f"{self.base_url}/api/tags")
- return response.status_code == 200
- except Exception:
- return False
# Module-level singleton shared by the application.
ollama_service = OllamaService()
|