Bladeren bron

fix: 修复 async generator 中 return 值的错误

- 移除 deepseek_service 中的 return all_entities(async generator 不支持带返回值的 return)
- 改为使用 entities_data 事件传递实体列表
- 修复 ner.py 中对实体数据事件的解析逻辑
何文松 1 maand geleden
bovenliggende
commit
05d3a4ad4e

+ 20 - 20
python-services/ner-service/app/routers/ner.py

@@ -116,33 +116,33 @@ async def extract_entities_stream(request: NerRequest):
             all_entities = []
             all_relations = []
             
-            # 创建进度回调
-            async def progress_callback(chunk_index: int, total_chunks: int, chunk_entities: list):
-                nonlocal all_entities
-                all_entities.extend(chunk_entities)
-                
-                progress_data = {
-                    "document_id": request.document_id,
-                    "chunk_index": chunk_index,
-                    "total_chunks": total_chunks,
-                    "chunk_entities": len(chunk_entities),
-                    "total_entities": len(all_entities),
-                    "progress_percent": int((chunk_index / total_chunks) * 100)
-                }
-                return await sse_event("progress", progress_data)
-            
             # 调用带进度的 NER 服务
             from ..services.ner_service import ner_service
             
             # 检查是否支持流式提取
             if hasattr(ner_service, 'extract_entities_with_progress'):
-                async for event in ner_service.extract_entities_with_progress(
+                async for event_str in ner_service.extract_entities_with_progress(
                     text=request.text,
-                    entity_types=request.entity_types,
-                    progress_callback=progress_callback
+                    entity_types=request.entity_types
                 ):
-                    yield event
-                    all_entities = event.get('entities', all_entities) if isinstance(event, dict) else all_entities
+                    # 转发进度事件
+                    yield event_str
+                    
+                    # 解析事件获取实体数据
+                    if "entities_data" in event_str:
+                        try:
+                            # 从 SSE 格式中提取 JSON 数据
+                            lines = event_str.strip().split('\n')
+                            for line in lines:
+                                if line.startswith('data:'):
+                                    data = json.loads(line[5:].strip())
+                                    if 'entities' in data:
+                                        # 将字典转换回 EntityInfo 对象
+                                        all_entities = [
+                                            EntityInfo(**e) for e in data['entities']
+                                        ]
+                        except Exception as parse_err:
+                            logger.warning(f"解析实体数据事件失败: {parse_err}")
             else:
                 # 回退到普通提取
                 all_entities = await ner_service.extract_entities(

+ 5 - 3
python-services/ner-service/app/services/deepseek_service.py

@@ -318,9 +318,11 @@ class DeepSeekService:
         
         logger.info(f"DeepSeek NER 提取完成: 总实体数={len(all_entities)}")
         
-        # 最终不在这里发送 complete 事件,由调用方处理
-        # 返回最终实体列表供调用方使用
-        return all_entities
+        # 发送实体数据事件(供调用方获取实体列表)
+        yield await sse_event("entities_data", {
+            "entities": [entity.dict() for entity in all_entities],
+            "total_entities": len(all_entities)
+        })
     
     async def check_health(self) -> bool:
         """

+ 6 - 3
python-services/ner-service/app/services/ner_service.py

@@ -60,8 +60,7 @@ class NerService:
     async def extract_entities_with_progress(
         self, 
         text: str, 
-        entity_types: Optional[List[str]] = None,
-        progress_callback=None
+        entity_types: Optional[List[str]] = None
     ):
         """
         从文本中提取实体(带进度生成器,用于 SSE 流式响应)
@@ -69,7 +68,10 @@ class NerService:
         Yields:
             SSE 事件字符串
         """
+        import json
+        
         if not text or not text.strip():
+            yield f"event: entities_data\ndata: {json.dumps({'entities': [], 'total_entities': 0}, ensure_ascii=False)}\n\n"
             return
         
         if self.model_type == "deepseek":
@@ -79,8 +81,9 @@ class NerService:
         else:
             # 其他模型回退到普通提取,一次性返回
             entities = await self.extract_entities(text, entity_types)
-            import json
             yield f"event: chunk_complete\ndata: {json.dumps({'total_entities': len(entities), 'progress_percent': 100}, ensure_ascii=False)}\n\n"
+            # 发送实体数据事件
+            yield f"event: entities_data\ndata: {json.dumps({'entities': [e.dict() for e in entities], 'total_entities': len(entities)}, ensure_ascii=False)}\n\n"
     
     async def _extract_by_rules(
         self,