Bläddra i källkod

feat: 实现多阶段任务进度跟踪

扩展 ParseTask 实体,支持完整的文档处理流程进度跟踪:

1. 数据库迁移
   - 添加各阶段状态字段:parse/rag/structured/ner/graph
   - 添加阶段结果字段:element_count, image_count, entity_count 等

2. 任务进度服务 (TaskProgressService)
   - 各阶段独立进度更新
   - 自动计算总体进度(基于阶段权重)
   - 支持失败标记和错误消息

3. 任务中心优化 (ParseTaskCenterService)
   - 构建完整的6阶段进度列表
   - 展示阶段结果摘要(如:240个实体, 113个关系)

4. 进度更新 API (TaskProgressController)
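   - POST /api/internal/task-progress/{stage}/{documentId}(stage 取值:structured / ner / graph)
   - POST /api/internal/task-progress/fail/{documentId}(标记任务失败,请求体包含 stage 与 errorMessage)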
   - 供 DocumentParsedEventListener 调用

5. 事件监听器更新
   - 自动更新结构化解析进度
   - 自动更新 NER 提取进度
   - 自动更新图构建进度

阶段权重配置:
- parse: 15%, rag: 10%, structured: 15%, ner: 50%, graph: 10%
何文松 1 månad sedan
förälder
incheckning
82c53c666e

+ 82 - 4
backend/graph-service/src/main/java/com/lingyue/graph/listener/DocumentParsedEventListener.java

@@ -108,12 +108,17 @@ public class DocumentParsedEventListener {
             String docType = document.getType();
             if (!"word".equalsIgnoreCase(docType)) {
                 log.debug("非 Word 文档,跳过结构化解析: documentId={}, type={}", documentId, docType);
+                // 标记为完成(非Word文档无需结构化解析)
+                updateTaskProgress(documentId, "structured", "completed", 100, null);
                 return;
             }
             
             log.info("开始自动结构化解析: documentId={}", documentId);
             long startTime = System.currentTimeMillis();
             
+            // 更新进度:开始
+            updateTaskProgress(documentId, "structured", "processing", 10, null);
+            
             // 调用本地 API 触发结构化解析
             String url = "http://localhost:" + serverPort + "/parse/structured/" + documentId;
             
@@ -122,12 +127,26 @@ public class DocumentParsedEventListener {
             if (response.getStatusCode().is2xxSuccessful()) {
                 long time = System.currentTimeMillis() - startTime;
                 log.info("结构化解析完成: documentId={}, time={}ms", documentId, time);
+                
+                // 提取结果信息并更新进度
+                Map<String, Object> data = (Map<String, Object>) response.getBody().get("data");
+                if (data != null) {
+                    Map<String, Object> progressData = new HashMap<>();
+                    progressData.put("status", "completed");
+                    progressData.put("progress", 100);
+                    progressData.put("elementCount", data.get("totalElements"));
+                    progressData.put("imageCount", data.get("imageCount"));
+                    progressData.put("tableCount", data.get("tableCount"));
+                    updateTaskProgress(documentId, "structured", progressData);
+                }
             } else {
                 log.warn("结构化解析失败: documentId={}, status={}", documentId, response.getStatusCode());
+                updateTaskProgress(documentId, "structured", "failed", 0, null);
             }
             
         } catch (Exception e) {
             log.error("自动结构化解析异常: documentId={}, error={}", documentId, e.getMessage());
+            updateTaskProgress(documentId, "structured", "failed", 0, null);
             // 异常不向上抛出,不影响后续处理
         }
     }
@@ -152,6 +171,9 @@ public class DocumentParsedEventListener {
             }
             
             log.info("开始自动 NER 提取: documentId={}", documentId);
+            
+            // 更新进度:开始
+            updateTaskProgress(documentId, "ner", "processing", 5, null);
 
             // 2. 调用 Python NER 服务(根据配置选择异步轮询或同步 API)
             Map<String, Object> nerResponse;
@@ -164,13 +186,23 @@ public class DocumentParsedEventListener {
             if (nerResponse == null || !Boolean.TRUE.equals(nerResponse.get("success"))) {
                 log.warn("NER 服务调用失败: documentId={}, error={}", 
                         documentId, nerResponse != null ? nerResponse.get("errorMessage") : "null response");
+                updateTaskProgress(documentId, "ner", "failed", 0, null);
                 return;
             }
+            
+            // 更新进度:NER 完成,开始保存
+            updateTaskProgress(documentId, "ner", "processing", 80, null);
+            
+            // 更新图构建进度:开始
+            updateTaskProgress(documentId, "graph", "processing", 10, null);
 
             // 3. 保存实体到图数据库
             @SuppressWarnings("unchecked")
             List<Map<String, Object>> entities = (List<Map<String, Object>>) nerResponse.get("entities");
             Map<String, String> tempIdToNodeId = graphNerService.saveEntitiesToGraph(documentId, userId, entities);
+            
+            // 更新图构建进度
+            updateTaskProgress(documentId, "graph", "processing", 50, null);
 
             // 4. 保存关系到图数据库
             @SuppressWarnings("unchecked")
@@ -184,14 +216,24 @@ public class DocumentParsedEventListener {
 
             long processingTime = System.currentTimeMillis() - startTime;
             
+            // 更新 NER 完成进度
+            int entityCount = entities != null ? entities.size() : 0;
+            Map<String, Object> nerProgressData = new HashMap<>();
+            nerProgressData.put("status", "completed");
+            nerProgressData.put("progress", 100);
+            nerProgressData.put("entityCount", entityCount);
+            nerProgressData.put("relationCount", relationCount);
+            updateTaskProgress(documentId, "ner", nerProgressData);
+            
+            // 更新图构建完成进度
+            updateTaskProgress(documentId, "graph", "completed", 100, null);
+            
             log.info("NER 自动提取完成: documentId={}, entityCount={}, relationCount={}, time={}ms",
-                    documentId,
-                    entities != null ? entities.size() : 0,
-                    relationCount,
-                    processingTime);
+                    documentId, entityCount, relationCount, processingTime);
 
         } catch (Exception e) {
             log.error("NER 自动提取异常: documentId={}", documentId, e);
+            updateTaskProgress(documentId, "ner", "failed", 0, null);
             // 异常不向上抛出,不影响其他处理
         }
     }
@@ -341,4 +383,40 @@ public class DocumentParsedEventListener {
             return callPythonNerService(documentId, text, userId);
         }
     }
+    
+    // ==================== 任务进度更新 ====================
+    
+    /**
+     * Convenience overload: builds the progress payload from individual values
+     * and delegates to the map-based overload.
+     *
+     * @param documentId   document whose task is being updated
+     * @param stage        stage segment of the progress endpoint (e.g. "structured", "ner", "graph")
+     * @param status       stage status to report
+     * @param progress     stage progress value to report
+     * @param errorMessage optional error text; omitted from the payload when {@code null}
+     */
+    private void updateTaskProgress(String documentId, String stage, String status, Integer progress, String errorMessage) {
+        Map<String, Object> payload = new HashMap<>();
+        payload.put("progress", progress);
+        payload.put("status", status);
+
+        boolean hasError = errorMessage != null;
+        if (hasError) {
+            payload.put("errorMessage", errorMessage);
+        }
+
+        updateTaskProgress(documentId, stage, payload);
+    }
+    
+    /**
+     * Posts a progress payload as JSON to the internal task-progress endpoint
+     * on localhost at {@code serverPort}.
+     * <p>
+     * Best-effort: any exception is logged at debug level and swallowed so a
+     * failed progress update never interrupts the calling pipeline.
+     *
+     * @param documentId document whose task is being updated
+     * @param stage      stage segment of the endpoint path
+     * @param data       request body to send
+     */
+    private void updateTaskProgress(String documentId, String stage, Map<String, Object> data) {
+        try {
+            HttpHeaders jsonHeaders = new HttpHeaders();
+            jsonHeaders.setContentType(MediaType.APPLICATION_JSON);
+
+            String endpoint = "http://localhost:" + serverPort + "/api/internal/task-progress/" + stage + "/" + documentId;
+            HttpEntity<Map<String, Object>> request = new HttpEntity<>(data, jsonHeaders);
+
+            restTemplate.postForEntity(endpoint, request, Map.class);
+        } catch (Exception e) {
+            // A failed progress update must not affect the main flow.
+            log.debug("更新任务进度失败(可忽略): documentId={}, stage={}, error={}", 
+                    documentId, stage, e.getMessage());
+        }
+    }
 }

+ 105 - 0
backend/parse-service/src/main/java/com/lingyue/parse/controller/TaskProgressController.java

@@ -0,0 +1,105 @@
+package com.lingyue.parse.controller;
+
+import com.lingyue.common.domain.AjaxResult;
+import com.lingyue.parse.service.TaskProgressService;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.Map;
+
+/**
+ * Internal controller for task progress updates.
+ * <p>
+ * Each endpoint receives an untyped JSON map, extracts the fields it needs and
+ * forwards them to {@link TaskProgressService}. Values are converted leniently
+ * (see {@link #asInteger} / {@link #asString}) because the JSON mapper may
+ * deliver numbers as Integer, Long or Double depending on the sender.
+ *
+ * @author lingyue
+ * @since 2026-01-22
+ */
+@Slf4j
+@RestController
+@RequestMapping("/api/internal/task-progress")
+@RequiredArgsConstructor
+@Tag(name = "任务进度(内部)", description = "内部任务进度更新接口")
+public class TaskProgressController {
+
+    private final TaskProgressService taskProgressService;
+
+    /**
+     * Updates the structured-parsing stage.
+     *
+     * @param documentId document whose task is updated
+     * @param request    body with {@code status}, {@code progress} and optional
+     *                   {@code elementCount}, {@code imageCount}, {@code tableCount}
+     * @return a success result
+     */
+    @PostMapping("/structured/{documentId}")
+    @Operation(summary = "更新结构化解析进度")
+    public AjaxResult<?> updateStructuredProgress(
+            @PathVariable @Parameter(description = "文档ID") String documentId,
+            @RequestBody Map<String, Object> request) {
+
+        String status = asString(request, "status");
+        Integer progress = asInteger(request, "progress");
+        Integer elementCount = asInteger(request, "elementCount");
+        Integer imageCount = asInteger(request, "imageCount");
+        Integer tableCount = asInteger(request, "tableCount");
+
+        taskProgressService.updateStructuredProgress(documentId, status, progress,
+                elementCount, imageCount, tableCount);
+
+        return AjaxResult.success();
+    }
+
+    /**
+     * Updates the NER stage.
+     *
+     * @param documentId document whose task is updated
+     * @param request    body with {@code status}, {@code progress} and optional
+     *                   {@code nerTaskId}, {@code entityCount}, {@code relationCount}
+     * @return a success result
+     */
+    @PostMapping("/ner/{documentId}")
+    @Operation(summary = "更新NER进度")
+    public AjaxResult<?> updateNerProgress(
+            @PathVariable @Parameter(description = "文档ID") String documentId,
+            @RequestBody Map<String, Object> request) {
+
+        String status = asString(request, "status");
+        Integer progress = asInteger(request, "progress");
+        String nerTaskId = asString(request, "nerTaskId");
+        Integer entityCount = asInteger(request, "entityCount");
+        Integer relationCount = asInteger(request, "relationCount");
+
+        taskProgressService.updateNerProgress(documentId, status, progress,
+                nerTaskId, entityCount, relationCount);
+
+        return AjaxResult.success();
+    }
+
+    /**
+     * Updates the graph-building stage.
+     *
+     * @param documentId document whose task is updated
+     * @param request    body with {@code status} and {@code progress}
+     * @return a success result
+     */
+    @PostMapping("/graph/{documentId}")
+    @Operation(summary = "更新图构建进度")
+    public AjaxResult<?> updateGraphProgress(
+            @PathVariable @Parameter(description = "文档ID") String documentId,
+            @RequestBody Map<String, Object> request) {
+
+        String status = asString(request, "status");
+        Integer progress = asInteger(request, "progress");
+
+        taskProgressService.updateGraphProgress(documentId, status, progress);
+
+        return AjaxResult.success();
+    }
+
+    /**
+     * Marks the task as failed.
+     *
+     * @param documentId document whose task is marked failed
+     * @param request    body with {@code stage} and {@code errorMessage}
+     * @return a success result
+     */
+    @PostMapping("/fail/{documentId}")
+    @Operation(summary = "标记任务失败")
+    public AjaxResult<?> markFailed(
+            @PathVariable @Parameter(description = "文档ID") String documentId,
+            @RequestBody Map<String, Object> request) {
+
+        String stage = asString(request, "stage");
+        String errorMessage = asString(request, "errorMessage");
+
+        taskProgressService.markFailed(documentId, stage, errorMessage);
+
+        return AjaxResult.success();
+    }
+
+    /**
+     * Reads an optional integer field from the request body.
+     * <p>
+     * Accepts any {@link Number} (truncated via {@code intValue()}) or a numeric
+     * string, instead of a hard {@code (Integer)} cast that fails with
+     * {@link ClassCastException} for Long/Double/String values.
+     *
+     * @return the converted value, or {@code null} when absent or not numeric
+     */
+    private static Integer asInteger(Map<String, Object> request, String key) {
+        Object value = request.get(key);
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+        if (value instanceof String) {
+            try {
+                return Integer.valueOf(((String) value).trim());
+            } catch (NumberFormatException e) {
+                log.warn("Ignoring non-numeric value for '{}': {}", key, value);
+                return null;
+            }
+        }
+        log.warn("Ignoring unsupported value type for '{}': {}", key, value.getClass().getName());
+        return null;
+    }
+
+    /**
+     * Reads an optional text field from the request body, converting
+     * non-string values with {@code toString()}.
+     *
+     * @return the text value, or {@code null} when absent
+     */
+    private static String asString(Map<String, Object> request, String key) {
+        Object value = request.get(key);
+        return value != null ? value.toString() : null;
+    }
+}

+ 79 - 5
backend/parse-service/src/main/java/com/lingyue/parse/entity/ParseTask.java

@@ -11,6 +11,14 @@ import java.util.Date;
 
 /**
  * 解析任务实体
+ * 
+ * 阶段流程:
+ * 1. upload - 文件上传
+ * 2. parse - 文本解析(OCR/Word提取)
+ * 3. rag - RAG向量化
+ * 4. structured - 结构化解析(图片/表格)
+ * 5. ner - NER实体提取
+ * 6. graph - 图数据库构建
  */
 @EqualsAndHashCode(callSuper = true)
 @Data
@@ -22,15 +30,15 @@ public class ParseTask extends SimpleModel {
     @TableField("document_id")
     private String documentId;
     
-    @Schema(description = "状态")
+    @Schema(description = "状态: pending/processing/completed/failed")
     @TableField("status")
-    private String status = "pending"; // pending/processing/completed/failed
+    private String status = "pending";
     
-    @Schema(description = "进度")
+    @Schema(description = "整体进度(0-100)")
     @TableField("progress")
-    private Integer progress = 0; // 0-100
+    private Integer progress = 0;
     
-    @Schema(description = "当前步骤")
+    @Schema(description = "当前步骤: upload/parse/rag/structured/ner/graph")
     @TableField("current_step")
     private String currentStep;
     
@@ -49,4 +57,70 @@ public class ParseTask extends SimpleModel {
     @Schema(description = "完成时间")
     @TableField("completed_at")
     private Date completedAt;
+    
+    // ==================== 各阶段状态 ====================
+    
+    @Schema(description = "解析阶段状态: pending/processing/completed/failed")
+    @TableField("parse_status")
+    private String parseStatus = "pending";
+    
+    @Schema(description = "解析阶段进度(0-100)")
+    @TableField("parse_progress")
+    private Integer parseProgress = 0;
+    
+    @Schema(description = "RAG阶段状态")
+    @TableField("rag_status")
+    private String ragStatus = "pending";
+    
+    @Schema(description = "RAG阶段进度")
+    @TableField("rag_progress")
+    private Integer ragProgress = 0;
+    
+    @Schema(description = "结构化解析状态")
+    @TableField("structured_status")
+    private String structuredStatus = "pending";
+    
+    @Schema(description = "结构化解析进度")
+    @TableField("structured_progress")
+    private Integer structuredProgress = 0;
+    
+    @Schema(description = "结构化解析结果: 元素数量")
+    @TableField("structured_element_count")
+    private Integer structuredElementCount;
+    
+    @Schema(description = "结构化解析结果: 图片数量")
+    @TableField("structured_image_count")
+    private Integer structuredImageCount;
+    
+    @Schema(description = "结构化解析结果: 表格数量")
+    @TableField("structured_table_count")
+    private Integer structuredTableCount;
+    
+    @Schema(description = "NER阶段状态")
+    @TableField("ner_status")
+    private String nerStatus = "pending";
+    
+    @Schema(description = "NER阶段进度")
+    @TableField("ner_progress")
+    private Integer nerProgress = 0;
+    
+    @Schema(description = "NER任务ID(Python服务返回)")
+    @TableField("ner_task_id")
+    private String nerTaskId;
+    
+    @Schema(description = "NER结果: 实体数量")
+    @TableField("ner_entity_count")
+    private Integer nerEntityCount;
+    
+    @Schema(description = "NER结果: 关系数量")
+    @TableField("ner_relation_count")
+    private Integer nerRelationCount;
+    
+    @Schema(description = "图构建阶段状态")
+    @TableField("graph_status")
+    private String graphStatus = "pending";
+    
+    @Schema(description = "图构建阶段进度")
+    @TableField("graph_progress")
+    private Integer graphProgress = 0;
 }

+ 86 - 33
backend/parse-service/src/main/java/com/lingyue/parse/service/ParseTaskCenterService.java

@@ -135,11 +135,12 @@ public class ParseTaskCenterService {
     }
 
     /**
-     * 简单阶段构建:上传、OCR+TXT、NER、图构建
+     * 构建阶段列表:上传、解析、RAG、结构化、NER、图构建
      */
     private List<ParseTaskStageVO> buildStages(ParseTask task) {
         List<ParseTaskStageVO> stages = new ArrayList<>();
 
+        // 1. 文件上传阶段(总是完成)
         ParseTaskStageVO upload = new ParseTaskStageVO();
         upload.setStageName("upload");
         upload.setDisplayName("文件上传");
@@ -149,50 +150,102 @@ public class ParseTaskCenterService {
         upload.setEndedAt(task.getCreateTime());
         stages.add(upload);
 
-        ParseTaskStageVO ocr = new ParseTaskStageVO();
-        ocr.setStageName("ocr");
-        ocr.setDisplayName("OCR解析 & 文本存储");
-        ocr.setStartedAt(task.getStartedAt());
-
-        String status = task.getStatus();
-        if ("pending".equals(status)) {
-            ocr.setStatus("pending");
-            ocr.setProgress(0);
-        } else if ("processing".equals(status)) {
-            ocr.setStatus("in_progress");
-            ocr.setProgress(task.getProgress() != null ? task.getProgress() : 0);
-        } else if ("completed".equals(status)) {
-            ocr.setStatus("completed");
-            ocr.setProgress(100);
-            ocr.setEndedAt(task.getCompletedAt());
-        } else if ("failed".equals(status)) {
-            ocr.setStatus("failed");
-            ocr.setProgress(task.getProgress() != null ? task.getProgress() : 0);
-            ocr.setEndedAt(task.getCompletedAt());
-            ocr.setErrorMessage(task.getErrorMessage());
-        } else {
-            ocr.setStatus("pending");
-            ocr.setProgress(0);
+        // 2. 文本解析阶段
+        ParseTaskStageVO parse = new ParseTaskStageVO();
+        parse.setStageName("parse");
+        parse.setDisplayName("文本解析");
+        parse.setStartedAt(task.getStartedAt());
+        setStageStatus(parse, task.getParseStatus(), task.getParseProgress());
+        stages.add(parse);
+
+        // 3. RAG 向量化阶段
+        ParseTaskStageVO rag = new ParseTaskStageVO();
+        rag.setStageName("rag");
+        rag.setDisplayName("RAG向量化");
+        setStageStatus(rag, task.getRagStatus(), task.getRagProgress());
+        stages.add(rag);
+
+        // 4. 结构化解析阶段
+        ParseTaskStageVO structured = new ParseTaskStageVO();
+        structured.setStageName("structured");
+        structured.setDisplayName("结构化解析");
+        setStageStatus(structured, task.getStructuredStatus(), task.getStructuredProgress());
+        // 添加结果信息
+        if ("completed".equals(task.getStructuredStatus())) {
+            StringBuilder result = new StringBuilder();
+            if (task.getStructuredElementCount() != null) {
+                result.append(task.getStructuredElementCount()).append("个元素");
+            }
+            if (task.getStructuredImageCount() != null && task.getStructuredImageCount() > 0) {
+                result.append(", ").append(task.getStructuredImageCount()).append("张图片");
+            }
+            if (task.getStructuredTableCount() != null && task.getStructuredTableCount() > 0) {
+                result.append(", ").append(task.getStructuredTableCount()).append("个表格");
+            }
+            structured.setResultSummary(result.toString());
         }
+        stages.add(structured);
 
-        stages.add(ocr);
-
+        // 5. NER 实体提取阶段
         ParseTaskStageVO ner = new ParseTaskStageVO();
         ner.setStageName("ner");
-        ner.setDisplayName("实体提取(NER)");
-        ner.setStatus("pending");
-        ner.setProgress(0);
+        ner.setDisplayName("NER实体提取");
+        setStageStatus(ner, task.getNerStatus(), task.getNerProgress());
+        if (task.getNerTaskId() != null) {
+            ner.setTaskId(task.getNerTaskId());
+        }
+        // 添加结果信息
+        if ("completed".equals(task.getNerStatus())) {
+            StringBuilder result = new StringBuilder();
+            if (task.getNerEntityCount() != null) {
+                result.append(task.getNerEntityCount()).append("个实体");
+            }
+            if (task.getNerRelationCount() != null) {
+                result.append(", ").append(task.getNerRelationCount()).append("个关系");
+            }
+            ner.setResultSummary(result.toString());
+        }
         stages.add(ner);
 
+        // 6. 图构建阶段
         ParseTaskStageVO graph = new ParseTaskStageVO();
         graph.setStageName("graph");
-        graph.setDisplayName("图构建");
-        graph.setStatus("pending");
-        graph.setProgress(0);
+        graph.setDisplayName("图数据库构建");
+        setStageStatus(graph, task.getGraphStatus(), task.getGraphProgress());
         stages.add(graph);
 
         return stages;
     }
+    
+    /**
+     * Maps a persisted stage status/progress pair onto the stage view object.
+     * <ul>
+     *   <li>"processing" becomes "in_progress" with the reported progress</li>
+     *   <li>"completed" becomes "completed" with progress 100</li>
+     *   <li>"failed" becomes "failed" with the reported progress</li>
+     *   <li>anything else (including {@code null}) becomes "pending" with progress 0</li>
+     * </ul>
+     * A {@code null} progress is treated as 0.
+     */
+    private void setStageStatus(ParseTaskStageVO stage, String status, Integer progress) {
+        int reported = (progress == null) ? 0 : progress;
+
+        if ("processing".equals(status)) {
+            stage.setStatus("in_progress");
+            stage.setProgress(reported);
+        } else if ("completed".equals(status)) {
+            stage.setStatus("completed");
+            stage.setProgress(100);
+        } else if ("failed".equals(status)) {
+            stage.setStatus("failed");
+            stage.setProgress(reported);
+        } else {
+            // null, "pending" and unknown values all fall back to pending.
+            stage.setStatus("pending");
+            stage.setProgress(0);
+        }
+    }
 }
 
 

+ 254 - 0
backend/parse-service/src/main/java/com/lingyue/parse/service/TaskProgressService.java

@@ -0,0 +1,254 @@
+package com.lingyue.parse.service;
+
+import com.lingyue.parse.entity.ParseTask;
+import com.lingyue.parse.repository.ParseTaskRepository;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Date;
+
+/**
+ * Task progress update service.
+ * <p>
+ * Called by the individual processing stages to record per-stage status and
+ * progress on the {@link ParseTask} of a document. After each stage update the
+ * overall progress is recomputed as the weighted sum of all stage
+ * contributions, and the overall status is derived from the stage statuses.
+ *
+ * @author lingyue
+ * @since 2026-01-22
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class TaskProgressService {
+
+    private final ParseTaskRepository parseTaskRepository;
+
+    // Stage weights used to compute the overall progress.
+    private static final int WEIGHT_PARSE = 15;
+    private static final int WEIGHT_RAG = 10;
+    private static final int WEIGHT_STRUCTURED = 15;
+    private static final int WEIGHT_NER = 50;
+    private static final int WEIGHT_GRAPH = 10;
+    private static final int TOTAL_WEIGHT = WEIGHT_PARSE + WEIGHT_RAG + WEIGHT_STRUCTURED + WEIGHT_NER + WEIGHT_GRAPH;
+
+    /**
+     * Updates the text-parsing stage.
+     *
+     * @param documentId document whose task is updated; no-op when no task exists
+     * @param status     new stage status
+     * @param progress   new stage progress
+     */
+    @Transactional
+    public void updateParseProgress(String documentId, String status, Integer progress) {
+        ParseTask task = findTask(documentId);
+        if (task == null) {
+            return;
+        }
+
+        task.setParseStatus(status);
+        task.setParseProgress(progress);
+        task.setCurrentStep("parse");
+        updateOverallProgress(task);
+
+        parseTaskRepository.updateById(task);
+        log.debug("更新解析进度: documentId={}, status={}, progress={}", documentId, status, progress);
+    }
+
+    /**
+     * Updates the RAG vectorization stage.
+     *
+     * @param documentId document whose task is updated; no-op when no task exists
+     * @param status     new stage status
+     * @param progress   new stage progress
+     */
+    @Transactional
+    public void updateRagProgress(String documentId, String status, Integer progress) {
+        ParseTask task = findTask(documentId);
+        if (task == null) {
+            return;
+        }
+
+        task.setRagStatus(status);
+        task.setRagProgress(progress);
+        task.setCurrentStep("rag");
+        updateOverallProgress(task);
+
+        parseTaskRepository.updateById(task);
+        log.debug("更新RAG进度: documentId={}, status={}, progress={}", documentId, status, progress);
+    }
+
+    /**
+     * Updates the structured-parsing stage.
+     * <p>
+     * Result counts are only written when provided, so an intermediate update
+     * without counts does not wipe previously recorded values (same rule as
+     * {@link #updateNerProgress}).
+     *
+     * @param documentId   document whose task is updated; no-op when no task exists
+     * @param status       new stage status
+     * @param progress     new stage progress
+     * @param elementCount number of parsed elements, or {@code null} to keep the current value
+     * @param imageCount   number of images, or {@code null} to keep the current value
+     * @param tableCount   number of tables, or {@code null} to keep the current value
+     */
+    @Transactional
+    public void updateStructuredProgress(String documentId, String status, Integer progress,
+                                         Integer elementCount, Integer imageCount, Integer tableCount) {
+        ParseTask task = findTask(documentId);
+        if (task == null) {
+            return;
+        }
+
+        task.setStructuredStatus(status);
+        task.setStructuredProgress(progress);
+        if (elementCount != null) {
+            task.setStructuredElementCount(elementCount);
+        }
+        if (imageCount != null) {
+            task.setStructuredImageCount(imageCount);
+        }
+        if (tableCount != null) {
+            task.setStructuredTableCount(tableCount);
+        }
+        task.setCurrentStep("structured");
+        updateOverallProgress(task);
+
+        parseTaskRepository.updateById(task);
+        log.debug("更新结构化解析进度: documentId={}, status={}, elements={}", documentId, status, elementCount);
+    }
+
+    /**
+     * Updates the NER stage.
+     *
+     * @param documentId    document whose task is updated; no-op when no task exists
+     * @param status        new stage status
+     * @param progress      new stage progress
+     * @param nerTaskId     NER task id, or {@code null} to keep the current value
+     * @param entityCount   number of entities, or {@code null} to keep the current value
+     * @param relationCount number of relations, or {@code null} to keep the current value
+     */
+    @Transactional
+    public void updateNerProgress(String documentId, String status, Integer progress,
+                                   String nerTaskId, Integer entityCount, Integer relationCount) {
+        ParseTask task = findTask(documentId);
+        if (task == null) {
+            return;
+        }
+
+        task.setNerStatus(status);
+        task.setNerProgress(progress);
+        if (nerTaskId != null) {
+            task.setNerTaskId(nerTaskId);
+        }
+        if (entityCount != null) {
+            task.setNerEntityCount(entityCount);
+        }
+        if (relationCount != null) {
+            task.setNerRelationCount(relationCount);
+        }
+        task.setCurrentStep("ner");
+        updateOverallProgress(task);
+
+        parseTaskRepository.updateById(task);
+        log.debug("更新NER进度: documentId={}, status={}, progress={}, entities={}",
+                documentId, status, progress, entityCount);
+    }
+
+    /**
+     * Updates the graph-building stage. When the stage is reported as
+     * "completed" the whole task is marked completed with progress 100.
+     *
+     * @param documentId document whose task is updated; no-op when no task exists
+     * @param status     new stage status
+     * @param progress   new stage progress
+     */
+    @Transactional
+    public void updateGraphProgress(String documentId, String status, Integer progress) {
+        ParseTask task = findTask(documentId);
+        if (task == null) {
+            return;
+        }
+
+        task.setGraphStatus(status);
+        task.setGraphProgress(progress);
+        task.setCurrentStep("graph");
+        updateOverallProgress(task);
+
+        // Graph building is the last stage: finishing it finishes the task.
+        // NOTE(review): this overrides a "failed" overall status that
+        // updateOverallProgress derived from an earlier failed stage —
+        // confirm that earlier-stage failures are meant to be non-fatal.
+        if ("completed".equals(status)) {
+            task.setStatus("completed");
+            task.setProgress(100);
+            task.setCompletedAt(new Date());
+        }
+
+        parseTaskRepository.updateById(task);
+        log.debug("更新图构建进度: documentId={}, status={}, progress={}", documentId, status, progress);
+    }
+
+    /**
+     * Marks the task as failed and, when the stage is known, marks that stage
+     * as failed too.
+     *
+     * @param documentId   document whose task is updated; no-op when no task exists
+     * @param stage        failing stage (parse/rag/structured/ner/graph); may be {@code null}
+     * @param errorMessage error text stored on the task
+     */
+    @Transactional
+    public void markFailed(String documentId, String stage, String errorMessage) {
+        ParseTask task = findTask(documentId);
+        if (task == null) {
+            return;
+        }
+
+        task.setStatus("failed");
+        task.setErrorMessage(errorMessage);
+        task.setCurrentStep(stage);
+        task.setCompletedAt(new Date());
+
+        // A switch on a null String throws NullPointerException, so guard first.
+        if (stage == null) {
+            log.warn("未指定失败阶段: documentId={}", documentId);
+        } else {
+            switch (stage) {
+                case "parse":
+                    task.setParseStatus("failed");
+                    break;
+                case "rag":
+                    task.setRagStatus("failed");
+                    break;
+                case "structured":
+                    task.setStructuredStatus("failed");
+                    break;
+                case "ner":
+                    task.setNerStatus("failed");
+                    break;
+                case "graph":
+                    task.setGraphStatus("failed");
+                    break;
+                default:
+                    log.warn("未知阶段: documentId={}, stage={}", documentId, stage);
+                    break;
+            }
+        }
+
+        parseTaskRepository.updateById(task);
+        log.error("任务失败: documentId={}, stage={}, error={}", documentId, stage, errorMessage);
+    }
+
+    /**
+     * Looks up the task of a document, logging a warning when none exists.
+     *
+     * @return the task, or {@code null} when the document has no task
+     */
+    private ParseTask findTask(String documentId) {
+        ParseTask task = parseTaskRepository.findByDocumentId(documentId);
+        if (task == null) {
+            log.warn("任务不存在: documentId={}", documentId);
+        }
+        return task;
+    }
+
+    /**
+     * Recomputes the overall progress (weighted sum of the stage
+     * contributions, normalized to 0-100) and derives the overall status:
+     * failed if any stage failed, completed if all stages completed,
+     * processing if any stage is processing; otherwise the status is left as is.
+     */
+    private void updateOverallProgress(ParseTask task) {
+        int totalProgress = 0;
+
+        totalProgress += calculateStageProgress(task.getParseStatus(), task.getParseProgress(), WEIGHT_PARSE);
+        totalProgress += calculateStageProgress(task.getRagStatus(), task.getRagProgress(), WEIGHT_RAG);
+        totalProgress += calculateStageProgress(task.getStructuredStatus(), task.getStructuredProgress(), WEIGHT_STRUCTURED);
+        totalProgress += calculateStageProgress(task.getNerStatus(), task.getNerProgress(), WEIGHT_NER);
+        totalProgress += calculateStageProgress(task.getGraphStatus(), task.getGraphProgress(), WEIGHT_GRAPH);
+
+        // Normalize to 0-100.
+        int overallProgress = (totalProgress * 100) / TOTAL_WEIGHT;
+        task.setProgress(Math.min(overallProgress, 100));
+
+        if (isAnyFailed(task)) {
+            task.setStatus("failed");
+        } else if (isAllCompleted(task)) {
+            task.setStatus("completed");
+        } else if (isAnyProcessing(task)) {
+            task.setStatus("processing");
+        }
+    }
+
+    /**
+     * Returns the share of {@code weight} earned by one stage: the full weight
+     * when completed, a proportional share while processing, otherwise 0.
+     * The reported progress is clamped to 0-100 so an out-of-range value
+     * cannot push the contribution below 0 or above the stage weight.
+     */
+    private int calculateStageProgress(String status, Integer progress, int weight) {
+        if ("completed".equals(status)) {
+            return weight;
+        }
+        if ("processing".equals(status) && progress != null) {
+            int bounded = Math.max(0, Math.min(progress, 100));
+            return (bounded * weight) / 100;
+        }
+        return 0;
+    }
+
+    /** Returns whether at least one stage has status "failed". */
+    private boolean isAnyFailed(ParseTask task) {
+        return "failed".equals(task.getParseStatus()) ||
+               "failed".equals(task.getRagStatus()) ||
+               "failed".equals(task.getStructuredStatus()) ||
+               "failed".equals(task.getNerStatus()) ||
+               "failed".equals(task.getGraphStatus());
+    }
+
+    /** Returns whether every stage has status "completed". */
+    private boolean isAllCompleted(ParseTask task) {
+        return "completed".equals(task.getParseStatus()) &&
+               "completed".equals(task.getRagStatus()) &&
+               "completed".equals(task.getStructuredStatus()) &&
+               "completed".equals(task.getNerStatus()) &&
+               "completed".equals(task.getGraphStatus());
+    }
+
+    /** Returns whether at least one stage has status "processing". */
+    private boolean isAnyProcessing(ParseTask task) {
+        return "processing".equals(task.getParseStatus()) ||
+               "processing".equals(task.getRagStatus()) ||
+               "processing".equals(task.getStructuredStatus()) ||
+               "processing".equals(task.getNerStatus()) ||
+               "processing".equals(task.getGraphStatus());
+    }
+}

+ 7 - 1
backend/parse-service/src/main/java/com/lingyue/parse/vo/ParseTaskStageVO.java

@@ -12,7 +12,7 @@ import java.util.Date;
 @Schema(description = "解析任务阶段信息")
 public class ParseTaskStageVO {
 
-    @Schema(description = "阶段名称,例如: upload, ocr, text_save, ner, graph")
+    @Schema(description = "阶段名称: upload/parse/rag/structured/ner/graph")
     private String stageName;
 
     @Schema(description = "阶段显示名称")
@@ -32,6 +32,12 @@ public class ParseTaskStageVO {
 
     @Schema(description = "阶段错误信息")
     private String errorMessage;
+    
+    @Schema(description = "子任务ID(如NER任务ID)")
+    private String taskId;
+    
+    @Schema(description = "结果摘要(如:240个实体, 113个关系)")
+    private String resultSummary;
 }
 
 

+ 36 - 0
database/migrations/V2026_01_22_01__enhance_parse_tasks_stages.sql

@@ -0,0 +1,36 @@
+-- ============================================
+-- Extend the parse_tasks table to support multi-stage progress tracking.
+-- Every statement uses IF NOT EXISTS, so re-running the script is harmless.
+-- ============================================
+
+-- Per-stage status and progress columns
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS parse_status VARCHAR(20) DEFAULT 'pending';
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS parse_progress INTEGER DEFAULT 0;
+
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS rag_status VARCHAR(20) DEFAULT 'pending';
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS rag_progress INTEGER DEFAULT 0;
+
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS structured_status VARCHAR(20) DEFAULT 'pending';
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS structured_progress INTEGER DEFAULT 0;
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS structured_element_count INTEGER;
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS structured_image_count INTEGER;
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS structured_table_count INTEGER;
+
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS ner_status VARCHAR(20) DEFAULT 'pending';
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS ner_progress INTEGER DEFAULT 0;
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS ner_task_id VARCHAR(64);
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS ner_entity_count INTEGER;
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS ner_relation_count INTEGER;
+
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS graph_status VARCHAR(20) DEFAULT 'pending';
+ALTER TABLE parse_tasks ADD COLUMN IF NOT EXISTS graph_progress INTEGER DEFAULT 0;
+
+-- Index on the NER task id column
+CREATE INDEX IF NOT EXISTS idx_parse_tasks_ner_task_id ON parse_tasks(ner_task_id);
+
+-- Column comments
+COMMENT ON COLUMN parse_tasks.parse_status IS '解析阶段状态: pending/processing/completed/failed';
+COMMENT ON COLUMN parse_tasks.rag_status IS 'RAG向量化阶段状态';
+COMMENT ON COLUMN parse_tasks.structured_status IS '结构化解析阶段状态';
+COMMENT ON COLUMN parse_tasks.ner_status IS 'NER实体提取阶段状态';
+COMMENT ON COLUMN parse_tasks.graph_status IS '图构建阶段状态';
+COMMENT ON COLUMN parse_tasks.ner_task_id IS 'NER Python服务任务ID';