# Copyright (c) Opendatalab. All rights reserved. """ 表格解析模块 v2 - 独立版本,不依赖v1 """ from typing import List import re from ..utils.logging_config import get_logger from ..models.data_models import OperationalCondition, OperationalConditionV2 logger = get_logger("pdf_converter_v2.parser.table") def normalize_text(text: str) -> str: """将常见全角符号、大小写等统一,便于关键词匹配""" if not text: return "" text = text.lower() replacements = { "(": "(", ")": ")", ":": ":", "-": "-", "—": "-", "〜": "~", "~": "~", "/": "/", " ": " ", } for old, new in replacements.items(): text = text.replace(old, new) return text def parse_table_cell(cell_content: str) -> str: """解析表格单元格内容""" if not cell_content: return "" cell_content = re.sub(r'<[^>]+>', '', cell_content) cell_content = re.sub(r'\s+', ' ', cell_content).strip() return cell_content def extract_table_data(markdown_content: str) -> List[List[List[str]]]: """从Markdown内容中提取表格数据""" tables: List[List[List[str]]] = [] # 匹配带属性的table标签,如 table_matches = re.findall(r']*>(.*?)
', markdown_content, re.DOTALL) logger.debug(f"[extract_table_data] 共找到 {len(table_matches)} 个表格") for table_idx, table_content in enumerate(table_matches): table_rows: List[List[str]] = [] tr_matches = re.findall(r']*>(.*?)', table_content, re.DOTALL) logger.debug(f"[extract_table_data] 表格{table_idx}, 行数: {len(tr_matches)}") for row_idx, tr_content in enumerate(tr_matches): td_matches = re.findall(r']*>(.*?)', tr_content) row: List[str] = [parse_table_cell(td) for td in td_matches] if row: table_rows.append(row) if table_rows: tables.append(table_rows) logger.debug(f"[extract_table_data] 总表格: {len(tables)}") return tables def extract_table_with_rowspan_colspan(markdown_content: str) -> List[List[List[str]]]: """提取表格数据,处理rowspan和colspan属性""" tables: List[List[List[str]]] = [] # 匹配带属性的table标签,如 table_matches = re.findall(r']*>(.*?)
', markdown_content, re.DOTALL) logger.debug(f"[extract_table_with_rowspan_colspan] 共找到 {len(table_matches)} 个表格") for table_idx, table_content in enumerate(table_matches): tr_matches = re.findall(r']*>(.*?)', table_content, re.DOTALL) logger.debug(f"[extract_table_with_rowspan_colspan] 表格{table_idx}, 行数: {len(tr_matches)}") if not tr_matches: continue # 用于存储rowspan的值(跨行的单元格值) rowspan_values = {} # {(row_idx, col_idx): (value, remaining_rows)} # 先构建一个矩阵来存储所有单元格 max_cols = 0 table_matrix = [] for row_idx, tr_content in enumerate(tr_matches): # 找到所有td标签,包括属性 td_pattern = r']*>(.*?)' td_matches_with_attrs = re.finditer(td_pattern, tr_content, re.DOTALL) row = [] col_idx = 0 for td_match in td_matches_with_attrs: full_td = td_match.group(0) cell_content = td_match.group(1) # 提取rowspan和colspan属性 rowspan_match = re.search(r'rowspan=["\']?(\d+)["\']?', full_td) colspan_match = re.search(r'colspan=["\']?(\d+)["\']?', full_td) rowspan = int(rowspan_match.group(1)) if rowspan_match else 1 colspan = int(colspan_match.group(1)) if colspan_match else 1 # 解析单元格内容 cell_text = parse_table_cell(cell_content) # 跳过被rowspan占用的列 while (row_idx, col_idx) in rowspan_values: row.append(rowspan_values[(row_idx, col_idx)][0]) # 使用rowspan的值 remaining = rowspan_values[(row_idx, col_idx)][1] - 1 if remaining > 0: rowspan_values[(row_idx + 1, col_idx)] = (rowspan_values[(row_idx, col_idx)][0], remaining) del rowspan_values[(row_idx, col_idx)] col_idx += 1 # 添加单元格内容 for c in range(colspan): row.append(cell_text if c == 0 else "") # 如果有rowspan,记录到后续行 if rowspan > 1 and c == 0: rowspan_values[(row_idx + 1, col_idx)] = (cell_text, rowspan - 1) col_idx += 1 # 处理剩余的被rowspan占用的列 while (row_idx, col_idx) in rowspan_values: row.append(rowspan_values[(row_idx, col_idx)][0]) remaining = rowspan_values[(row_idx, col_idx)][1] - 1 if remaining > 0: rowspan_values[(row_idx + 1, col_idx)] = (rowspan_values[(row_idx, col_idx)][0], remaining) del rowspan_values[(row_idx, col_idx)] col_idx += 1 if row: table_matrix.append(row) max_cols = max(max_cols, len(row)) logger.debug(f"[extract_table_with_rowspan_colspan] 表格{table_idx} 第{row_idx}行, 内容: {row}") # 统一列数(可选,确保每行列数一致) for row in table_matrix: while len(row) < max_cols: row.append("") if table_matrix: tables.append(table_matrix) logger.debug(f"[extract_table_with_rowspan_colspan] 总表格: {len(tables)}") return tables def parse_operational_conditions(markdown_content: str, require_title: bool = True) -> List[OperationalCondition]: """解析工况信息表格 Args: markdown_content: Markdown内容 require_title: 是否要求必须有标题标识(如"附件2 工况信息"),默认为True 如果为False,则仅根据表格结构判断是否为工况信息表格 """ conditions: List[OperationalCondition] = [] # 查找工况信息相关的表格 if require_title: if "附件2 工况信息" not in markdown_content and "工况信息" not in markdown_content: logger.debug("[工况信息] 未找到工况信息标识") return conditions else: logger.debug("[工况信息] 无标题模式:仅根据表格结构判断") # 提取表格数据(支持rowspan和colspan) tables = extract_table_with_rowspan_colspan(markdown_content) if not tables: logger.warning("[工况信息] 未能提取出任何表格内容") return conditions # 查找工况信息表格(通常包含"检测时间"、"电压"、"电流"等关键词) for table in tables: if not table or len(table) < 2: continue # 检查表头是否包含工况信息的关键词 header_row = table[0] has_operational_keywords = any( keyword in " ".join(header_row) for keyword in ["检测时间", "电压", "电流", "有功功率", "无功功率", "项目"] ) if not has_operational_keywords: continue logger.info(f"[工况信息] 找到工况信息表格,行数: {len(table)}") # 找到表头行的列索引 header_row = table[0] monitor_at_idx = -1 project_idx = -1 name_idx = -1 voltage_idx = -1 current_idx = -1 active_power_idx = -1 reactive_power_idx = -1 for idx, cell in enumerate(header_row): cell_lower = cell.lower() if "检测时间" in cell or "监测时间" in cell: monitor_at_idx = idx elif "项目" in cell: # 项目列可能有colspan,需要找到实际的列 if project_idx == -1: project_idx = idx # 检查下一列是否是名称列(如果项目列colspan=2,下一列可能是名称) if idx + 1 < len(header_row) and name_idx == -1: next_cell = header_row[idx + 1] if not any(k in next_cell.lower() for k in ["电压", "电流", "有功", "无功", "检测"]): name_idx = idx + 1 elif "电压" in cell or "电压(kv)" in cell_lower: voltage_idx = idx elif "电流" in cell or "电流(a)" in cell_lower: current_idx = idx elif "有功功率" in cell or ("有功" in cell and "功率" in cell): active_power_idx = idx elif "无功功率" in cell or ("无功" in cell and "功率" in cell): reactive_power_idx = idx elif ("名称" in cell or "主变" in cell) and name_idx == -1: name_idx = idx logger.debug(f"[工况信息] 列索引: 检测时间={monitor_at_idx}, 项目={project_idx}, 名称={name_idx}, " f"电压={voltage_idx}, 电流={current_idx}, 有功功率={active_power_idx}, 无功功率={reactive_power_idx}") # 处理数据行(从第二行开始,第一行是表头) current_monitor_at = "" current_project = "" for row_idx in range(1, len(table)): row = table[row_idx] if len(row) < 4: # 至少需要检测时间、项目、名称等基本字段 continue # 检测时间 if monitor_at_idx >= 0 and monitor_at_idx < len(row) and row[monitor_at_idx].strip(): current_monitor_at = row[monitor_at_idx].strip() # 项目名称 if project_idx >= 0 and project_idx < len(row) and row[project_idx].strip(): current_project = row[project_idx].strip() # 名称(如1#主变) name_value = "" if name_idx >= 0 and name_idx < len(row): name_value = row[name_idx].strip() elif project_idx >= 0 and project_idx + 1 < len(row): # 如果名称列在项目列后面 name_value = row[project_idx + 1].strip() # 只有当名称存在时才创建工况信息记录(因为有rowspan的情况) if name_value and any(k in name_value for k in ["主变", "#"]): oc = OperationalCondition() oc.monitorAt = current_monitor_at oc.project = current_project oc.name = name_value # 电压 if voltage_idx >= 0 and voltage_idx < len(row): oc.voltage = row[voltage_idx].strip() # 电流 if current_idx >= 0 and current_idx < len(row): oc.current = row[current_idx].strip() # 有功功率 if active_power_idx >= 0 and active_power_idx < len(row): oc.activePower = row[active_power_idx].strip() # 无功功率 if reactive_power_idx >= 0 and reactive_power_idx < len(row): oc.reactivePower = row[reactive_power_idx].strip() conditions.append(oc) logger.debug(f"[工况信息] 解析到: {oc.to_dict()}") # 只处理第一个匹配的表格 if conditions: break logger.info(f"[工况信息] 共解析到 {len(conditions)} 条工况信息") return conditions def parse_operational_conditions_v2(markdown_content: str) -> List[OperationalConditionV2]: """解析工况信息表格(新格式:表1检测工况) 表格结构: - 第一行:名称、时间,电压(kV)(colspan=2),电流(A)(colspan=2),有功(MW)(colspan=2),无功(Mvar)(colspan=2) - 第二行:最大值、最小值(重复4次) - 数据行:名称、时间、电压最大值、电压最小值、电流最大值、电流最小值、有功最大值、有功最小值、无功最大值、无功最小值 """ conditions: List[OperationalConditionV2] = [] # 检查是否包含"表1检测工况"标识(使用正则表达式,允许中间有空格) # 支持:表1检测工况、表 1 检测工况、表 1检测工况、表1 检测工况 等变体 pattern = r'表\s*1\s*检测工况' if not re.search(pattern, markdown_content): logger.debug("[工况信息V2] 未找到'表1检测工况'标识(包括空格变体)") return conditions logger.debug("[工况信息V2] 检测到'表1检测工况'格式(包括空格变体),第一列将映射到name字段,project字段保持为空") # 提取表格数据(支持rowspan和colspan) tables = extract_table_with_rowspan_colspan(markdown_content) if not tables: logger.warning("[工况信息V2] 未能提取出任何表格内容") return conditions # 查找包含"表1检测工况"的表格 # 表格结构:第一行是名称、时间,然后是电压、电流、有功、无功(各占2列) for table in tables: if not table or len(table) < 3: # 至少需要表头2行和数据1行 continue # 检查第一行表头是否包含"名称"、"时间"、"电压"等关键词 first_row = table[0] first_row_text = " ".join(first_row).lower() has_keywords = any(k in first_row_text for k in ["名称", "时间", "电压", "电流", "有功", "无功"]) if not has_keywords: continue logger.info(f"[工况信息V2] 找到工况信息表格,行数: {len(table)}") # 列索引映射(根据表格结构) # 列0: 项目名称(映射到name字段) # 列1: 时间 # 列2: 电压最大值 # 列3: 电压最小值 # 列4: 电流最大值 # 列5: 电流最小值 # 列6: 有功最大值 # 列7: 有功最小值 # 列8: 无功最大值 # 列9: 无功最小值 # 注意:对于"表1检测工况"格式,第一列映射到name字段,project字段保持为空 name_idx = 0 # 第一列是项目名称(如"500kV 江黄Ⅰ线") time_idx = 1 voltage_max_idx = 2 voltage_min_idx = 3 current_max_idx = 4 current_min_idx = 5 active_power_max_idx = 6 active_power_min_idx = 7 reactive_power_max_idx = 8 reactive_power_min_idx = 9 # 从第三行开始解析数据(前两行是表头) logger.debug(f"[工况信息V2] 表格总行数: {len(table)}, 开始从第3行(索引2)解析数据行") for row_idx in range(2, len(table)): row = table[row_idx] logger.debug(f"[工况信息V2] 处理第{row_idx}行(索引{row_idx}): 列数={len(row)}, 内容={row[:5]}...") # 只打印前5列用于调试 # 至少需要10列(名称、时间、电压max/min、电流max/min、有功max/min、无功max/min) if len(row) < 10: logger.warning(f"[工况信息V2] 第{row_idx}行列数不足10列,跳过: {len(row)}列") continue # 检查是否是数据行(第一列应该有项目名称,且不是"名称"、"最大值"、"最小值"等表头关键词) first_cell = row[name_idx].strip() if name_idx < len(row) else "" if not first_cell or first_cell in ["名称", "最大值", "最小值", "时间"]: logger.debug(f"[工况信息V2] 第{row_idx}行第一列为表头关键词或为空,跳过: '{first_cell}'") continue # 跳过完全空的行(但允许某些字段为空) if not any(cell.strip() for cell in row[:2]): # 至少名称或时间应该有值 logger.debug(f"[工况信息V2] 第{row_idx}行前两列为空,跳过") continue oc = OperationalConditionV2() # 名称(列0,映射到name字段) if name_idx < len(row): oc.name = row[name_idx].strip() # project字段保持为空(仅在检测工况之外的场景使用) oc.project = "" # 时间(列1) if time_idx < len(row): oc.monitorAt = row[time_idx].strip() # 电压最大值(列2) if voltage_max_idx < len(row): oc.maxVoltage = row[voltage_max_idx].strip() # 电压最小值(列3) if voltage_min_idx < len(row): oc.minVoltage = row[voltage_min_idx].strip() # 电流最大值(列4) if current_max_idx < len(row): oc.maxCurrent = row[current_max_idx].strip() # 电流最小值(列5) if current_min_idx < len(row): oc.minCurrent = row[current_min_idx].strip() # 有功功率最大值(列6) if active_power_max_idx < len(row): oc.maxActivePower = row[active_power_max_idx].strip() # 有功功率最小值(列7) if active_power_min_idx < len(row): oc.minActivePower = row[active_power_min_idx].strip() # 无功功率最大值(列8) if reactive_power_max_idx < len(row): oc.maxReactivePower = row[reactive_power_max_idx].strip() # 无功功率最小值(列9) if reactive_power_min_idx < len(row): oc.minReactivePower = row[reactive_power_min_idx].strip() # 添加记录(只要名称不为空) if oc.name: conditions.append(oc) logger.info(f"[工况信息V2] 解析到第{len(conditions)}条记录: name='{oc.name}', 时间='{oc.monitorAt}'") else: logger.warning(f"[工况信息V2] 第{row_idx}行名称为空,跳过该行: {row[:3]}") # 只处理第一个匹配的表格 if conditions: break logger.info(f"[工况信息V2] 共解析到 {len(conditions)} 条工况信息") return conditions def parse_operational_conditions_opstatus(markdown_content: str) -> List[OperationalCondition]: """解析工况信息表格(opStatus格式:附件 工况及工程信息) 表格结构: - 第一行:名称(rowspan=2)、时间(rowspan=2)、运行工况(colspan=4) - 第二行:U (kV)、I (A)、P (MW)、Q (Mvar) - 数据行:名称、时间(可能有rowspan)、U范围、I范围、P范围、Q范围 输出格式: [ { "monitorAt": "", // 检测时间 "project": "", // 项目名称(从"项目编号"提取,如果存在) "name": "", // 名称,如#1主变 "voltage": "", // 电压范围 "current": "", // 电流范围 "activePower": "", // 有功功率范围 "reactivePower": "", // 无功功率范围 } ] """ conditions: List[OperationalCondition] = [] # 检查是否包含"附件 工况及工程信息"标识 if "附件" not in markdown_content or "工况" not in markdown_content: logger.debug("[工况信息opStatus] 未找到'附件 工况及工程信息'标识") return conditions # 提取项目编号(如果存在) # 需要排除表格内容,只匹配表格之前的内容 project_number = "" # 先找到表格开始位置 table_start = markdown_content.find('') if table_start > 0: # 只在表格之前的内容中查找项目编号 content_before_table = markdown_content[:table_start] project_match = re.search(r'项目编号[::]\s*([^\n<]+)', content_before_table) if project_match: project_number = project_match.group(1).strip() logger.debug(f"[工况信息opStatus] 提取到项目编号: {project_number}") else: # 如果没有表格,在整个内容中查找 project_match = re.search(r'项目编号[::]\s*([^\n<]+)', markdown_content) if project_match: project_number = project_match.group(1).strip() logger.debug(f"[工况信息opStatus] 提取到项目编号: {project_number}") # 提取表格数据(支持rowspan和colspan) tables = extract_table_with_rowspan_colspan(markdown_content) if not tables: logger.warning("[工况信息opStatus] 未能提取出任何表格内容") return conditions # 查找工况信息表格(包含"名称"、"时间"、"U"、"I"、"P"、"Q"等关键词) for table in tables: if not table or len(table) < 3: # 至少需要表头2行和数据1行 continue # 检查表头是否包含工况信息的关键词 first_row = table[0] second_row = table[1] if len(table) > 1 else [] header_text = normalize_text(" ".join(first_row + second_row)) has_keywords = any( keyword in header_text for keyword in ["名称", "时间", "运行工况", "u", "i", "p", "q", "kv", "mw", "mvar"] ) if not has_keywords: continue logger.info(f"[工况信息opStatus] 找到工况信息表格,行数: {len(table)}") # 检测是否有两行表头(第二行包含"U (kV)"、"I (A)"等) has_two_row_header = False if len(table) > 1: second_row_text = normalize_text(" ".join(table[1])) # 兼容带空格和不带空格的写法,例如 "U (kV)" / "U(kV)" if ( any(k in second_row_text for k in ["u(kv)", "i(a)", "p(mw)", "q(mvar)"]) or ("u" in second_row_text and "kv" in second_row_text) or ("i" in second_row_text and "a" in second_row_text) or ("p" in second_row_text and "mw" in second_row_text) or ("q" in second_row_text and "mvar" in second_row_text) ): has_two_row_header = True logger.debug("[工况信息opStatus] 检测到两行表头格式") # 根据表头动态确定列索引 # 如果有两行表头,从第二行检测;否则从第一行检测 header_row = table[1] if has_two_row_header else table[0] time_idx = -1 project_idx = -1 name_idx = -1 voltage_idx = -1 current_idx = -1 active_power_idx = -1 reactive_power_idx = -1 for idx, cell in enumerate(header_row): cell_lower = cell.lower() cell_normalized = normalize_text(cell) if "检测时间" in cell or "监测时间" in cell or "时间" in cell: time_idx = idx elif "项目" in cell: project_idx = idx elif "名称" in cell: name_idx = idx elif "电压" in cell or ("u" in cell_normalized and "kv" in cell_normalized) or ("u (kv)" in cell_normalized): voltage_idx = idx elif "电流" in cell or ("i" in cell_normalized and "a" in cell_normalized) or ("i (a)" in cell_normalized): current_idx = idx elif "有功功率" in cell or ("有功" in cell and "功率" in cell) or ("p" in cell_normalized and "mw" in cell_normalized) or ("p (mw)" in cell_normalized): active_power_idx = idx elif "无功功率" in cell or ("无功" in cell and "功率" in cell) or ("q" in cell_normalized and "mvar" in cell_normalized) or ("q (mvar)" in cell_normalized): reactive_power_idx = idx # 如果第一行表头有"名称",也检查第一行 if has_two_row_header and name_idx == -1: first_row = table[0] for idx, cell in enumerate(first_row): if "名称" in cell: name_idx = idx break # 如果表头中没有找到名称列,尝试在数据行中查找 if name_idx == -1: # 确定第一行数据的位置(如果有两行表头,从第2行开始;否则从第1行开始) first_data_row_idx = 2 if has_two_row_header else 1 if len(table) > first_data_row_idx: first_data_row = table[first_data_row_idx] for idx, cell in enumerate(first_data_row): # 跳过已知的列(时间列和项目列) if idx != time_idx and idx != project_idx and cell.strip(): # 支持主变格式(包含"主变"或"#")和输电线路格式(包含"kV"和"线") if (any(k in cell for k in ["主变", "#"]) or ("kV" in cell and "线" in cell) or ("kV" in cell)): name_idx = idx logger.debug(f"[工况信息opStatus] 从数据行推断名称列索引: {name_idx}, 内容='{cell}'") break # 如果仍然没有找到名称列,但找到了项目列,则名称列通常在项目列之后 # 如果项目列有colspan,名称列可能在空列之后 if name_idx == -1 and project_idx >= 0: # 检查项目列之后是否有空列,如果有,名称列在空列位置(因为colspan会创建空列) if project_idx + 1 < len(header_row): if not header_row[project_idx + 1].strip(): # 项目列有colspan,名称列在空列位置(project_idx + 1) name_idx = project_idx + 1 else: # 名称列紧跟在项目列之后 name_idx = project_idx + 1 else: # 如果项目列是最后一列,名称列可能在项目列之后 name_idx = project_idx + 1 if project_idx + 1 < len(header_row) else -1 logger.debug(f"[工况信息opStatus] 列索引: 检测时间={time_idx}, 项目={project_idx}, 名称={name_idx}, " f"电压={voltage_idx}, 电流={current_idx}, 有功功率={active_power_idx}, 无功功率={reactive_power_idx}") # 确定数据行起始位置(如果有两行表头,从第2行开始;否则从第1行开始) data_start_row = 2 if has_two_row_header else 1 current_monitor_at = "" current_project = "" for row_idx in range(data_start_row, len(table)): row = table[row_idx] logger.debug(f"[工况信息opStatus] 处理第{row_idx}行: 列数={len(row)}, 内容={row}") # 至少需要3列 if len(row) < 3: logger.warning(f"[工况信息opStatus] 第{row_idx}行列数不足,跳过: {len(row)}列") continue # 更新检测时间(如果有值且列索引有效) if time_idx >= 0 and time_idx < len(row) and row[time_idx].strip(): current_monitor_at = row[time_idx].strip() # 更新项目/位置(如果有值且列索引有效) if project_idx >= 0 and project_idx < len(row) and row[project_idx].strip(): current_project = row[project_idx].strip() # 获取名称(变压器名称,如"1#主变") name_value = "" if name_idx >= 0 and name_idx < len(row): name_value = row[name_idx].strip() elif project_idx >= 0 and project_idx + 1 < len(row): # 如果名称列索引未找到,尝试项目列之后的第一列 name_value = row[project_idx + 1].strip() # 检查是否是噪声数据行(包含测点编号如N1、N2等,且没有电压/电流/功率列) is_noise_row = False # 检查名称列或第一列是否是测点编号 check_cell = name_value if name_value else (row[0].strip() if len(row) > 0 else "") if check_cell: # 如果名称列或第一列包含测点编号格式(N1、N2等)且没有找到电压/电流/功率列,可能是噪声数据行 if re.match(r'^N\d+', check_cell) and voltage_idx == -1 and current_idx == -1: is_noise_row = True logger.debug(f"[工况信息opStatus] 第{row_idx}行疑似噪声数据行,跳过: 第一列或名称列='{check_cell}'") # 只有当名称存在且不是噪声数据行时才创建工况信息记录 # 放宽名称验证:支持主变(包含"主变"或"#")和输电线路(包含"kV"和"线") is_valid_name = False if name_value: # 主变格式:包含"主变"或"#" if any(k in name_value for k in ["主变", "#"]): is_valid_name = True # 输电线路格式:包含"kV"和"线" elif "kV" in name_value and "线" in name_value: is_valid_name = True # 其他可能的格式:包含"kV"(可能是其他设备) elif "kV" in name_value: is_valid_name = True if is_valid_name and not is_noise_row: # 进一步验证:必须有电压或电流或功率列,否则可能是噪声数据行 if voltage_idx == -1 and current_idx == -1 and active_power_idx == -1 and reactive_power_idx == -1: logger.debug(f"[工况信息opStatus] 第{row_idx}行没有找到电压/电流/功率列,跳过: name='{name_value}'") continue # 创建工况信息记录 oc = OperationalCondition() oc.monitorAt = current_monitor_at oc.project = current_project # project字段使用表格中"项目"列的值 oc.name = name_value # 电压范围 if voltage_idx >= 0 and voltage_idx < len(row): oc.voltage = row[voltage_idx].strip() # 电流范围 if current_idx >= 0 and current_idx < len(row): oc.current = row[current_idx].strip() # 有功功率范围 if active_power_idx >= 0 and active_power_idx < len(row): oc.activePower = row[active_power_idx].strip() # 无功功率范围 if reactive_power_idx >= 0 and reactive_power_idx < len(row): oc.reactivePower = row[reactive_power_idx].strip() # 添加记录 conditions.append(oc) logger.info(f"[工况信息opStatus] 解析到第{len(conditions)}条记录: name='{oc.name}', 时间='{oc.monitorAt}', 项目='{oc.project}'") else: if is_noise_row: logger.debug(f"[工况信息opStatus] 第{row_idx}行是噪声数据行,跳过: name='{name_value}'") else: logger.debug(f"[工况信息opStatus] 第{row_idx}行名称无效或为空,跳过: name='{name_value}'") # 只处理第一个匹配的表格 if conditions: break logger.info(f"[工况信息opStatus] 共解析到 {len(conditions)} 条工况信息") return conditions def parse_operational_conditions_format3_5(markdown_content: str) -> List[OperationalConditionV2]: """解析工况信息表格(格式3和格式5:附件 2 工况信息,电压列第一列存储时间段) 表格结构: - 第一行:名称(rowspan=2)、时间(rowspan=2或colspan=2)、电压(kV)(colspan=2)、电流(A)(colspan=2)、有功(MW)(colspan=2)、无功(Mvar)(colspan=2) - 第二行:最大值、最小值(重复4次) - 数据行特点: - 电压列的第一列存储时间段(如"昼间9:00~11:00"),第二列是电压最大值 - 电流列的第一列是电流最大值,第二列是电流最小值 - 有功和无功类似 输出格式(使用OperationalConditionV2): [ { "monitorAt": "", // 检测时间(从时间列和电压时间段列组合) "project": "", // 项目名称 "name": "", // 名称,如#3主变 "maxVoltage": "", // 电压最大值 "minVoltage": "", // 电压最小值 "maxCurrent": "", // 电流最大值 "minCurrent": "", // 电流最小值 "maxActivePower": "", // 有功功率最大值 "minActivePower": "", // 有功功率最小值 "maxReactivePower": "", // 无功功率最大值 "minReactivePower": "", // 无功功率最小值 } ] """ conditions: List[OperationalConditionV2] = [] # 检查是否包含"附件 2 工况信息"或"附件 2 工况及工程信息"标识 if "附件" not in markdown_content or ("工况信息" not in markdown_content and "工况及工程信息" not in markdown_content): logger.debug("[工况信息格式3/5] 未找到'附件 2 工况信息'或'附件 2 工况及工程信息'标识") return conditions # 提取表格数据(支持rowspan和colspan) tables = extract_table_with_rowspan_colspan(markdown_content) if not tables: logger.warning("[工况信息格式3/5] 未能提取出任何表格内容") return conditions # 查找工况信息表格 for table in tables: if not table or len(table) < 3: # 至少需要表头2行和数据1行 continue # 检查表头是否包含工况信息的关键词 first_row = table[0] second_row = table[1] if len(table) > 1 else [] header_text = normalize_text(" ".join(first_row + second_row)) has_keywords = any( keyword in header_text for keyword in ["名称", "时间", "电压", "电流", "有功", "无功", "kv", "mw", "mvar"] ) if not has_keywords: continue logger.info(f"[工况信息格式3/5] 找到工况信息表格,行数: {len(table)}") # 检查是否是格式3/5(电压列第一列存储时间段) # 从数据行检查:如果电压列第一列包含"昼间"、"夜间"等时间段关键词,则是格式3/5 is_format3_5 = False if len(table) > 2: logger.debug(f"[工况信息格式3/5] 开始检查格式3/5特征,表格行数: {len(table)}") for row_idx in range(2, min(5, len(table))): # 检查前几行数据 row = table[row_idx] logger.debug(f"[工况信息格式3/5] 检查第{row_idx}行,列数: {len(row)}, 内容: {row[:6]}") if len(row) >= 4: # 至少需要名称、时间、电压列 # 电压列可能在索引2或3(格式3在列2,格式5在列3,因为时间列有colspan=2) # 扩大检查范围到列2-6,以覆盖格式3和格式5 for col_idx in range(2, min(7, len(row))): cell = row[col_idx].strip() # 处理可能的换行符(如"昼间\n10:00~17:00") cell_normalized = cell.replace("\n", " ").replace("\r", " ").strip() if cell_normalized and any(k in cell_normalized for k in ["昼间", "夜间", "次日", "~", ":"]) and not re.match(r'^[\d.\-]+$', cell_normalized): is_format3_5 = True logger.info(f"[工况信息格式3/5] 检测到格式3/5特征:第{row_idx}行列{col_idx}包含时间段 '{cell_normalized}'") break if is_format3_5: break if not is_format3_5: logger.warning("[工况信息格式3/5] 未检测到格式3/5特征(电压列包含时间段),跳过该表格") continue # 确定列索引 # 根据表格结构:名称、时间(可能有colspan=2)、电压(kV)[colspan=2]、电流(A)[colspan=2]、有功(MW)[colspan=2]、无功(Mvar)[colspan=2] # 第二行表头:最大值、最小值(重复4次) # 格式3:名称、时间、电压时间段、电压最大值、电压最小值、电流最大值、电流最小值... # 格式5:名称、时间(colspan=2,占用2列)、电压时间段、电压最大值、电压最小值、电流最大值、电流最小值... # 动态检测时间列是否有colspan=2(通过检查表头和数据行的列数) # 格式3和格式5的时间列都有colspan=2,但实际数据行可能都是11列 # 检查表头第一行,看时间列是否有colspan=2 has_time_colspan2 = False if len(table) > 0: first_row = table[0] # 检查表头中是否有"时间"列,并且该列有colspan=2 # 如果第一行包含"时间"且后续列数较多,可能是colspan=2 header_text = " ".join(first_row).lower() if "时间" in header_text: # 检查表头结构:如果时间列后面直接是电压列,可能是colspan=2 # 或者检查数据行的列数 if len(table) > 2: for row_idx in range(2, min(5, len(table))): row = table[row_idx] logger.debug(f"[工况信息格式3/5] 检查第{row_idx}行,列数: {len(row)}") if len(row) >= 11: # 格式3/5:名称(1) + 时间(2) + 电压时间段(1) + 电压max(1) + 电压min(1) + 电流max(1) + 电流min(1) + 有功max(1) + 有功min(1) + 无功max(1) + 无功min(1) = 11列 has_time_colspan2 = True logger.info(f"[工况信息格式3/5] 检测到时间列有colspan=2,数据行有{len(row)}列") break elif len(row) >= 10: # 可能是格式3,但时间列也可能有colspan=2(只是数据值只占1列) # 检查列2是否是时间段(包含"昼间"、"夜间"等) if len(row) > 2 and any(k in row[2] for k in ["昼间", "夜间", "次日", "~", ":"]): has_time_colspan2 = True logger.info(f"[工况信息格式3/5] 检测到时间列有colspan=2(格式3),数据行有{len(row)}列,列2是时间段") break logger.debug(f"[工况信息格式3/5] 检测到格式3,数据行有{len(row)}列") break name_idx = 0 # 无论格式3还是格式5,时间段都在列2(索引2),因为时间列虽然有colspan=2, # 但实际数据中时间段是电压列的第一列,在索引2 time_idx = 1 voltage_time_idx = 2 # 电压时间段列(格式3和格式5都在索引2) voltage_max_idx = 3 # 电压最大值列 voltage_min_idx = 4 # 电压最小值列 current_max_idx = 5 # 电流最大值列 current_min_idx = 6 # 电流最小值列 active_power_max_idx = 7 # 有功最大值列 active_power_min_idx = 8 # 有功最小值列 reactive_power_max_idx = 9 # 无功最大值列 reactive_power_min_idx = 10 # 无功最小值列 # 从第三行开始解析数据(前两行是表头) current_name = "" current_time = "" logger.info(f"[工况信息格式3/5] 开始解析数据行,表格总行数: {len(table)}, 从第3行(索引2)开始") for row_idx in range(2, len(table)): row = table[row_idx] logger.debug(f"[工况信息格式3/5] 处理第{row_idx}行,列数: {len(row)}, 前5列: {row[:5]}") # 至少需要4列 if len(row) < 4: logger.debug(f"[工况信息格式3/5] 第{row_idx}行列数不足4列,跳过") continue # 检查是否是表头行(包含"名称"、"时间"、"最大值"、"最小值"等关键词) if any(keyword in " ".join(row[:5]).lower() for keyword in ["名称", "时间", "最大值", "最小值", "电压", "电流", "有功", "无功"]): logger.debug(f"[工况信息格式3/5] 跳过表头行: {row[:3]}") continue # 检查是否是项目名称行(整行合并,如"蕲昌220kV变电站"、"输电线路") # 项目名称行通常只有第0列有值,其他列都为空(因为colspan) non_empty_cols = [i for i, cell in enumerate(row) if cell.strip()] if len(non_empty_cols) == 1 and non_empty_cols[0] == 0: # 检查内容是否是项目名称(不包含"主变"、"#"、"线"等设备名称关键词) cell_value = row[0].strip() if not any(k in cell_value for k in ["主变", "#", "线"]): # 可能是项目名称行,跳过 logger.debug(f"[工况信息格式3/5] 跳过项目名称行: {cell_value}") continue # 更新名称(如果有值) if name_idx < len(row) and row[name_idx].strip(): name_value = row[name_idx].strip() # 检查是否是有效名称(包含"主变"、"#"等,但排除"输电线路"这样的项目名称) if name_value in ["输电线路", "变电站"]: # 这是项目名称行,跳过 logger.debug(f"[工况信息格式3/5] 跳过项目名称行: {name_value}") continue elif any(k in name_value for k in ["主变", "#"]) or ("kV" in name_value and "线" in name_value): current_name = name_value logger.debug(f"[工况信息格式3/5] 更新名称: {current_name}") # 更新时间(如果有值) # 时间列可能有colspan=2,所以检查列1和列2 time_value = "" if time_idx < len(row) and row[time_idx].strip(): time_value = row[time_idx].strip() elif time_idx + 1 < len(row) and row[time_idx + 1].strip(): time_value = row[time_idx + 1].strip() # 检查是否是日期格式(支持"2025.03.28"和"2025.08.29-08.30") if time_value and (re.match(r'^\d{4}\.\d{1,2}\.\d{1,2}', time_value) or re.match(r'^\d{4}\.\d{1,2}\.\d{1,2}-\d{1,2}\.\d{1,2}', time_value)): current_time = time_value logger.debug(f"[工况信息格式3/5] 更新时间: {current_time}") # 检查是否有电压时间段(格式3/5的特征) if voltage_time_idx < len(row) and row[voltage_time_idx].strip(): voltage_time = row[voltage_time_idx].strip() # 检查是否是时间段(包含"昼间"、"夜间"等) if any(k in voltage_time for k in ["昼间", "夜间", "次日", "~", ":"]): # 创建工况信息记录(使用OperationalConditionV2格式) oc = OperationalConditionV2() oc.project = "" oc.name = current_name # 组合monitorAt:时间 + 时间段(保持原始格式,如"2025.03.28 昼间9:00~11:00") if current_time: oc.monitorAt = f"{current_time} {voltage_time}".strip() else: oc.monitorAt = voltage_time # 电压最大值 if voltage_max_idx < len(row) and row[voltage_max_idx].strip(): oc.maxVoltage = row[voltage_max_idx].strip() # 电压最小值 if voltage_min_idx < len(row) and row[voltage_min_idx].strip(): oc.minVoltage = row[voltage_min_idx].strip() # 电流最大值 if current_max_idx < len(row) and row[current_max_idx].strip(): oc.maxCurrent = row[current_max_idx].strip() # 电流最小值 if current_min_idx < len(row) and row[current_min_idx].strip(): oc.minCurrent = row[current_min_idx].strip() # 有功功率最大值 if active_power_max_idx < len(row) and row[active_power_max_idx].strip(): oc.maxActivePower = row[active_power_max_idx].strip() # 有功功率最小值 if active_power_min_idx < len(row) and row[active_power_min_idx].strip(): oc.minActivePower = row[active_power_min_idx].strip() # 无功功率最大值 if reactive_power_max_idx < len(row) and row[reactive_power_max_idx].strip(): oc.maxReactivePower = row[reactive_power_max_idx].strip() # 无功功率最小值 if reactive_power_min_idx < len(row) and row[reactive_power_min_idx].strip(): oc.minReactivePower = row[reactive_power_min_idx].strip() # 添加记录(只要名称不为空) if oc.name: conditions.append(oc) logger.info(f"[工况信息格式3/5] 解析到第{len(conditions)}条记录: name='{oc.name}', monitorAt='{oc.monitorAt}'") # 只处理第一个匹配的表格 if conditions: break logger.info(f"[工况信息格式3/5] 共解析到 {len(conditions)} 条工况信息") return conditions