Ver código fonte

feat: safetyFsApproval JSON 解析 - projectInfo 与 parse_safety_feasibility_approval_investment(同步 Clerk2.5 e0c839d)

何文松 3 semanas atrás
pai
commit
f8164dc2b0

+ 44 - 1
pdf_converter_v2/models/data_models.py

@@ -4,7 +4,7 @@
 数据模型定义
 """
 
-from typing import List
+from typing import List, Optional
 
 
 class WeatherData:
@@ -283,9 +283,18 @@ class FeasibilityApprovalInvestment:
     - Level 0: 顶层大类(如"山西晋城周村220千伏输变电工程")
     - Level 1: 二级分类(如"变电工程"、"线路工程"),有自己的 items
     - Level 2: 具体项目(如"周村220千伏变电站新建工程")
+    
+    项目信息(可选,用于 safetyFsApproval 类型):
+    - projectName: 工程(项目)名称
+    - projectUnit: 项目单位
+    - designUnit: 设计单位
     """
     def __init__(self):
         self.items: List[InvestmentItem] = []
+        # Basic project info (used only by the safetyFsApproval type)
+        self.projectName: Optional[str] = None
+        self.projectUnit: Optional[str] = None
+        self.designUnit: Optional[str] = None
     
     def to_dict(self):
         """转换为嵌套结构,与 designReview 保持一致
@@ -294,14 +303,38 @@ class FeasibilityApprovalInvestment:
         Level="2" 的项目作为二级分类(Level: 1),有自己的 items
         Level="3" 的项目作为具体项目(Level: 2),放入二级分类的 items
         Level="0" 的项目(合计)跳过
+        
+        特殊处理:如果表格没有 Level=1 的顶层大类(如湖北省格式),
+        自动创建一个虚拟顶层大类来包含所有 Level=2 的项目
         """
         if not self.items:
             return []
         
+        # 检查是否有 Level=1 的顶层大类
+        has_level_1 = any(item.level == "1" for item in self.items)
+        
         result = []
         current_top_category = None  # Level 0 顶层大类
         current_sub_category = None  # Level 1 二级分类
         
+        # 如果没有 Level=1 的顶层大类,创建一个虚拟的
+        if not has_level_1:
+            current_top_category = {
+                "name": "项目总表",
+                "Level": 0,
+                "constructionScaleSubstation": "",
+                "constructionScaleBay": "",
+                "constructionScaleOverheadLine": "",
+                "constructionScaleOpticalCable": "",
+                "staticInvestment": "",
+                "dynamicInvestment": "",
+                "constructionProjectCost": "",
+                "equipmentPurchaseCost": "",
+                "installationProjectCost": "",
+                "otherExpenses": "",
+                "items": []
+            }
+        
         for item in self.items:
             if item.level == "1":
                 # 顶层大类(如"山西晋城周村220千伏输变电工程")
@@ -381,6 +414,16 @@ class FeasibilityApprovalInvestment:
         if current_top_category is not None:
             result.append(current_top_category)
         
+        # 如果有项目信息,返回包含项目信息的字典;否则直接返回数据列表
+        if self.projectName or self.projectUnit or self.designUnit:
+            return {
+                "projectInfo": {
+                    "projectName": self.projectName or "",
+                    "projectUnit": self.projectUnit or "",
+                    "designUnit": self.designUnit or ""
+                },
+                "data": result
+            }
         return result
     
     @staticmethod

+ 8 - 1
pdf_converter_v2/parser/document_type.py

@@ -19,6 +19,7 @@ def detect_document_type(markdown_content: str) -> str:
             - "fsApproval" - 可研批复投资估算
             - "fsReview" - 可研评审投资估算
             - "pdApproval" - 初设批复概算投资
+            - "safetyFsApproval" - 安评可研批复投资估算
             - "unknown" - 未知类型
     """
     # 检测表格类型(噪声、电磁)- 兼容旧名称
@@ -27,7 +28,13 @@ def detect_document_type(markdown_content: str) -> str:
     if "工频电场/磁场环境检测原始记录表" in markdown_content or "工频电场磁场环境检测原始记录表" in markdown_content:
         return "emRec"  # 也支持 electromagnetic_detection
     
-    # 检测投资估算类型(新增3个类型)
+    # 检测投资估算类型
+    # 安评可研批复(先于可研批复判断,结构同可研批复)
+    if ("安评可研批复" in markdown_content or ("安全评价" in markdown_content and "可行性研究报告的批复" in markdown_content)) and \
+       ("工程或费用名称" in markdown_content or "静态投资" in markdown_content) and \
+       ("架空线" in markdown_content or "间隔" in markdown_content):
+        return "safetyFsApproval"
+    
     # 可研批复投资估算(包含建设规模相关字段)
     if ("可研批复" in markdown_content or "可行性研究报告的批复" in markdown_content) and \
        ("工程或费用名称" in markdown_content or "静态投资" in markdown_content):

+ 191 - 1
pdf_converter_v2/parser/investment_parser.py

@@ -2,10 +2,11 @@
 
 """
 投资估算表格解析模块
-支持3种类型:
+支持4种类型:
 1. 可研批复投资估算
 2. 可研评审投资估算
 3. 初设批复概算投资
+4. 安评可研批复投资估算
 """
 
 from typing import List, Optional
@@ -33,8 +34,14 @@ def detect_investment_type(markdown_content: str) -> Optional[str]:
             - "fsApproval" - 可研批复
             - "fsReview" - 可研评审
             - "pdApproval" - 初设批复
+            - "safetyFsApproval" - 安评可研批复
             - None - 无法识别
     """
+    # 安评可研批复(先于可研批复判断,结构同可研批复)
+    if "安评可研批复" in markdown_content or ("安全评价" in markdown_content and "可行性研究报告的批复" in markdown_content):
+        if "架空线" in markdown_content or "间隔" in markdown_content:
+            logger.info("[投资估算] 检测到类型: 安评可研批复投资估算")
+            return "safetyFsApproval"
     # 检查标题关键词
     if "可研批复" in markdown_content or "可行性研究报告的批复" in markdown_content:
         # 检查是否有建设规模相关列(可研批复特有)
@@ -417,6 +424,185 @@ def parse_feasibility_approval_investment(markdown_content: str) -> FeasibilityA
     return record
 
 
+def parse_safety_feasibility_approval_investment(markdown_content: str) -> FeasibilityApprovalInvestment:
+    """
+    Parse a safety feasibility-approval investment estimate (Hubei-province format).
+    
+    Characteristics:
+    - No top-level category (Level=1); the table starts directly at second-level categories
+    - Chinese ordinals (一, 二) denote second-level categories (e.g. "变电工程", "线路工程")
+    - Arabic numerals (1, 2, 3) denote concrete projects
+    - Column headers use "项目名称" and "静态合计/动态合计"
+    - The project-info table is scanned first for project name, project unit and design unit
+    
+    Resulting structure:
+    - Level 1: second-level category (e.g. "变电工程", "线路工程")
+    - Level 2: concrete project (e.g. "襄阳连云220千伏变电站新建工程")
+    """
+    record = FeasibilityApprovalInvestment()
+    tables = extract_table_with_rowspan_colspan(markdown_content)
+    if not tables:
+        logger.warning("[安全可研批复投资] 未能提取出任何表格内容")
+        return record  # empty record: no items, no project info
+
+    # First, try to extract the basic project-info table
+    for table_idx, table in enumerate(tables):
+        if len(table) < 2:
+            continue
+        table_text = ""
+        for row in table:
+            table_text += " ".join([str(cell) for cell in row])
+        table_text_no_space = table_text.replace(" ", "").replace("(", "(").replace(")", ")")  # NOTE(review): the two parenthesis replace() calls are no-ops as written — presumably meant to normalize full-width parentheses; confirm
+        if "工程(项目)名称" in table_text_no_space or "工程项目名称" in table_text_no_space:
+            logger.info(f"[安全可研批复投资] 找到项目信息表格 (表格{table_idx+1})")
+            for row in table:
+                if len(row) >= 2:
+                    key = str(row[0]).strip()  # first cell is treated as the label
+                    value = str(row[1]).strip() if len(row) > 1 else ""  # second cell is treated as the value
+                    if "工程" in key and "名称" in key:
+                        record.projectName = value
+                        logger.info(f"[安全可研批复投资] 提取工程名称: {value}")
+                    elif "项目单位" in key:
+                        record.projectUnit = value
+                        logger.info(f"[安全可研批复投资] 提取项目单位: {value}")
+                    elif "设计单位" in key:
+                        record.designUnit = value
+                        logger.info(f"[安全可研批复投资] 提取设计单位: {value}")
+            break  # only the first matching project-info table is used
+
+    # Find every investment-estimate table and merge them
+    all_matching_tables = []
+    for table_idx, table in enumerate(tables):
+        table_text = ""
+        for row in table:
+            table_text += " ".join([str(cell) for cell in row])
+        table_text_no_space = table_text.replace(" ", "")
+        has_name_col = "项目名称" in table_text_no_space
+        has_investment_col = ("静态合计" in table_text_no_space or "静态投资" in table_text_no_space)
+        if has_name_col and has_investment_col:
+            all_matching_tables.append((table_idx, table))
+            logger.info(f"[安全可研批复投资] 找到投资估算表格 (表格{table_idx+1}), 行数: {len(table)}")
+
+    if not all_matching_tables:
+        logger.warning("[安全可研批复投资] 未找到包含投资估算的表格")
+        return record  # project info (if any) is kept, items stay empty
+
+    if len(all_matching_tables) == 1:
+        target_table = all_matching_tables[0][1]
+    else:
+        logger.info(f"[安全可研批复投资] 发现 {len(all_matching_tables)} 个投资估算表格,将进行合并")
+        target_table = []
+        first_table = True
+        for table_idx, table in all_matching_tables:
+            if first_table:
+                target_table.extend(table)  # first table is kept whole, header rows included
+                first_table = False
+            else:
+                header_end_idx = 0  # number of leading header rows to drop from this table
+                for row_idx, row in enumerate(table):
+                    row_text = " ".join([str(cell) for cell in row]).replace(" ", "")
+                    if "序号" in row_text or "项目名称" in row_text or "建设规模" in row_text:
+                        header_end_idx = row_idx + 1
+                    elif len(row) > 0:
+                        first_cell = str(row[0]).strip()
+                        if first_cell in ["一", "二", "三", "四", "五", "六", "七", "八", "九", "十"]:
+                            break  # stop scanning at the first row numbered with a Chinese ordinal
+                target_table.extend(table[header_end_idx:])
+        logger.info(f"[安全可研批复投资] 合并后总行数: {len(target_table)}")
+
+    header_row_idx = -1  # -1 means "not found" for every index below
+    no_idx = name_idx = overhead_line_idx = bay_idx = substation_idx = optical_cable_idx = -1
+    static_investment_idx = dynamic_investment_idx = -1
+    construction_project_cost_idx = equipment_purchase_cost_idx = installation_project_cost_idx = other_expenses_idx = -1
+
+    for row_idx in range(min(5, len(target_table))):  # column detection looks at the first 5 rows at most
+        row = target_table[row_idx]
+        row_text = " ".join([str(cell) for cell in row])
+        row_text_no_space = row_text.replace(" ", "")
+        for col_idx, cell in enumerate(row):
+            cell_text = str(cell).strip()
+            cell_text_no_space = cell_text.replace(" ", "")
+            if "序号" in cell_text and no_idx == -1:  # each index keeps the first matching column only
+                no_idx = col_idx
+            elif "项目名称" in cell_text_no_space and name_idx == -1:
+                name_idx = col_idx
+            elif "架空线" in cell_text_no_space and overhead_line_idx == -1:
+                overhead_line_idx = col_idx
+            elif "间隔" in cell_text and bay_idx == -1:
+                bay_idx = col_idx
+            elif "变电" in cell_text and substation_idx == -1:  # NOTE(review): data rows inside the 5-row window are scanned too, so a cell such as "变电工程" could match here — confirm
+                substation_idx = col_idx
+            elif "光缆" in cell_text and optical_cable_idx == -1:
+                optical_cable_idx = col_idx
+            elif ("静态投资" in cell_text_no_space or "静态合计" in cell_text_no_space) and static_investment_idx == -1:
+                static_investment_idx = col_idx
+            elif ("动态投资" in cell_text_no_space or "动态合计" in cell_text_no_space) and dynamic_investment_idx == -1:
+                dynamic_investment_idx = col_idx
+            elif "建筑工程费" in cell_text_no_space and construction_project_cost_idx == -1:
+                construction_project_cost_idx = col_idx
+            elif "设备购置费" in cell_text_no_space and equipment_purchase_cost_idx == -1:
+                equipment_purchase_cost_idx = col_idx
+            elif "安装工程费" in cell_text_no_space and installation_project_cost_idx == -1:
+                installation_project_cost_idx = col_idx
+            elif ("其他费用" in cell_text_no_space or "合计" == cell_text_no_space) and other_expenses_idx == -1:
+                if "其他费用" in cell_text_no_space:  # a bare "合计" cell enters this branch but assigns nothing
+                    other_expenses_idx = col_idx
+        if ("序号" in row_text or "项目名称" in row_text_no_space) and header_row_idx == -1:
+            header_row_idx = row_idx
+
+    for row_idx in range(min(5, len(target_table))):  # second pass: find the first data row in the same 5-row window
+        row = target_table[row_idx]
+        if len(row) > 0:
+            first_cell = str(row[0]).strip()
+            if first_cell and first_cell not in ["序号", ""] and (first_cell in ["一", "二", "三", "四", "五"] or first_cell.isdigit()):
+                header_row_idx = row_idx - 1  # overrides the value found above; NOTE(review): becomes -1 when the first data row is row 0
+                break
+
+    if header_row_idx == -1:
+        logger.warning("[安全可研批复投资] 未找到表头行")
+        return record
+
+    for row_idx in range(header_row_idx + 1, len(target_table)):  # every row after the header is a candidate data row
+        row = target_table[row_idx]
+        if len(row) < 3:
+            continue
+        name = str(row[name_idx]).strip() if name_idx >= 0 and name_idx < len(row) else ""  # empty when no "项目名称" column was found
+        if not name or name in ["", "nan", "None"]:
+            continue  # rows without a usable name are dropped
+        no = str(row[no_idx]).strip() if no_idx >= 0 and no_idx < len(row) else ""
+        level_input = (no + name) if no else name  # serial number is prepended to the name for level detection
+        level = determine_level(level_input, name, strict_mode=False)
+        item = InvestmentItem()
+        item.no = no
+        item.name = name
+        item.level = level
+        if overhead_line_idx >= 0 and overhead_line_idx < len(row):  # scale columns are copied as stripped text
+            item.constructionScaleOverheadLine = str(row[overhead_line_idx]).strip()
+        if bay_idx >= 0 and bay_idx < len(row):
+            item.constructionScaleBay = str(row[bay_idx]).strip()
+        if substation_idx >= 0 and substation_idx < len(row):
+            item.constructionScaleSubstation = str(row[substation_idx]).strip()
+        if optical_cable_idx >= 0 and optical_cable_idx < len(row):
+            item.constructionScaleOpticalCable = str(row[optical_cable_idx]).strip()
+        if static_investment_idx >= 0 and static_investment_idx < len(row):  # amount columns go through clean_number_string
+            item.staticInvestment = clean_number_string(str(row[static_investment_idx]))
+        if dynamic_investment_idx >= 0 and dynamic_investment_idx < len(row):
+            item.dynamicInvestment = clean_number_string(str(row[dynamic_investment_idx]))
+        if construction_project_cost_idx >= 0 and construction_project_cost_idx < len(row):
+            item.constructionProjectCost = clean_number_string(str(row[construction_project_cost_idx]))
+        if equipment_purchase_cost_idx >= 0 and equipment_purchase_cost_idx < len(row):
+            item.equipmentPurchaseCost = clean_number_string(str(row[equipment_purchase_cost_idx]))
+        if installation_project_cost_idx >= 0 and installation_project_cost_idx < len(row):
+            item.installationProjectCost = clean_number_string(str(row[installation_project_cost_idx]))
+        if other_expenses_idx >= 0 and other_expenses_idx < len(row):
+            item.otherExpenses = clean_number_string(str(row[other_expenses_idx]))
+        record.items.append(item)
+        logger.info(f"[安全可研批复投资] 解析到数据: No={item.no}, Name={item.name}, Level={item.level}")
+
+    logger.info(f"[安全可研批复投资] 共解析到 {len(record.items)} 条数据")
+    return record
+
+
 def parse_feasibility_review_investment(markdown_content: str) -> FeasibilityReviewInvestment:
     """
     解析可研评审投资估算
@@ -794,6 +980,7 @@ def parse_investment_record(markdown_content: str, investment_type: Optional[str
             - "fsApproval" - 可研批复
             - "fsReview" - 可研评审
             - "pdApproval" - 初设批复
+            - "safetyFsApproval" - 安评可研批复
     
     Returns:
         解析后的记录对象
@@ -821,6 +1008,9 @@ def parse_investment_record(markdown_content: str, investment_type: Optional[str
     result = None
     if investment_type == "fsApproval":
         result = parse_feasibility_approval_investment(markdown_content)
+    elif investment_type == "safetyFsApproval":
+        # 安评可研批复使用独立解析(湖北省格式,含项目信息表)
+        result = parse_safety_feasibility_approval_investment(markdown_content)
     elif investment_type == "fsReview":
         result = parse_feasibility_review_investment(markdown_content)
     elif investment_type == "pdApproval":

+ 2 - 2
pdf_converter_v2/parser/json_converter.py

@@ -329,7 +329,7 @@ def parse_markdown_to_json(markdown_content: str, first_page_image: Optional[Ima
                 op_list = parse_operational_conditions(markdown_content, require_title=False)
             serialized = [oc.to_dict() if hasattr(oc, "to_dict") else oc for oc in (op_list or [])]
             result = {"document_type": forced_document_type, "data": {"operationalConditions": serialized}}
-        elif forced_document_type in ["fsApproval", "fsReview", "pdApproval"]:
+        elif forced_document_type in ["fsApproval", "fsReview", "pdApproval", "safetyFsApproval"]:
             # 投资估算类型处理
             logger.info(f"[JSON转换] 处理投资估算类型: {forced_document_type}")
             logger.debug(f"[JSON转换] Markdown内容长度: {len(markdown_content)} 字符")
@@ -432,7 +432,7 @@ def parse_markdown_to_json(markdown_content: str, first_page_image: Optional[Ima
     elif doc_type == "emRec":
         data = parse_electromagnetic_detection_record(markdown_content).to_dict()
         result = {"document_type": doc_type, "data": data}
-    elif doc_type in ["fsApproval", "fsReview", "pdApproval"]:
+    elif doc_type in ["fsApproval", "fsReview", "pdApproval", "safetyFsApproval"]:
         # 新增:投资估算类型
         logger.info(f"[JSON转换] 检测到投资估算类型: {doc_type}")
         logger.debug(f"[JSON转换] Markdown内容长度: {len(markdown_content)} 字符")