| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985 |
- # Copyright (c) Opendatalab. All rights reserved.
- """
- 表格解析模块 v2 - 独立版本,不依赖v1
- """
- from typing import List
- import re
- from ..utils.logging_config import get_logger
- from ..models.data_models import OperationalCondition, OperationalConditionV2
- logger = get_logger("pdf_converter_v2.parser.table")
# Full-width / variant punctuation mapped to its ASCII equivalent.  The keys are
# spelled as escapes on purpose: written literally they are easily folded to
# ASCII by editors or text pipelines, which turns every entry into a no-op.
_FULLWIDTH_TO_ASCII = str.maketrans({
    "\uff08": "(",  # fullwidth left parenthesis
    "\uff09": ")",  # fullwidth right parenthesis
    "\uff1a": ":",  # fullwidth colon
    "\uff0d": "-",  # fullwidth hyphen-minus
    "\u2014": "-",  # em dash
    "\u301c": "~",  # wave dash
    "\uff5e": "~",  # fullwidth tilde
    "\uff0f": "/",  # fullwidth solidus
    "\u3000": " ",  # ideographic space
})


def normalize_text(text: str) -> str:
    """Lower-case *text* and fold common full-width punctuation to ASCII.

    Used before keyword matching so that headers such as ``U(kV)`` written
    with full-width parentheses compare equal to ``u(kv)``.

    Args:
        text: Raw cell or header text. ``None`` and ``""`` are accepted.

    Returns:
        The normalized string, or ``""`` when *text* is empty or ``None``.
    """
    if not text:
        return ""
    # One translate pass replaces the chain of str.replace calls.
    return text.lower().translate(_FULLWIDTH_TO_ASCII)
def parse_table_cell(cell_content: str) -> str:
    """Return the plain text of one HTML table cell.

    Processing order matters:
      1. strip HTML tags,
      2. decode HTML entities (``&nbsp;``, ``&lt;``, ``&amp;`` ...),
      3. collapse every whitespace run to a single space and trim.

    Entities are decoded *after* tag stripping so that an escaped ``&lt;b&gt;``
    stays literal text instead of being removed as a tag, and *before*
    whitespace collapsing so that a decoded non-breaking space is folded too.

    Args:
        cell_content: Inner HTML of a ``<td>`` element; may be empty or ``None``.

    Returns:
        The cleaned text, or ``""`` for empty input.
    """
    if not cell_content:
        return ""
    import html  # function-scope import keeps the module's import block untouched

    text = re.sub(r'<[^>]+>', '', cell_content)
    text = html.unescape(text)
    return re.sub(r'\s+', ' ', text).strip()
def extract_table_data(markdown_content: str) -> List[List[List[str]]]:
    """Extract every HTML table embedded in *markdown_content*.

    ``rowspan`` / ``colspan`` attributes are ignored here; each ``<td>`` yields
    exactly one cell.  Rows without any ``<td>`` and tables without any such
    row are omitted from the result.

    Args:
        markdown_content: Markdown text that may contain ``<table>`` markup.

    Returns:
        A list of tables; each table is a list of rows; each row is a list of
        cell texts cleaned by ``parse_table_cell``.
    """
    tables: List[List[List[str]]] = []
    # The opening tag may carry attributes, e.g. <table border=1 style='...'>.
    table_matches = re.findall(r'<table[^>]*>(.*?)</table>', markdown_content, re.DOTALL)
    logger.debug(f"[extract_table_data] 共找到 {len(table_matches)} 个表格")

    for table_idx, table_content in enumerate(table_matches):
        tr_matches = re.findall(r'<tr[^>]*>(.*?)</tr>', table_content, re.DOTALL)
        logger.debug(f"[extract_table_data] 表格{table_idx}, 行数: {len(tr_matches)}")

        table_rows: List[List[str]] = []
        for tr_content in tr_matches:
            # re.DOTALL is required: without it a cell whose content spans
            # several lines does not match and is silently dropped.
            td_matches = re.findall(r'<td[^>]*>(.*?)</td>', tr_content, re.DOTALL)
            row: List[str] = [parse_table_cell(td) for td in td_matches]
            if row:
                table_rows.append(row)

        if table_rows:
            tables.append(table_rows)

    logger.debug(f"[extract_table_data] 总表格: {len(tables)}")
    return tables
def _consume_rowspan_cells(pending: dict, row: list, row_idx: int, col_idx: int) -> int:
    """Append to *row* the cells carried into this row by earlier rowspans.

    Starting at *col_idx*, every consecutive column registered in *pending*
    for *row_idx* is appended to *row* and, if the span continues, re-registered
    for the next row.

    Returns:
        The first column index at or after *col_idx* that is not occupied.
    """
    while (row_idx, col_idx) in pending:
        value, rows_left = pending.pop((row_idx, col_idx))
        row.append(value)
        if rows_left > 1:
            pending[(row_idx + 1, col_idx)] = (value, rows_left - 1)
        col_idx += 1
    return col_idx


def extract_table_with_rowspan_colspan(markdown_content: str) -> List[List[List[str]]]:
    """Extract HTML tables, expanding ``rowspan`` and ``colspan`` into a grid.

    * A cell with ``colspan=n`` becomes its text followed by ``n - 1`` empty
      strings.
    * A cell with ``rowspan=n`` is repeated in the same column(s) of the next
      ``n - 1`` rows.  When the cell also has a colspan, *all* of its columns
      are carried down (text in the first, ``""`` in the others), so the cells
      of the following rows keep their column positions.
    * Rows of one table are right-padded with ``""`` to the widest row.

    Args:
        markdown_content: Markdown text that may contain ``<table>`` markup.

    Returns:
        A list of tables; each table is a list of equally long rows of cell
        texts.  Rows that end up with no cells and tables without rows are
        omitted.
    """
    tables: List[List[List[str]]] = []
    # The opening tag may carry attributes, e.g. <table border=1 style='...'>.
    table_matches = re.findall(r'<table[^>]*>(.*?)</table>', markdown_content, re.DOTALL)
    logger.debug(f"[extract_table_with_rowspan_colspan] 共找到 {len(table_matches)} 个表格")

    # Group 1 is the attribute part of the opening tag, group 2 the content;
    # spans are read from the attributes only, never from the cell text.
    td_re = re.compile(r'<td([^>]*)>(.*?)</td>', re.DOTALL)
    rowspan_re = re.compile(r'rowspan=["\']?(\d+)["\']?')
    colspan_re = re.compile(r'colspan=["\']?(\d+)["\']?')

    for table_idx, table_content in enumerate(table_matches):
        tr_matches = re.findall(r'<tr[^>]*>(.*?)</tr>', table_content, re.DOTALL)
        logger.debug(f"[extract_table_with_rowspan_colspan] 表格{table_idx}, 行数: {len(tr_matches)}")

        if not tr_matches:
            continue

        # {(row_idx, col_idx): (value, rows still to fill including row_idx)}
        rowspan_values = {}
        max_cols = 0
        table_matrix = []

        for row_idx, tr_content in enumerate(tr_matches):
            row = []
            col_idx = 0

            for td_match in td_re.finditer(tr_content):
                attrs = td_match.group(1)
                cell_text = parse_table_cell(td_match.group(2))

                rowspan_match = rowspan_re.search(attrs)
                colspan_match = colspan_re.search(attrs)
                # A span below 1 is treated as 1 so the cell is never dropped.
                rowspan = max(1, int(rowspan_match.group(1))) if rowspan_match else 1
                colspan = max(1, int(colspan_match.group(1))) if colspan_match else 1

                # Columns occupied by a rowspan from above come first.
                col_idx = _consume_rowspan_cells(rowspan_values, row, row_idx, col_idx)

                for c in range(colspan):
                    value = cell_text if c == 0 else ""
                    row.append(value)
                    if rowspan > 1:
                        # Carry every spanned column down, not just the first.
                        rowspan_values[(row_idx + 1, col_idx)] = (value, rowspan - 1)
                    col_idx += 1

            # Trailing columns still occupied by rowspans from above.
            _consume_rowspan_cells(rowspan_values, row, row_idx, col_idx)

            if row:
                table_matrix.append(row)
                max_cols = max(max_cols, len(row))
                logger.debug(f"[extract_table_with_rowspan_colspan] 表格{table_idx} 第{row_idx}行, 内容: {row}")

        # Pad short rows so every row of the table has the same width.
        for row in table_matrix:
            row.extend([""] * (max_cols - len(row)))

        if table_matrix:
            tables.append(table_matrix)

    logger.debug(f"[extract_table_with_rowspan_colspan] 总表格: {len(tables)}")
    return tables
def parse_operational_conditions(markdown_content: str, require_title: bool = True) -> List[OperationalCondition]:
    """Parse the operating-condition table into ``OperationalCondition`` records.

    The first table whose header looks like an operating-condition table and
    that yields at least one record is used; later tables are ignored.
    One- and two-row headers are supported.  With a two-row header the second
    row (U/I/P/Q cells) has priority for the voltage/current/power columns and
    the first row only fills in columns that are still unknown.

    Args:
        markdown_content: Markdown text containing HTML tables.
        require_title: When True (default) the text must contain the title
            marker "工况信息"; otherwise an empty list is returned.  When False
            the table is recognised by its structure alone.

    Returns:
        The parsed records, possibly empty.
    """
    conditions: List[OperationalCondition] = []

    if require_title:
        # "附件2 工况信息" contains "工况信息", so one membership test covers both.
        if "工况信息" not in markdown_content:
            logger.debug("[工况信息] 未找到工况信息标识")
            return conditions
    else:
        logger.debug("[工况信息] 无标题模式:仅根据表格结构判断")

    # Tables with rowspan/colspan expanded into a rectangular grid.
    tables = extract_table_with_rowspan_colspan(markdown_content)

    if not tables:
        logger.warning("[工况信息] 未能提取出任何表格内容")
        return conditions

    for table in tables:
        if not table or len(table) < 2:
            continue

        header_row = table[0]
        header_text = " ".join(header_row)
        # First header row: time / voltage / current / project, or name / time / operating condition.
        has_row0 = any(
            k in header_text for k in ["检测时间", "电压", "电流", "有功功率", "无功功率", "项目", "时间", "名称", "运行工况"]
        )
        # With a two-row header the second row usually holds U(kV), I(A), P(MW), Q(Mvar).
        header_row2 = table[1]
        header2_text = " ".join(header_row2).lower()
        has_row1 = any(
            k in header2_text for k in ["u(", "i(", "p(", "q(", "电压", "电流", "有功", "无功", "kv", "mw", "mvar"]
        )
        if not (has_row0 or (len(header_row2) >= 4 and has_row1)):
            continue

        logger.info(f"[工况信息] 找到工况信息表格,行数: {len(table)}")

        monitor_at_idx = -1
        project_idx = -1
        name_idx = -1
        voltage_idx = -1
        current_idx = -1
        active_power_idx = -1
        reactive_power_idx = -1

        # Second header row first: it decides the voltage/current/power columns.
        if has_row1:
            for idx, cell in enumerate(header_row2):
                cell_n = normalize_text(cell)
                if "u" in cell_n and ("kv" in cell_n or "k v" in cell_n or "电压" in cell):
                    voltage_idx = idx
                elif "i" in cell_n and ("a)" in cell_n or "a )" in cell_n or "电流" in cell):
                    current_idx = idx
                elif "p" in cell_n and ("mw" in cell_n or "m w" in cell_n or "有功" in cell):
                    active_power_idx = idx
                elif "q" in cell_n and ("mvar" in cell_n or "无功" in cell):
                    reactive_power_idx = idx
                elif "时间" in cell and monitor_at_idx == -1:
                    monitor_at_idx = idx
                elif ("名称" in cell or "主变" in cell) and name_idx == -1:
                    name_idx = idx

        # First header row only fills in what is still unknown.
        for idx, cell in enumerate(header_row):
            if "时间" in cell and monitor_at_idx == -1:
                monitor_at_idx = idx
            elif "项目" in cell:
                if project_idx == -1:
                    project_idx = idx
                    # The column after "项目" is taken as the name column unless
                    # its header is clearly a measurement/time column.
                    if idx + 1 < len(header_row) and name_idx == -1:
                        next_cell = header_row[idx + 1]
                        if not any(k in next_cell.lower() for k in ["电压", "电流", "有功", "无功", "检测"]):
                            name_idx = idx + 1
            # The "== -1" guard must apply to the whole test: previously
            # `A or B and guard` let a first-row match overwrite the index
            # already taken from the second header row.
            elif "电压" in cell and voltage_idx == -1:
                voltage_idx = idx
            elif "电流" in cell and current_idx == -1:
                current_idx = idx
            elif ("有功" in cell and "功率" in cell) and active_power_idx == -1:
                active_power_idx = idx
            elif ("无功" in cell and "功率" in cell) and reactive_power_idx == -1:
                reactive_power_idx = idx
            elif ("名称" in cell or "主变" in cell) and name_idx == -1:
                name_idx = idx

        # Defaults for the common layout "name | time | ...": name in column 0, time in column 1.
        if name_idx == -1 and len(header_row) > 0 and ("名称" in header_row[0] or not any("名称" in c for c in header_row2)):
            name_idx = 0
        if monitor_at_idx == -1 and len(header_row) > 1 and ("时间" in header_row[1] or (len(header_row2) > 1 and "时间" in header_row2[1])):
            monitor_at_idx = 1

        logger.debug(f"[工况信息] 列索引: 检测时间={monitor_at_idx}, 项目={project_idx}, 名称={name_idx}, "
                     f"电压={voltage_idx}, 电流={current_idx}, 有功功率={active_power_idx}, 无功功率={reactive_power_idx}")

        # Data starts after the header: index 2 for a two-row header, else index 1.
        data_start = 2 if has_row1 else 1
        # Time and project are carried forward across rows that leave them blank.
        current_monitor_at = ""
        current_project = ""

        for row in table[data_start:]:
            if len(row) < 4:
                continue

            if 0 <= monitor_at_idx < len(row) and row[monitor_at_idx].strip():
                current_monitor_at = row[monitor_at_idx].strip()

            if 0 <= project_idx < len(row) and row[project_idx].strip():
                current_project = row[project_idx].strip()

            # Name, e.g. "1#主变"; falls back to the column right after the project column.
            name_value = ""
            if 0 <= name_idx < len(row):
                name_value = row[name_idx].strip()
            elif project_idx >= 0 and project_idx + 1 < len(row):
                name_value = row[project_idx + 1].strip()

            # Only rows whose name looks like a transformer produce a record.
            if not (name_value and any(k in name_value for k in ["主变", "#"])):
                continue

            oc = OperationalCondition()
            oc.monitorAt = current_monitor_at
            oc.project = current_project
            oc.name = name_value

            # Measurement fields are only assigned when their column is known.
            if 0 <= voltage_idx < len(row):
                oc.voltage = row[voltage_idx].strip()
            if 0 <= current_idx < len(row):
                oc.current = row[current_idx].strip()
            if 0 <= active_power_idx < len(row):
                oc.activePower = row[active_power_idx].strip()
            if 0 <= reactive_power_idx < len(row):
                oc.reactivePower = row[reactive_power_idx].strip()

            conditions.append(oc)
            logger.debug(f"[工况信息] 解析到: {oc.to_dict()}")

        # Only the first table that produced records is used.
        if conditions:
            break

    logger.info(f"[工况信息] 共解析到 {len(conditions)} 条工况信息")
    return conditions
def parse_operational_conditions_v2(markdown_content: str) -> List[OperationalConditionV2]:
    """Parse the "表1检测工况" operating-condition table (max/min layout).

    Expected table layout:
      - header row 1: name, time, then voltage(kV), current(A), active(MW),
        reactive(Mvar), each spanning two columns;
      - header row 2: max / min repeated four times;
      - data rows: name, time, voltage max/min, current max/min,
        active-power max/min, reactive-power max/min (10 columns).

    The first column is stored in ``name``; ``project`` is always left empty.
    Only the first table that yields at least one record is used.

    Args:
        markdown_content: Markdown text containing HTML tables.

    Returns:
        The parsed records; empty when the title marker or a suitable table
        is missing.
    """
    records: List[OperationalConditionV2] = []

    # Title marker, tolerating whitespace between its parts ("表 1 检测工况" etc.).
    if re.search(r'表\s*1\s*检测工况', markdown_content) is None:
        logger.debug("[工况信息V2] 未找到'表1检测工况'标识(包括空格变体)")
        return records

    logger.debug("[工况信息V2] 检测到'表1检测工况'格式(包括空格变体),第一列将映射到name字段,project字段保持为空")

    # Tables with rowspan/colspan already expanded.
    extracted = extract_table_with_rowspan_colspan(markdown_content)
    if not extracted:
        logger.warning("[工况信息V2] 未能提取出任何表格内容")
        return records

    header_keywords = ("名称", "时间", "电压", "电流", "有功", "无功")
    header_only_cells = ("名称", "最大值", "最小值", "时间")
    # Fixed layout: column 0 is the name; the remaining columns map as below.
    value_columns = (
        ("monitorAt", 1),
        ("maxVoltage", 2),
        ("minVoltage", 3),
        ("maxCurrent", 4),
        ("minCurrent", 5),
        ("maxActivePower", 6),
        ("minActivePower", 7),
        ("maxReactivePower", 8),
        ("minReactivePower", 9),
    )
    required_columns = 10

    for table in extracted:
        # Two header rows plus at least one data row are required.
        if not table or len(table) < 3:
            continue

        first_header = " ".join(table[0]).lower()
        if not any(word in first_header for word in header_keywords):
            continue

        logger.info(f"[工况信息V2] 找到工况信息表格,行数: {len(table)}")
        logger.debug(f"[工况信息V2] 表格总行数: {len(table)}, 开始从第3行(索引2)解析数据行")

        for row_idx, row in enumerate(table[2:], start=2):
            # Only the first five cells are logged.
            logger.debug(f"[工况信息V2] 处理第{row_idx}行(索引{row_idx}): 列数={len(row)}, 内容={row[:5]}...")

            if len(row) < required_columns:
                logger.warning(f"[工况信息V2] 第{row_idx}行列数不足10列,跳过: {len(row)}列")
                continue

            # A data row starts with a non-empty name that is not a header word.
            first_cell = row[0].strip()
            if not first_cell or first_cell in header_only_cells:
                logger.debug(f"[工况信息V2] 第{row_idx}行第一列为表头关键词或为空,跳过: '{first_cell}'")
                continue

            # Name or time must carry a value.
            if not (row[0].strip() or row[1].strip()):
                logger.debug(f"[工况信息V2] 第{row_idx}行前两列为空,跳过")
                continue

            oc = OperationalConditionV2()
            oc.name = row[0].strip()
            oc.project = ""
            for attr, col in value_columns:
                setattr(oc, attr, row[col].strip())

            if not oc.name:
                logger.warning(f"[工况信息V2] 第{row_idx}行名称为空,跳过该行: {row[:3]}")
                continue

            records.append(oc)
            logger.info(f"[工况信息V2] 解析到第{len(records)}条记录: name='{oc.name}', 时间='{oc.monitorAt}'")

        # Stop at the first table that produced records.
        if records:
            break

    logger.info(f"[工况信息V2] 共解析到 {len(records)} 条工况信息")
    return records
def parse_operational_conditions_opstatus(markdown_content: str) -> List[OperationalCondition]:
    """Parse the operating-condition table of the opStatus format.

    Expected table layout ("附件 工况及工程信息"):
      - header row 1: name (rowspan=2), time (rowspan=2), operating condition (colspan=4);
      - header row 2: U (kV), I (A), P (MW), Q (Mvar);
      - data rows: name, time (may be row-spanned), U / I / P / Q ranges.

    Single-row headers are handled as well: column indices are detected from
    whichever row is recognised as the (last) header row.  Only the first
    table that yields at least one record is used.

    Each record carries ``monitorAt``, ``project`` (value of the table's
    "项目" column, carried forward over blank cells), ``name``, ``voltage``,
    ``current``, ``activePower`` and ``reactivePower``.

    Args:
        markdown_content: Markdown text containing HTML tables.

    Returns:
        The parsed records, possibly empty.
    """
    conditions: List[OperationalCondition] = []

    # Both markers must be present somewhere in the document.
    if "附件" not in markdown_content or "工况" not in markdown_content:
        logger.debug("[工况信息opStatus] 未找到'附件 工况及工程信息'标识")
        return conditions

    # Project number: looked up only in the text before the first table so that
    # table content cannot match.  The opening tag may carry attributes and may
    # sit at offset 0; without any table the whole text is searched.
    # NOTE(review): the value is only logged; records take `project` from the
    # table's "项目" column.
    first_table = re.search(r'<table[^>]*>', markdown_content)
    search_scope = markdown_content[:first_table.start()] if first_table else markdown_content
    # Accept both the ASCII colon and the full-width colon (U+FF1A).
    project_match = re.search(r'项目编号[:\uff1a]\s*([^\n<]+)', search_scope)
    if project_match:
        project_number = project_match.group(1).strip()
        logger.debug(f"[工况信息opStatus] 提取到项目编号: {project_number}")

    # Tables with rowspan/colspan expanded.
    tables = extract_table_with_rowspan_colspan(markdown_content)

    if not tables:
        logger.warning("[工况信息opStatus] 未能提取出任何表格内容")
        return conditions

    for table in tables:
        # Two header rows plus one data row are the minimum.
        if not table or len(table) < 3:
            continue

        header_text = normalize_text(" ".join(table[0] + table[1]))
        has_keywords = any(
            keyword in header_text
            for keyword in ["名称", "时间", "运行工况", "u", "i", "p", "q", "kv", "mw", "mvar"]
        )
        if not has_keywords:
            continue

        logger.info(f"[工况信息opStatus] 找到工况信息表格,行数: {len(table)}")

        # Two-row header: the second row mentions U/kV, I/A, P/MW or Q/Mvar,
        # with or without a space before the unit.
        second_row_text = normalize_text(" ".join(table[1]))
        has_two_row_header = (
            any(k in second_row_text for k in ["u(kv)", "i(a)", "p(mw)", "q(mvar)"])
            or ("u" in second_row_text and "kv" in second_row_text)
            or ("i" in second_row_text and "a" in second_row_text)
            or ("p" in second_row_text and "mw" in second_row_text)
            or ("q" in second_row_text and "mvar" in second_row_text)
        )
        if has_two_row_header:
            logger.debug("[工况信息opStatus] 检测到两行表头格式")

        # Column indices come from the last header row.
        header_row = table[1] if has_two_row_header else table[0]
        time_idx = -1
        project_idx = -1
        name_idx = -1
        voltage_idx = -1
        current_idx = -1
        active_power_idx = -1
        reactive_power_idx = -1

        for idx, cell in enumerate(header_row):
            cell_normalized = normalize_text(cell)
            if "时间" in cell:
                time_idx = idx
            elif "项目" in cell:
                project_idx = idx
            elif "名称" in cell:
                name_idx = idx
            elif "电压" in cell or ("u" in cell_normalized and "kv" in cell_normalized):
                voltage_idx = idx
            elif "电流" in cell or ("i" in cell_normalized and "a" in cell_normalized):
                current_idx = idx
            elif ("有功" in cell and "功率" in cell) or ("p" in cell_normalized and "mw" in cell_normalized):
                active_power_idx = idx
            elif ("无功" in cell and "功率" in cell) or ("q" in cell_normalized and "mvar" in cell_normalized):
                reactive_power_idx = idx

        # With a two-row header the name column may only be labelled in row 1.
        if has_two_row_header and name_idx == -1:
            for idx, cell in enumerate(table[0]):
                if "名称" in cell:
                    name_idx = idx
                    break

        data_start_row = 2 if has_two_row_header else 1

        # Still unknown: infer the name column from the first data row.
        if name_idx == -1 and len(table) > data_start_row:
            for idx, cell in enumerate(table[data_start_row]):
                # Skip the known time and project columns and blank cells.
                if idx == time_idx or idx == project_idx or not cell.strip():
                    continue
                # Transformer names contain "主变" or "#"; line names contain "kV".
                if any(k in cell for k in ["主变", "#"]) or "kV" in cell:
                    name_idx = idx
                    logger.debug(f"[工况信息opStatus] 从数据行推断名称列索引: {name_idx}, 内容='{cell}'")
                    break

        # Last resort: the column right after the project column, if it exists
        # (an empty header cell there is what a colspan on "项目" leaves behind).
        if name_idx == -1 and project_idx >= 0 and project_idx + 1 < len(header_row):
            name_idx = project_idx + 1

        logger.debug(f"[工况信息opStatus] 列索引: 检测时间={time_idx}, 项目={project_idx}, 名称={name_idx}, "
                     f"电压={voltage_idx}, 电流={current_idx}, 有功功率={active_power_idx}, 无功功率={reactive_power_idx}")

        has_measure_column = not (
            voltage_idx == -1 and current_idx == -1 and active_power_idx == -1 and reactive_power_idx == -1
        )
        # Time and project are carried forward across rows that leave them blank.
        current_monitor_at = ""
        current_project = ""

        for row_idx in range(data_start_row, len(table)):
            row = table[row_idx]
            logger.debug(f"[工况信息opStatus] 处理第{row_idx}行: 列数={len(row)}, 内容={row}")

            if len(row) < 3:
                logger.warning(f"[工况信息opStatus] 第{row_idx}行列数不足,跳过: {len(row)}列")
                continue

            if 0 <= time_idx < len(row) and row[time_idx].strip():
                current_monitor_at = row[time_idx].strip()

            if 0 <= project_idx < len(row) and row[project_idx].strip():
                current_project = row[project_idx].strip()

            # Name, e.g. "1#主变"; falls back to the column after the project column.
            name_value = ""
            if 0 <= name_idx < len(row):
                name_value = row[name_idx].strip()
            elif project_idx >= 0 and project_idx + 1 < len(row):
                name_value = row[project_idx + 1].strip()

            # Noise-measurement rows start with a point id such as N1, N2 and
            # belong to tables without voltage/current columns.
            check_cell = name_value or row[0].strip()
            is_noise_row = (
                bool(check_cell)
                and re.match(r'^N\d+', check_cell) is not None
                and voltage_idx == -1
                and current_idx == -1
            )
            if is_noise_row:
                logger.debug(f"[工况信息opStatus] 第{row_idx}行疑似噪声数据行,跳过: 第一列或名称列='{check_cell}'")

            # Valid names: transformers ("主变" or "#") or anything containing "kV"
            # (transmission lines and other equipment).
            is_valid_name = bool(name_value) and (
                any(k in name_value for k in ["主变", "#"]) or "kV" in name_value
            )

            if is_noise_row:
                logger.debug(f"[工况信息opStatus] 第{row_idx}行是噪声数据行,跳过: name='{name_value}'")
                continue
            if not is_valid_name:
                logger.debug(f"[工况信息opStatus] 第{row_idx}行名称无效或为空,跳过: name='{name_value}'")
                continue
            # Without any voltage/current/power column this is not an
            # operating-condition table row.
            if not has_measure_column:
                logger.debug(f"[工况信息opStatus] 第{row_idx}行没有找到电压/电流/功率列,跳过: name='{name_value}'")
                continue

            oc = OperationalCondition()
            oc.monitorAt = current_monitor_at
            oc.project = current_project  # value of the table's "项目" column
            oc.name = name_value

            # Measurement fields are only assigned when their column is known.
            if 0 <= voltage_idx < len(row):
                oc.voltage = row[voltage_idx].strip()
            if 0 <= current_idx < len(row):
                oc.current = row[current_idx].strip()
            if 0 <= active_power_idx < len(row):
                oc.activePower = row[active_power_idx].strip()
            if 0 <= reactive_power_idx < len(row):
                oc.reactivePower = row[reactive_power_idx].strip()

            conditions.append(oc)
            logger.info(f"[工况信息opStatus] 解析到第{len(conditions)}条记录: name='{oc.name}', 时间='{oc.monitorAt}', 项目='{oc.project}'")

        # Only the first table that produced records is used.
        if conditions:
            break

    logger.info(f"[工况信息opStatus] 共解析到 {len(conditions)} 条工况信息")
    return conditions
def parse_operational_conditions_format3_5(markdown_content: str) -> List[OperationalConditionV2]:
    """Parse operating-condition tables of "format 3" and "format 5".

    These are the "附件 2 工况信息" tables in which the first column of the
    voltage group holds a time period instead of a value.

    Table layout:
      - header row 1: name (rowspan=2), time (rowspan=2 or colspan=2),
        voltage(kV), current(A), active(MW), reactive(Mvar), each colspan=2;
      - header row 2: max / min repeated four times;
      - data rows: column 2 holds a time period (e.g. "昼间9:00~11:00"),
        followed by voltage max/min, current max/min, active-power max/min
        and reactive-power max/min.

    Each record (``OperationalConditionV2``) carries:
      - ``monitorAt``: the carried-forward date plus the time period,
      - ``project``: always ``""``,
      - ``name``: the carried-forward equipment name, e.g. "#3主变",
      - ``maxVoltage`` / ``minVoltage``, ``maxCurrent`` / ``minCurrent``,
        ``maxActivePower`` / ``minActivePower``,
        ``maxReactivePower`` / ``minReactivePower``.

    Only the first table that yields at least one record is used.

    Args:
        markdown_content: Markdown text containing HTML tables.

    Returns:
        The parsed records, possibly empty.
    """
    conditions: List[OperationalConditionV2] = []

    # The text must contain "附件" and one of the two title variants.
    if "附件" not in markdown_content or ("工况信息" not in markdown_content and "工况及工程信息" not in markdown_content):
        logger.debug("[工况信息格式3/5] 未找到'附件 2 工况信息'或'附件 2 工况及工程信息'标识")
        return conditions

    # Tables with rowspan/colspan expanded.
    tables = extract_table_with_rowspan_colspan(markdown_content)

    if not tables:
        logger.warning("[工况信息格式3/5] 未能提取出任何表格内容")
        return conditions

    # Look for the operating-condition table.
    for table in tables:
        if not table or len(table) < 3:  # two header rows plus one data row at least
            continue

        # The two header rows together must mention one of the keywords.
        first_row = table[0]
        second_row = table[1] if len(table) > 1 else []
        header_text = normalize_text(" ".join(first_row + second_row))

        has_keywords = any(
            keyword in header_text
            for keyword in ["名称", "时间", "电压", "电流", "有功", "无功", "kv", "mw", "mvar"]
        )

        if not has_keywords:
            continue

        logger.info(f"[工况信息格式3/5] 找到工况信息表格,行数: {len(table)}")

        # Format 3/5 detection: within the first data rows (indices 2-4), some
        # cell in columns 2-6 must look like a time period and must not be a
        # plain number.
        is_format3_5 = False
        if len(table) > 2:
            logger.debug(f"[工况信息格式3/5] 开始检查格式3/5特征,表格行数: {len(table)}")
            for row_idx in range(2, min(5, len(table))):  # inspect the first few data rows
                row = table[row_idx]
                logger.debug(f"[工况信息格式3/5] 检查第{row_idx}行,列数: {len(row)}, 内容: {row[:6]}")
                if len(row) >= 4:  # name, time and voltage columns at least
                    # Columns 2-6 are scanned so that both layouts are covered.
                    for col_idx in range(2, min(7, len(row))):
                        cell = row[col_idx].strip()
                        # Replace line breaks inside the cell by spaces.
                        cell_normalized = cell.replace("\n", " ").replace("\r", " ").strip()
                        if cell_normalized and any(k in cell_normalized for k in ["昼间", "夜间", "次日", "~", ":"]) and not re.match(r'^[\d.\-]+$', cell_normalized):
                            is_format3_5 = True
                            logger.info(f"[工况信息格式3/5] 检测到格式3/5特征:第{row_idx}行列{col_idx}包含时间段 '{cell_normalized}'")
                            break
                if is_format3_5:
                    break

        if not is_format3_5:
            logger.warning("[工况信息格式3/5] 未检测到格式3/5特征(电压列包含时间段),跳过该表格")
            continue

        # Probe whether the time column spans two columns, judged from the
        # width of the first data rows.
        # NOTE(review): `has_time_colspan2` is never read after this section;
        # the probe currently only produces log output.
        has_time_colspan2 = False
        if len(table) > 0:
            first_row = table[0]
            header_text = " ".join(first_row).lower()
            if "时间" in header_text:
                if len(table) > 2:
                    for row_idx in range(2, min(5, len(table))):
                        row = table[row_idx]
                        logger.debug(f"[工况信息格式3/5] 检查第{row_idx}行,列数: {len(row)}")
                        if len(row) >= 11:  # name(1) + time(2) + period(1) + 4 x (max, min) = 11 columns
                            has_time_colspan2 = True
                            logger.info(f"[工况信息格式3/5] 检测到时间列有colspan=2,数据行有{len(row)}列")
                            break
                        elif len(row) >= 10:
                            # Possibly format 3: column 2 holding a time period
                            # is still taken as a two-column time header.
                            if len(row) > 2 and any(k in row[2] for k in ["昼间", "夜间", "次日", "~", ":"]):
                                has_time_colspan2 = True
                                logger.info(f"[工况信息格式3/5] 检测到时间列有colspan=2(格式3),数据行有{len(row)}列,列2是时间段")
                                break
                            logger.debug(f"[工况信息格式3/5] 检测到格式3,数据行有{len(row)}列")
                            break

        # Fixed column layout used for both formats.
        name_idx = 0
        time_idx = 1
        voltage_time_idx = 2  # time-period column (first column of the voltage group)
        voltage_max_idx = 3  # voltage max
        voltage_min_idx = 4  # voltage min
        current_max_idx = 5  # current max
        current_min_idx = 6  # current min
        active_power_max_idx = 7  # active power max
        active_power_min_idx = 8  # active power min
        reactive_power_max_idx = 9  # reactive power max
        reactive_power_min_idx = 10  # reactive power min

        # Name and date are carried forward over rows that leave them blank.
        current_name = ""
        current_time = ""

        # Data starts at index 2; the first two rows are the header.
        logger.info(f"[工况信息格式3/5] 开始解析数据行,表格总行数: {len(table)}, 从第3行(索引2)开始")
        for row_idx in range(2, len(table)):
            row = table[row_idx]
            logger.debug(f"[工况信息格式3/5] 处理第{row_idx}行,列数: {len(row)}, 前5列: {row[:5]}")

            # At least four columns are required.
            if len(row) < 4:
                logger.debug(f"[工况信息格式3/5] 第{row_idx}行列数不足4列,跳过")
                continue

            # Skip repeated header rows: any header keyword in the first five cells.
            if any(keyword in " ".join(row[:5]).lower() for keyword in ["名称", "时间", "最大值", "最小值", "电压", "电流", "有功", "无功"]):
                logger.debug(f"[工况信息格式3/5] 跳过表头行: {row[:3]}")
                continue

            # Skip project-title rows: only column 0 is filled and its text does
            # not look like an equipment name.
            non_empty_cols = [i for i, cell in enumerate(row) if cell.strip()]
            if len(non_empty_cols) == 1 and non_empty_cols[0] == 0:
                cell_value = row[0].strip()
                if not any(k in cell_value for k in ["主变", "#", "线"]):
                    logger.debug(f"[工况信息格式3/5] 跳过项目名称行: {cell_value}")
                    continue

            # Update the carried name when the name cell is filled.
            if name_idx < len(row) and row[name_idx].strip():
                name_value = row[name_idx].strip()
                if name_value in ["输电线路", "变电站"]:
                    # Section title rather than an equipment name: skip the row.
                    logger.debug(f"[工况信息格式3/5] 跳过项目名称行: {name_value}")
                    continue
                elif any(k in name_value for k in ["主变", "#"]) or ("kV" in name_value and "线" in name_value):
                    current_name = name_value
                    logger.debug(f"[工况信息格式3/5] 更新名称: {current_name}")

            # Time cell: column 1, or column 2 when column 1 is blank.
            time_value = ""
            if time_idx < len(row) and row[time_idx].strip():
                time_value = row[time_idx].strip()
            elif time_idx + 1 < len(row) and row[time_idx + 1].strip():
                time_value = row[time_idx + 1].strip()

            # Only values starting with a date such as "2025.03.28" (optionally
            # a range like "2025.08.29-08.30") update the carried date.
            # NOTE(review): the second pattern can only match where the first
            # already does, so it looks redundant.
            if time_value and (re.match(r'^\d{4}\.\d{1,2}\.\d{1,2}', time_value) or re.match(r'^\d{4}\.\d{1,2}\.\d{1,2}-\d{1,2}\.\d{1,2}', time_value)):
                current_time = time_value
                logger.debug(f"[工况信息格式3/5] 更新时间: {current_time}")

            # A record is produced only when column 2 holds a time period.
            if voltage_time_idx < len(row) and row[voltage_time_idx].strip():
                voltage_time = row[voltage_time_idx].strip()
                if any(k in voltage_time for k in ["昼间", "夜间", "次日", "~", ":"]):
                    oc = OperationalConditionV2()
                    oc.project = ""
                    oc.name = current_name

                    # monitorAt = carried date + time period, e.g. "2025.03.28 昼间9:00~11:00".
                    if current_time:
                        oc.monitorAt = f"{current_time} {voltage_time}".strip()
                    else:
                        oc.monitorAt = voltage_time

                    # Each value is assigned only when its cell exists and is non-blank.
                    # Voltage max
                    if voltage_max_idx < len(row) and row[voltage_max_idx].strip():
                        oc.maxVoltage = row[voltage_max_idx].strip()

                    # Voltage min
                    if voltage_min_idx < len(row) and row[voltage_min_idx].strip():
                        oc.minVoltage = row[voltage_min_idx].strip()

                    # Current max
                    if current_max_idx < len(row) and row[current_max_idx].strip():
                        oc.maxCurrent = row[current_max_idx].strip()

                    # Current min
                    if current_min_idx < len(row) and row[current_min_idx].strip():
                        oc.minCurrent = row[current_min_idx].strip()

                    # Active power max
                    if active_power_max_idx < len(row) and row[active_power_max_idx].strip():
                        oc.maxActivePower = row[active_power_max_idx].strip()

                    # Active power min
                    if active_power_min_idx < len(row) and row[active_power_min_idx].strip():
                        oc.minActivePower = row[active_power_min_idx].strip()

                    # Reactive power max
                    if reactive_power_max_idx < len(row) and row[reactive_power_max_idx].strip():
                        oc.maxReactivePower = row[reactive_power_max_idx].strip()

                    # Reactive power min
                    if reactive_power_min_idx < len(row) and row[reactive_power_min_idx].strip():
                        oc.minReactivePower = row[reactive_power_min_idx].strip()

                    # Rows seen before any valid name are dropped.
                    if oc.name:
                        conditions.append(oc)
                        logger.info(f"[工况信息格式3/5] 解析到第{len(conditions)}条记录: name='{oc.name}', monitorAt='{oc.monitorAt}'")

        # Only the first table that produced records is used.
        if conditions:
            break

    logger.info(f"[工况信息格式3/5] 共解析到 {len(conditions)} 条工况信息")
    return conditions
|