Эх сурвалжийг харах

fix: 改进 SSE 流处理和调试日志

- 使用 DataBuffer 替代 String 读取 SSE 流
- 增加详细的调试日志用于排查问题
- SSE 未获取结果时自动回退到普通 API
- 正确释放 DataBuffer 资源
何文松 1 сар өмнө
parent
commit
924d994294

+ 44 - 11
backend/graph-service/src/main/java/com/lingyue/graph/listener/DocumentParsedEventListener.java

@@ -164,19 +164,27 @@ public class DocumentParsedEventListener {
                     .build();
             
             AtomicReference<Map<String, Object>> resultRef = new AtomicReference<>();
-            
-            // 订阅 SSE 事件流,使用 DataBuffer 逐块读取
             StringBuilder buffer = new StringBuilder();
             
+            log.info("开始 SSE 流式 NER 请求: documentId={}", documentId);
+            
             webClient.post()
                     .uri("/ner/extract/stream")
                     .contentType(MediaType.APPLICATION_JSON)
                     .accept(MediaType.TEXT_EVENT_STREAM)
                     .bodyValue(request)
                     .retrieve()
-                    .bodyToFlux(String.class)
+                    .bodyToFlux(org.springframework.core.io.buffer.DataBuffer.class)
                     .timeout(Duration.ofMinutes(10))  // 10 分钟总超时
-                    .doOnNext(chunk -> {
+                    .doOnNext(dataBuffer -> {
+                        // 从 DataBuffer 读取字符串
+                        byte[] bytes = new byte[dataBuffer.readableByteCount()];
+                        dataBuffer.read(bytes);
+                        org.springframework.core.io.buffer.DataBufferUtils.release(dataBuffer);
+                        String chunk = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
+                        
+                        log.debug("收到 SSE 数据块: length={}", chunk.length());
+                        
                         // 累积数据到缓冲区
                         buffer.append(chunk);
                         
@@ -193,18 +201,26 @@ public class DocumentParsedEventListener {
                             parseSseEvent(eventBlock, documentId, resultRef);
                         }
                     })
-                    .doOnError(e -> log.error("SSE 流处理错误: documentId={}, error={}", documentId, e.getMessage()))
+                    .doOnComplete(() -> log.info("SSE 流完成: documentId={}", documentId))
+                    .doOnError(e -> log.error("SSE 流处理错误: documentId={}, error={}", documentId, e.getMessage(), e))
                     .blockLast();  // 阻塞等待完成
             
             // 处理缓冲区中剩余的数据
             if (buffer.length() > 0) {
+                log.debug("处理剩余缓冲区数据: length={}", buffer.length());
                 parseSseEvent(buffer.toString(), documentId, resultRef);
             }
             
-            return resultRef.get();
+            Map<String, Object> result = resultRef.get();
+            if (result == null) {
+                log.warn("SSE 处理完成但未获取到结果,回退到普通 API: documentId={}", documentId);
+                return callPythonNerService(documentId, text, userId);
+            }
+            
+            return result;
             
         } catch (Exception e) {
-            log.error("调用 Python NER SSE 服务失败: {}", e.getMessage());
+            log.error("调用 Python NER SSE 服务失败: documentId={}, error={}", documentId, e.getMessage(), e);
             // 回退到普通 API
             log.info("回退到普通 NER API: documentId={}", documentId);
             return callPythonNerService(documentId, text, userId);
@@ -216,11 +232,16 @@ public class DocumentParsedEventListener {
      */
     private void parseSseEvent(String eventBlock, String documentId, AtomicReference<Map<String, Object>> resultRef) {
         try {
+            if (eventBlock == null || eventBlock.trim().isEmpty()) {
+                return;
+            }
+            
             String eventType = null;
             String eventData = null;
             
             // 解析事件块
             for (String line : eventBlock.split("\n")) {
+                line = line.trim();
                 if (line.startsWith("event:")) {
                     eventType = line.substring(6).trim();
                 } else if (line.startsWith("data:")) {
@@ -228,7 +249,10 @@ public class DocumentParsedEventListener {
                 }
             }
             
+            log.debug("解析 SSE 事件: type={}, dataLength={}", eventType, eventData != null ? eventData.length() : 0);
+            
             if (eventData == null || eventData.isEmpty()) {
+                log.debug("SSE 事件数据为空,跳过: type={}", eventType);
                 return;
             }
             
@@ -236,24 +260,33 @@ public class DocumentParsedEventListener {
             Map<String, Object> data = objectMapper.readValue(eventData, Map.class);
             
             // 根据事件类型处理
-            if ("progress".equals(eventType) || "chunk_complete".equals(eventType)) {
+            if ("start".equals(eventType)) {
+                log.info("NER 开始处理: documentId={}", documentId);
+            } else if ("progress".equals(eventType) || "chunk_complete".equals(eventType)) {
                 Object progressObj = data.get("progress_percent");
                 if (progressObj != null) {
                     int progress = progressObj instanceof Integer ? (Integer) progressObj : ((Number) progressObj).intValue();
                     String message = (String) data.getOrDefault("message", "处理中...");
                     log.info("NER 进度: documentId={}, progress={}%, message={}", documentId, progress, message);
                 }
+            } else if ("entities_data".equals(eventType)) {
+                log.debug("收到 entities_data 事件,实体数: {}", data.get("total_entities"));
             } else if ("complete".equals(eventType)) {
                 // 完成事件,包含最终结果
                 resultRef.set(data);
-                log.info("NER 流式处理完成: documentId={}, entities={}", documentId, 
-                        data.get("entities") != null ? ((List<?>) data.get("entities")).size() : 0);
+                int entityCount = data.get("entities") != null ? ((List<?>) data.get("entities")).size() : 0;
+                log.info("NER 流式处理完成: documentId={}, entities={}, success={}", 
+                        documentId, entityCount, data.get("success"));
             } else if ("error".equals(eventType)) {
                 log.error("NER 服务返回错误: documentId={}, error={}", documentId, data.get("error"));
+            } else {
+                log.debug("未知 SSE 事件类型: {}", eventType);
             }
             
         } catch (Exception e) {
-            log.debug("解析 SSE 事件时出错: {}", e.getMessage());
+            log.warn("解析 SSE 事件时出错: eventBlock={}, error={}", 
+                    eventBlock.length() > 200 ? eventBlock.substring(0, 200) + "..." : eventBlock, 
+                    e.getMessage());
         }
     }
 }