# Copyright (c) Opendatalab. All rights reserved.
"""
表格解析模块 v2 - 独立版本,不依赖v1
"""
from typing import List
import re
from ..utils.logging_config import get_logger
from ..models.data_models import OperationalCondition, OperationalConditionV2
logger = get_logger("pdf_converter_v2.parser.table")
def normalize_text(text: str) -> str:
"""将常见全角符号、大小写等统一,便于关键词匹配"""
if not text:
return ""
text = text.lower()
replacements = {
"(": "(",
")": ")",
":": ":",
"-": "-",
"—": "-",
"〜": "~",
"~": "~",
"/": "/",
" ": " ",
}
for old, new in replacements.items():
text = text.replace(old, new)
return text
def parse_table_cell(cell_content: str) -> str:
"""解析表格单元格内容"""
if not cell_content:
return ""
cell_content = re.sub(r'<[^>]+>', '', cell_content)
cell_content = re.sub(r'\s+', ' ', cell_content).strip()
return cell_content
def extract_table_data(markdown_content: str) -> List[List[List[str]]]:
"""从Markdown内容中提取表格数据"""
tables: List[List[List[str]]] = []
# 匹配带属性的table标签,如
table_matches = re.findall(r'', markdown_content, re.DOTALL)
logger.debug(f"[extract_table_data] 共找到 {len(table_matches)} 个表格")
for table_idx, table_content in enumerate(table_matches):
table_rows: List[List[str]] = []
tr_matches = re.findall(r']*>(.*?)
', table_content, re.DOTALL)
logger.debug(f"[extract_table_data] 表格{table_idx}, 行数: {len(tr_matches)}")
for row_idx, tr_content in enumerate(tr_matches):
td_matches = re.findall(r']*>(.*?) | ', tr_content)
row: List[str] = [parse_table_cell(td) for td in td_matches]
if row:
table_rows.append(row)
if table_rows:
tables.append(table_rows)
logger.debug(f"[extract_table_data] 总表格: {len(tables)}")
return tables
def extract_table_with_rowspan_colspan(markdown_content: str) -> List[List[List[str]]]:
"""提取表格数据,处理rowspan和colspan属性"""
tables: List[List[List[str]]] = []
# 匹配带属性的table标签,如
table_matches = re.findall(r'', markdown_content, re.DOTALL)
logger.debug(f"[extract_table_with_rowspan_colspan] 共找到 {len(table_matches)} 个表格")
for table_idx, table_content in enumerate(table_matches):
tr_matches = re.findall(r']*>(.*?)
', table_content, re.DOTALL)
logger.debug(f"[extract_table_with_rowspan_colspan] 表格{table_idx}, 行数: {len(tr_matches)}")
if not tr_matches:
continue
# 用于存储rowspan的值(跨行的单元格值)
rowspan_values = {} # {(row_idx, col_idx): (value, remaining_rows)}
# 先构建一个矩阵来存储所有单元格
max_cols = 0
table_matrix = []
for row_idx, tr_content in enumerate(tr_matches):
# 找到所有td标签,包括属性
td_pattern = r']*>(.*?) | '
td_matches_with_attrs = re.finditer(td_pattern, tr_content, re.DOTALL)
row = []
col_idx = 0
for td_match in td_matches_with_attrs:
full_td = td_match.group(0)
cell_content = td_match.group(1)
# 提取rowspan和colspan属性
rowspan_match = re.search(r'rowspan=["\']?(\d+)["\']?', full_td)
colspan_match = re.search(r'colspan=["\']?(\d+)["\']?', full_td)
rowspan = int(rowspan_match.group(1)) if rowspan_match else 1
colspan = int(colspan_match.group(1)) if colspan_match else 1
# 解析单元格内容
cell_text = parse_table_cell(cell_content)
# 跳过被rowspan占用的列
while (row_idx, col_idx) in rowspan_values:
row.append(rowspan_values[(row_idx, col_idx)][0]) # 使用rowspan的值
remaining = rowspan_values[(row_idx, col_idx)][1] - 1
if remaining > 0:
rowspan_values[(row_idx + 1, col_idx)] = (rowspan_values[(row_idx, col_idx)][0], remaining)
del rowspan_values[(row_idx, col_idx)]
col_idx += 1
# 添加单元格内容
for c in range(colspan):
row.append(cell_text if c == 0 else "")
# 如果有rowspan,记录到后续行
if rowspan > 1 and c == 0:
rowspan_values[(row_idx + 1, col_idx)] = (cell_text, rowspan - 1)
col_idx += 1
# 处理剩余的被rowspan占用的列
while (row_idx, col_idx) in rowspan_values:
row.append(rowspan_values[(row_idx, col_idx)][0])
remaining = rowspan_values[(row_idx, col_idx)][1] - 1
if remaining > 0:
rowspan_values[(row_idx + 1, col_idx)] = (rowspan_values[(row_idx, col_idx)][0], remaining)
del rowspan_values[(row_idx, col_idx)]
col_idx += 1
if row:
table_matrix.append(row)
max_cols = max(max_cols, len(row))
logger.debug(f"[extract_table_with_rowspan_colspan] 表格{table_idx} 第{row_idx}行, 内容: {row}")
# 统一列数(可选,确保每行列数一致)
for row in table_matrix:
while len(row) < max_cols:
row.append("")
if table_matrix:
tables.append(table_matrix)
logger.debug(f"[extract_table_with_rowspan_colspan] 总表格: {len(tables)}")
return tables
def parse_operational_conditions(markdown_content: str, require_title: bool = True) -> List[OperationalCondition]:
"""解析工况信息表格
Args:
markdown_content: Markdown内容
require_title: 是否要求必须有标题标识(如"附件2 工况信息"),默认为True
如果为False,则仅根据表格结构判断是否为工况信息表格
"""
conditions: List[OperationalCondition] = []
# 查找工况信息相关的表格
if require_title:
if "附件2 工况信息" not in markdown_content and "工况信息" not in markdown_content:
logger.debug("[工况信息] 未找到工况信息标识")
return conditions
else:
logger.debug("[工况信息] 无标题模式:仅根据表格结构判断")
# 提取表格数据(支持rowspan和colspan)
tables = extract_table_with_rowspan_colspan(markdown_content)
if not tables:
logger.warning("[工况信息] 未能提取出任何表格内容")
return conditions
# 查找工况信息表格(通常包含"检测时间"、"电压"、"电流"等关键词)
for table in tables:
if not table or len(table) < 2:
continue
# 检查表头是否包含工况信息的关键词
header_row = table[0]
has_operational_keywords = any(
keyword in " ".join(header_row)
for keyword in ["检测时间", "电压", "电流", "有功功率", "无功功率", "项目"]
)
if not has_operational_keywords:
continue
logger.info(f"[工况信息] 找到工况信息表格,行数: {len(table)}")
# 找到表头行的列索引
header_row = table[0]
monitor_at_idx = -1
project_idx = -1
name_idx = -1
voltage_idx = -1
current_idx = -1
active_power_idx = -1
reactive_power_idx = -1
for idx, cell in enumerate(header_row):
cell_lower = cell.lower()
if "检测时间" in cell or "监测时间" in cell:
monitor_at_idx = idx
elif "项目" in cell:
# 项目列可能有colspan,需要找到实际的列
if project_idx == -1:
project_idx = idx
# 检查下一列是否是名称列(如果项目列colspan=2,下一列可能是名称)
if idx + 1 < len(header_row) and name_idx == -1:
next_cell = header_row[idx + 1]
if not any(k in next_cell.lower() for k in ["电压", "电流", "有功", "无功", "检测"]):
name_idx = idx + 1
elif "电压" in cell or "电压(kv)" in cell_lower:
voltage_idx = idx
elif "电流" in cell or "电流(a)" in cell_lower:
current_idx = idx
elif "有功功率" in cell or ("有功" in cell and "功率" in cell):
active_power_idx = idx
elif "无功功率" in cell or ("无功" in cell and "功率" in cell):
reactive_power_idx = idx
elif ("名称" in cell or "主变" in cell) and name_idx == -1:
name_idx = idx
logger.debug(f"[工况信息] 列索引: 检测时间={monitor_at_idx}, 项目={project_idx}, 名称={name_idx}, "
f"电压={voltage_idx}, 电流={current_idx}, 有功功率={active_power_idx}, 无功功率={reactive_power_idx}")
# 处理数据行(从第二行开始,第一行是表头)
current_monitor_at = ""
current_project = ""
for row_idx in range(1, len(table)):
row = table[row_idx]
if len(row) < 4: # 至少需要检测时间、项目、名称等基本字段
continue
# 检测时间
if monitor_at_idx >= 0 and monitor_at_idx < len(row) and row[monitor_at_idx].strip():
current_monitor_at = row[monitor_at_idx].strip()
# 项目名称
if project_idx >= 0 and project_idx < len(row) and row[project_idx].strip():
current_project = row[project_idx].strip()
# 名称(如1#主变)
name_value = ""
if name_idx >= 0 and name_idx < len(row):
name_value = row[name_idx].strip()
elif project_idx >= 0 and project_idx + 1 < len(row):
# 如果名称列在项目列后面
name_value = row[project_idx + 1].strip()
# 只有当名称存在时才创建工况信息记录(因为有rowspan的情况)
if name_value and any(k in name_value for k in ["主变", "#"]):
oc = OperationalCondition()
oc.monitorAt = current_monitor_at
oc.project = current_project
oc.name = name_value
# 电压
if voltage_idx >= 0 and voltage_idx < len(row):
oc.voltage = row[voltage_idx].strip()
# 电流
if current_idx >= 0 and current_idx < len(row):
oc.current = row[current_idx].strip()
# 有功功率
if active_power_idx >= 0 and active_power_idx < len(row):
oc.activePower = row[active_power_idx].strip()
# 无功功率
if reactive_power_idx >= 0 and reactive_power_idx < len(row):
oc.reactivePower = row[reactive_power_idx].strip()
conditions.append(oc)
logger.debug(f"[工况信息] 解析到: {oc.to_dict()}")
# 只处理第一个匹配的表格
if conditions:
break
logger.info(f"[工况信息] 共解析到 {len(conditions)} 条工况信息")
return conditions
def parse_operational_conditions_v2(markdown_content: str) -> List[OperationalConditionV2]:
"""解析工况信息表格(新格式:表1检测工况)
表格结构:
- 第一行:名称、时间,电压(kV)(colspan=2),电流(A)(colspan=2),有功(MW)(colspan=2),无功(Mvar)(colspan=2)
- 第二行:最大值、最小值(重复4次)
- 数据行:名称、时间、电压最大值、电压最小值、电流最大值、电流最小值、有功最大值、有功最小值、无功最大值、无功最小值
"""
conditions: List[OperationalConditionV2] = []
# 检查是否包含"表1检测工况"标识(使用正则表达式,允许中间有空格)
# 支持:表1检测工况、表 1 检测工况、表 1检测工况、表1 检测工况 等变体
pattern = r'表\s*1\s*检测工况'
if not re.search(pattern, markdown_content):
logger.debug("[工况信息V2] 未找到'表1检测工况'标识(包括空格变体)")
return conditions
logger.debug("[工况信息V2] 检测到'表1检测工况'格式(包括空格变体),第一列将映射到name字段,project字段保持为空")
# 提取表格数据(支持rowspan和colspan)
tables = extract_table_with_rowspan_colspan(markdown_content)
if not tables:
logger.warning("[工况信息V2] 未能提取出任何表格内容")
return conditions
# 查找包含"表1检测工况"的表格
# 表格结构:第一行是名称、时间,然后是电压、电流、有功、无功(各占2列)
for table in tables:
if not table or len(table) < 3: # 至少需要表头2行和数据1行
continue
# 检查第一行表头是否包含"名称"、"时间"、"电压"等关键词
first_row = table[0]
first_row_text = " ".join(first_row).lower()
has_keywords = any(k in first_row_text for k in ["名称", "时间", "电压", "电流", "有功", "无功"])
if not has_keywords:
continue
logger.info(f"[工况信息V2] 找到工况信息表格,行数: {len(table)}")
# 列索引映射(根据表格结构)
# 列0: 项目名称(映射到name字段)
# 列1: 时间
# 列2: 电压最大值
# 列3: 电压最小值
# 列4: 电流最大值
# 列5: 电流最小值
# 列6: 有功最大值
# 列7: 有功最小值
# 列8: 无功最大值
# 列9: 无功最小值
# 注意:对于"表1检测工况"格式,第一列映射到name字段,project字段保持为空
name_idx = 0 # 第一列是项目名称(如"500kV 江黄Ⅰ线")
time_idx = 1
voltage_max_idx = 2
voltage_min_idx = 3
current_max_idx = 4
current_min_idx = 5
active_power_max_idx = 6
active_power_min_idx = 7
reactive_power_max_idx = 8
reactive_power_min_idx = 9
# 从第三行开始解析数据(前两行是表头)
logger.debug(f"[工况信息V2] 表格总行数: {len(table)}, 开始从第3行(索引2)解析数据行")
for row_idx in range(2, len(table)):
row = table[row_idx]
logger.debug(f"[工况信息V2] 处理第{row_idx}行(索引{row_idx}): 列数={len(row)}, 内容={row[:5]}...") # 只打印前5列用于调试
# 至少需要10列(名称、时间、电压max/min、电流max/min、有功max/min、无功max/min)
if len(row) < 10:
logger.warning(f"[工况信息V2] 第{row_idx}行列数不足10列,跳过: {len(row)}列")
continue
# 检查是否是数据行(第一列应该有项目名称,且不是"名称"、"最大值"、"最小值"等表头关键词)
first_cell = row[name_idx].strip() if name_idx < len(row) else ""
if not first_cell or first_cell in ["名称", "最大值", "最小值", "时间"]:
logger.debug(f"[工况信息V2] 第{row_idx}行第一列为表头关键词或为空,跳过: '{first_cell}'")
continue
# 跳过完全空的行(但允许某些字段为空)
if not any(cell.strip() for cell in row[:2]): # 至少名称或时间应该有值
logger.debug(f"[工况信息V2] 第{row_idx}行前两列为空,跳过")
continue
oc = OperationalConditionV2()
# 名称(列0,映射到name字段)
if name_idx < len(row):
oc.name = row[name_idx].strip()
# project字段保持为空(仅在检测工况之外的场景使用)
oc.project = ""
# 时间(列1)
if time_idx < len(row):
oc.monitorAt = row[time_idx].strip()
# 电压最大值(列2)
if voltage_max_idx < len(row):
oc.maxVoltage = row[voltage_max_idx].strip()
# 电压最小值(列3)
if voltage_min_idx < len(row):
oc.minVoltage = row[voltage_min_idx].strip()
# 电流最大值(列4)
if current_max_idx < len(row):
oc.maxCurrent = row[current_max_idx].strip()
# 电流最小值(列5)
if current_min_idx < len(row):
oc.minCurrent = row[current_min_idx].strip()
# 有功功率最大值(列6)
if active_power_max_idx < len(row):
oc.maxActivePower = row[active_power_max_idx].strip()
# 有功功率最小值(列7)
if active_power_min_idx < len(row):
oc.minActivePower = row[active_power_min_idx].strip()
# 无功功率最大值(列8)
if reactive_power_max_idx < len(row):
oc.maxReactivePower = row[reactive_power_max_idx].strip()
# 无功功率最小值(列9)
if reactive_power_min_idx < len(row):
oc.minReactivePower = row[reactive_power_min_idx].strip()
# 添加记录(只要名称不为空)
if oc.name:
conditions.append(oc)
logger.info(f"[工况信息V2] 解析到第{len(conditions)}条记录: name='{oc.name}', 时间='{oc.monitorAt}'")
else:
logger.warning(f"[工况信息V2] 第{row_idx}行名称为空,跳过该行: {row[:3]}")
# 只处理第一个匹配的表格
if conditions:
break
logger.info(f"[工况信息V2] 共解析到 {len(conditions)} 条工况信息")
return conditions
def parse_operational_conditions_opstatus(markdown_content: str) -> List[OperationalCondition]:
"""解析工况信息表格(opStatus格式:附件 工况及工程信息)
表格结构:
- 第一行:名称(rowspan=2)、时间(rowspan=2)、运行工况(colspan=4)
- 第二行:U (kV)、I (A)、P (MW)、Q (Mvar)
- 数据行:名称、时间(可能有rowspan)、U范围、I范围、P范围、Q范围
输出格式:
[
{
"monitorAt": "", // 检测时间
"project": "", // 项目名称(从"项目编号"提取,如果存在)
"name": "", // 名称,如#1主变
"voltage": "", // 电压范围
"current": "", // 电流范围
"activePower": "", // 有功功率范围
"reactivePower": "", // 无功功率范围
}
]
"""
conditions: List[OperationalCondition] = []
# 检查是否包含"附件 工况及工程信息"标识
if "附件" not in markdown_content or "工况" not in markdown_content:
logger.debug("[工况信息opStatus] 未找到'附件 工况及工程信息'标识")
return conditions
# 提取项目编号(如果存在)
# 需要排除表格内容,只匹配表格之前的内容
project_number = ""
# 先找到表格开始位置
table_start = markdown_content.find('')
if table_start > 0:
# 只在表格之前的内容中查找项目编号
content_before_table = markdown_content[:table_start]
project_match = re.search(r'项目编号[::]\s*([^\n<]+)', content_before_table)
if project_match:
project_number = project_match.group(1).strip()
logger.debug(f"[工况信息opStatus] 提取到项目编号: {project_number}")
else:
# 如果没有表格,在整个内容中查找
project_match = re.search(r'项目编号[::]\s*([^\n<]+)', markdown_content)
if project_match:
project_number = project_match.group(1).strip()
logger.debug(f"[工况信息opStatus] 提取到项目编号: {project_number}")
# 提取表格数据(支持rowspan和colspan)
tables = extract_table_with_rowspan_colspan(markdown_content)
if not tables:
logger.warning("[工况信息opStatus] 未能提取出任何表格内容")
return conditions
# 查找工况信息表格(包含"名称"、"时间"、"U"、"I"、"P"、"Q"等关键词)
for table in tables:
if not table or len(table) < 3: # 至少需要表头2行和数据1行
continue
# 检查表头是否包含工况信息的关键词
first_row = table[0]
second_row = table[1] if len(table) > 1 else []
header_text = normalize_text(" ".join(first_row + second_row))
has_keywords = any(
keyword in header_text
for keyword in ["名称", "时间", "运行工况", "u", "i", "p", "q", "kv", "mw", "mvar"]
)
if not has_keywords:
continue
logger.info(f"[工况信息opStatus] 找到工况信息表格,行数: {len(table)}")
# 检测是否有两行表头(第二行包含"U (kV)"、"I (A)"等)
has_two_row_header = False
if len(table) > 1:
second_row_text = normalize_text(" ".join(table[1]))
# 兼容带空格和不带空格的写法,例如 "U (kV)" / "U(kV)"
if (
any(k in second_row_text for k in ["u(kv)", "i(a)", "p(mw)", "q(mvar)"])
or ("u" in second_row_text and "kv" in second_row_text)
or ("i" in second_row_text and "a" in second_row_text)
or ("p" in second_row_text and "mw" in second_row_text)
or ("q" in second_row_text and "mvar" in second_row_text)
):
has_two_row_header = True
logger.debug("[工况信息opStatus] 检测到两行表头格式")
# 根据表头动态确定列索引
# 如果有两行表头,从第二行检测;否则从第一行检测
header_row = table[1] if has_two_row_header else table[0]
time_idx = -1
project_idx = -1
name_idx = -1
voltage_idx = -1
current_idx = -1
active_power_idx = -1
reactive_power_idx = -1
for idx, cell in enumerate(header_row):
cell_lower = cell.lower()
cell_normalized = normalize_text(cell)
if "检测时间" in cell or "监测时间" in cell or "时间" in cell:
time_idx = idx
elif "项目" in cell:
project_idx = idx
elif "名称" in cell:
name_idx = idx
elif "电压" in cell or ("u" in cell_normalized and "kv" in cell_normalized) or ("u (kv)" in cell_normalized):
voltage_idx = idx
elif "电流" in cell or ("i" in cell_normalized and "a" in cell_normalized) or ("i (a)" in cell_normalized):
current_idx = idx
elif "有功功率" in cell or ("有功" in cell and "功率" in cell) or ("p" in cell_normalized and "mw" in cell_normalized) or ("p (mw)" in cell_normalized):
active_power_idx = idx
elif "无功功率" in cell or ("无功" in cell and "功率" in cell) or ("q" in cell_normalized and "mvar" in cell_normalized) or ("q (mvar)" in cell_normalized):
reactive_power_idx = idx
# 如果第一行表头有"名称",也检查第一行
if has_two_row_header and name_idx == -1:
first_row = table[0]
for idx, cell in enumerate(first_row):
if "名称" in cell:
name_idx = idx
break
# 如果表头中没有找到名称列,尝试在数据行中查找
if name_idx == -1:
# 确定第一行数据的位置(如果有两行表头,从第2行开始;否则从第1行开始)
first_data_row_idx = 2 if has_two_row_header else 1
if len(table) > first_data_row_idx:
first_data_row = table[first_data_row_idx]
for idx, cell in enumerate(first_data_row):
# 跳过已知的列(时间列和项目列)
if idx != time_idx and idx != project_idx and cell.strip():
# 支持主变格式(包含"主变"或"#")和输电线路格式(包含"kV"和"线")
if (any(k in cell for k in ["主变", "#"]) or
("kV" in cell and "线" in cell) or
("kV" in cell)):
name_idx = idx
logger.debug(f"[工况信息opStatus] 从数据行推断名称列索引: {name_idx}, 内容='{cell}'")
break
# 如果仍然没有找到名称列,但找到了项目列,则名称列通常在项目列之后
# 如果项目列有colspan,名称列可能在空列之后
if name_idx == -1 and project_idx >= 0:
# 检查项目列之后是否有空列,如果有,名称列在空列位置(因为colspan会创建空列)
if project_idx + 1 < len(header_row):
if not header_row[project_idx + 1].strip():
# 项目列有colspan,名称列在空列位置(project_idx + 1)
name_idx = project_idx + 1
else:
# 名称列紧跟在项目列之后
name_idx = project_idx + 1
else:
# 如果项目列是最后一列,名称列可能在项目列之后
name_idx = project_idx + 1 if project_idx + 1 < len(header_row) else -1
logger.debug(f"[工况信息opStatus] 列索引: 检测时间={time_idx}, 项目={project_idx}, 名称={name_idx}, "
f"电压={voltage_idx}, 电流={current_idx}, 有功功率={active_power_idx}, 无功功率={reactive_power_idx}")
# 确定数据行起始位置(如果有两行表头,从第2行开始;否则从第1行开始)
data_start_row = 2 if has_two_row_header else 1
current_monitor_at = ""
current_project = ""
for row_idx in range(data_start_row, len(table)):
row = table[row_idx]
logger.debug(f"[工况信息opStatus] 处理第{row_idx}行: 列数={len(row)}, 内容={row}")
# 至少需要3列
if len(row) < 3:
logger.warning(f"[工况信息opStatus] 第{row_idx}行列数不足,跳过: {len(row)}列")
continue
# 更新检测时间(如果有值且列索引有效)
if time_idx >= 0 and time_idx < len(row) and row[time_idx].strip():
current_monitor_at = row[time_idx].strip()
# 更新项目/位置(如果有值且列索引有效)
if project_idx >= 0 and project_idx < len(row) and row[project_idx].strip():
current_project = row[project_idx].strip()
# 获取名称(变压器名称,如"1#主变")
name_value = ""
if name_idx >= 0 and name_idx < len(row):
name_value = row[name_idx].strip()
elif project_idx >= 0 and project_idx + 1 < len(row):
# 如果名称列索引未找到,尝试项目列之后的第一列
name_value = row[project_idx + 1].strip()
# 检查是否是噪声数据行(包含测点编号如N1、N2等,且没有电压/电流/功率列)
is_noise_row = False
# 检查名称列或第一列是否是测点编号
check_cell = name_value if name_value else (row[0].strip() if len(row) > 0 else "")
if check_cell:
# 如果名称列或第一列包含测点编号格式(N1、N2等)且没有找到电压/电流/功率列,可能是噪声数据行
if re.match(r'^N\d+', check_cell) and voltage_idx == -1 and current_idx == -1:
is_noise_row = True
logger.debug(f"[工况信息opStatus] 第{row_idx}行疑似噪声数据行,跳过: 第一列或名称列='{check_cell}'")
# 只有当名称存在且不是噪声数据行时才创建工况信息记录
# 放宽名称验证:支持主变(包含"主变"或"#")和输电线路(包含"kV"和"线")
is_valid_name = False
if name_value:
# 主变格式:包含"主变"或"#"
if any(k in name_value for k in ["主变", "#"]):
is_valid_name = True
# 输电线路格式:包含"kV"和"线"
elif "kV" in name_value and "线" in name_value:
is_valid_name = True
# 其他可能的格式:包含"kV"(可能是其他设备)
elif "kV" in name_value:
is_valid_name = True
if is_valid_name and not is_noise_row:
# 进一步验证:必须有电压或电流或功率列,否则可能是噪声数据行
if voltage_idx == -1 and current_idx == -1 and active_power_idx == -1 and reactive_power_idx == -1:
logger.debug(f"[工况信息opStatus] 第{row_idx}行没有找到电压/电流/功率列,跳过: name='{name_value}'")
continue
# 创建工况信息记录
oc = OperationalCondition()
oc.monitorAt = current_monitor_at
oc.project = current_project # project字段使用表格中"项目"列的值
oc.name = name_value
# 电压范围
if voltage_idx >= 0 and voltage_idx < len(row):
oc.voltage = row[voltage_idx].strip()
# 电流范围
if current_idx >= 0 and current_idx < len(row):
oc.current = row[current_idx].strip()
# 有功功率范围
if active_power_idx >= 0 and active_power_idx < len(row):
oc.activePower = row[active_power_idx].strip()
# 无功功率范围
if reactive_power_idx >= 0 and reactive_power_idx < len(row):
oc.reactivePower = row[reactive_power_idx].strip()
# 添加记录
conditions.append(oc)
logger.info(f"[工况信息opStatus] 解析到第{len(conditions)}条记录: name='{oc.name}', 时间='{oc.monitorAt}', 项目='{oc.project}'")
else:
if is_noise_row:
logger.debug(f"[工况信息opStatus] 第{row_idx}行是噪声数据行,跳过: name='{name_value}'")
else:
logger.debug(f"[工况信息opStatus] 第{row_idx}行名称无效或为空,跳过: name='{name_value}'")
# 只处理第一个匹配的表格
if conditions:
break
logger.info(f"[工况信息opStatus] 共解析到 {len(conditions)} 条工况信息")
return conditions
def parse_operational_conditions_format3_5(markdown_content: str) -> List[OperationalConditionV2]:
"""解析工况信息表格(格式3和格式5:附件 2 工况信息,电压列第一列存储时间段)
表格结构:
- 第一行:名称(rowspan=2)、时间(rowspan=2或colspan=2)、电压(kV)(colspan=2)、电流(A)(colspan=2)、有功(MW)(colspan=2)、无功(Mvar)(colspan=2)
- 第二行:最大值、最小值(重复4次)
- 数据行特点:
- 电压列的第一列存储时间段(如"昼间9:00~11:00"),第二列是电压最大值
- 电流列的第一列是电流最大值,第二列是电流最小值
- 有功和无功类似
输出格式(使用OperationalConditionV2):
[
{
"monitorAt": "", // 检测时间(从时间列和电压时间段列组合)
"project": "", // 项目名称
"name": "", // 名称,如#3主变
"maxVoltage": "", // 电压最大值
"minVoltage": "", // 电压最小值
"maxCurrent": "", // 电流最大值
"minCurrent": "", // 电流最小值
"maxActivePower": "", // 有功功率最大值
"minActivePower": "", // 有功功率最小值
"maxReactivePower": "", // 无功功率最大值
"minReactivePower": "", // 无功功率最小值
}
]
"""
conditions: List[OperationalConditionV2] = []
# 检查是否包含"附件 2 工况信息"或"附件 2 工况及工程信息"标识
if "附件" not in markdown_content or ("工况信息" not in markdown_content and "工况及工程信息" not in markdown_content):
logger.debug("[工况信息格式3/5] 未找到'附件 2 工况信息'或'附件 2 工况及工程信息'标识")
return conditions
# 提取表格数据(支持rowspan和colspan)
tables = extract_table_with_rowspan_colspan(markdown_content)
if not tables:
logger.warning("[工况信息格式3/5] 未能提取出任何表格内容")
return conditions
# 查找工况信息表格
for table in tables:
if not table or len(table) < 3: # 至少需要表头2行和数据1行
continue
# 检查表头是否包含工况信息的关键词
first_row = table[0]
second_row = table[1] if len(table) > 1 else []
header_text = normalize_text(" ".join(first_row + second_row))
has_keywords = any(
keyword in header_text
for keyword in ["名称", "时间", "电压", "电流", "有功", "无功", "kv", "mw", "mvar"]
)
if not has_keywords:
continue
logger.info(f"[工况信息格式3/5] 找到工况信息表格,行数: {len(table)}")
# 检查是否是格式3/5(电压列第一列存储时间段)
# 从数据行检查:如果电压列第一列包含"昼间"、"夜间"等时间段关键词,则是格式3/5
is_format3_5 = False
if len(table) > 2:
logger.debug(f"[工况信息格式3/5] 开始检查格式3/5特征,表格行数: {len(table)}")
for row_idx in range(2, min(5, len(table))): # 检查前几行数据
row = table[row_idx]
logger.debug(f"[工况信息格式3/5] 检查第{row_idx}行,列数: {len(row)}, 内容: {row[:6]}")
if len(row) >= 4: # 至少需要名称、时间、电压列
# 电压列可能在索引2或3(格式3在列2,格式5在列3,因为时间列有colspan=2)
# 扩大检查范围到列2-6,以覆盖格式3和格式5
for col_idx in range(2, min(7, len(row))):
cell = row[col_idx].strip()
# 处理可能的换行符(如"昼间\n10:00~17:00")
cell_normalized = cell.replace("\n", " ").replace("\r", " ").strip()
if cell_normalized and any(k in cell_normalized for k in ["昼间", "夜间", "次日", "~", ":"]) and not re.match(r'^[\d.\-]+$', cell_normalized):
is_format3_5 = True
logger.info(f"[工况信息格式3/5] 检测到格式3/5特征:第{row_idx}行列{col_idx}包含时间段 '{cell_normalized}'")
break
if is_format3_5:
break
if not is_format3_5:
logger.warning("[工况信息格式3/5] 未检测到格式3/5特征(电压列包含时间段),跳过该表格")
continue
# 确定列索引
# 根据表格结构:名称、时间(可能有colspan=2)、电压(kV)[colspan=2]、电流(A)[colspan=2]、有功(MW)[colspan=2]、无功(Mvar)[colspan=2]
# 第二行表头:最大值、最小值(重复4次)
# 格式3:名称、时间、电压时间段、电压最大值、电压最小值、电流最大值、电流最小值...
# 格式5:名称、时间(colspan=2,占用2列)、电压时间段、电压最大值、电压最小值、电流最大值、电流最小值...
# 动态检测时间列是否有colspan=2(通过检查表头和数据行的列数)
# 格式3和格式5的时间列都有colspan=2,但实际数据行可能都是11列
# 检查表头第一行,看时间列是否有colspan=2
has_time_colspan2 = False
if len(table) > 0:
first_row = table[0]
# 检查表头中是否有"时间"列,并且该列有colspan=2
# 如果第一行包含"时间"且后续列数较多,可能是colspan=2
header_text = " ".join(first_row).lower()
if "时间" in header_text:
# 检查表头结构:如果时间列后面直接是电压列,可能是colspan=2
# 或者检查数据行的列数
if len(table) > 2:
for row_idx in range(2, min(5, len(table))):
row = table[row_idx]
logger.debug(f"[工况信息格式3/5] 检查第{row_idx}行,列数: {len(row)}")
if len(row) >= 11: # 格式3/5:名称(1) + 时间(2) + 电压时间段(1) + 电压max(1) + 电压min(1) + 电流max(1) + 电流min(1) + 有功max(1) + 有功min(1) + 无功max(1) + 无功min(1) = 11列
has_time_colspan2 = True
logger.info(f"[工况信息格式3/5] 检测到时间列有colspan=2,数据行有{len(row)}列")
break
elif len(row) >= 10:
# 可能是格式3,但时间列也可能有colspan=2(只是数据值只占1列)
# 检查列2是否是时间段(包含"昼间"、"夜间"等)
if len(row) > 2 and any(k in row[2] for k in ["昼间", "夜间", "次日", "~", ":"]):
has_time_colspan2 = True
logger.info(f"[工况信息格式3/5] 检测到时间列有colspan=2(格式3),数据行有{len(row)}列,列2是时间段")
break
logger.debug(f"[工况信息格式3/5] 检测到格式3,数据行有{len(row)}列")
break
name_idx = 0
# 无论格式3还是格式5,时间段都在列2(索引2),因为时间列虽然有colspan=2,
# 但实际数据中时间段是电压列的第一列,在索引2
time_idx = 1
voltage_time_idx = 2 # 电压时间段列(格式3和格式5都在索引2)
voltage_max_idx = 3 # 电压最大值列
voltage_min_idx = 4 # 电压最小值列
current_max_idx = 5 # 电流最大值列
current_min_idx = 6 # 电流最小值列
active_power_max_idx = 7 # 有功最大值列
active_power_min_idx = 8 # 有功最小值列
reactive_power_max_idx = 9 # 无功最大值列
reactive_power_min_idx = 10 # 无功最小值列
# 从第三行开始解析数据(前两行是表头)
current_name = ""
current_time = ""
logger.info(f"[工况信息格式3/5] 开始解析数据行,表格总行数: {len(table)}, 从第3行(索引2)开始")
for row_idx in range(2, len(table)):
row = table[row_idx]
logger.debug(f"[工况信息格式3/5] 处理第{row_idx}行,列数: {len(row)}, 前5列: {row[:5]}")
# 至少需要4列
if len(row) < 4:
logger.debug(f"[工况信息格式3/5] 第{row_idx}行列数不足4列,跳过")
continue
# 检查是否是表头行(包含"名称"、"时间"、"最大值"、"最小值"等关键词)
if any(keyword in " ".join(row[:5]).lower() for keyword in ["名称", "时间", "最大值", "最小值", "电压", "电流", "有功", "无功"]):
logger.debug(f"[工况信息格式3/5] 跳过表头行: {row[:3]}")
continue
# 检查是否是项目名称行(整行合并,如"蕲昌220kV变电站"、"输电线路")
# 项目名称行通常只有第0列有值,其他列都为空(因为colspan)
non_empty_cols = [i for i, cell in enumerate(row) if cell.strip()]
if len(non_empty_cols) == 1 and non_empty_cols[0] == 0:
# 检查内容是否是项目名称(不包含"主变"、"#"、"线"等设备名称关键词)
cell_value = row[0].strip()
if not any(k in cell_value for k in ["主变", "#", "线"]):
# 可能是项目名称行,跳过
logger.debug(f"[工况信息格式3/5] 跳过项目名称行: {cell_value}")
continue
# 更新名称(如果有值)
if name_idx < len(row) and row[name_idx].strip():
name_value = row[name_idx].strip()
# 检查是否是有效名称(包含"主变"、"#"等,但排除"输电线路"这样的项目名称)
if name_value in ["输电线路", "变电站"]:
# 这是项目名称行,跳过
logger.debug(f"[工况信息格式3/5] 跳过项目名称行: {name_value}")
continue
elif any(k in name_value for k in ["主变", "#"]) or ("kV" in name_value and "线" in name_value):
current_name = name_value
logger.debug(f"[工况信息格式3/5] 更新名称: {current_name}")
# 更新时间(如果有值)
# 时间列可能有colspan=2,所以检查列1和列2
time_value = ""
if time_idx < len(row) and row[time_idx].strip():
time_value = row[time_idx].strip()
elif time_idx + 1 < len(row) and row[time_idx + 1].strip():
time_value = row[time_idx + 1].strip()
# 检查是否是日期格式(支持"2025.03.28"和"2025.08.29-08.30")
if time_value and (re.match(r'^\d{4}\.\d{1,2}\.\d{1,2}', time_value) or re.match(r'^\d{4}\.\d{1,2}\.\d{1,2}-\d{1,2}\.\d{1,2}', time_value)):
current_time = time_value
logger.debug(f"[工况信息格式3/5] 更新时间: {current_time}")
# 检查是否有电压时间段(格式3/5的特征)
if voltage_time_idx < len(row) and row[voltage_time_idx].strip():
voltage_time = row[voltage_time_idx].strip()
# 检查是否是时间段(包含"昼间"、"夜间"等)
if any(k in voltage_time for k in ["昼间", "夜间", "次日", "~", ":"]):
# 创建工况信息记录(使用OperationalConditionV2格式)
oc = OperationalConditionV2()
oc.project = ""
oc.name = current_name
# 组合monitorAt:时间 + 时间段(保持原始格式,如"2025.03.28 昼间9:00~11:00")
if current_time:
oc.monitorAt = f"{current_time} {voltage_time}".strip()
else:
oc.monitorAt = voltage_time
# 电压最大值
if voltage_max_idx < len(row) and row[voltage_max_idx].strip():
oc.maxVoltage = row[voltage_max_idx].strip()
# 电压最小值
if voltage_min_idx < len(row) and row[voltage_min_idx].strip():
oc.minVoltage = row[voltage_min_idx].strip()
# 电流最大值
if current_max_idx < len(row) and row[current_max_idx].strip():
oc.maxCurrent = row[current_max_idx].strip()
# 电流最小值
if current_min_idx < len(row) and row[current_min_idx].strip():
oc.minCurrent = row[current_min_idx].strip()
# 有功功率最大值
if active_power_max_idx < len(row) and row[active_power_max_idx].strip():
oc.maxActivePower = row[active_power_max_idx].strip()
# 有功功率最小值
if active_power_min_idx < len(row) and row[active_power_min_idx].strip():
oc.minActivePower = row[active_power_min_idx].strip()
# 无功功率最大值
if reactive_power_max_idx < len(row) and row[reactive_power_max_idx].strip():
oc.maxReactivePower = row[reactive_power_max_idx].strip()
# 无功功率最小值
if reactive_power_min_idx < len(row) and row[reactive_power_min_idx].strip():
oc.minReactivePower = row[reactive_power_min_idx].strip()
# 添加记录(只要名称不为空)
if oc.name:
conditions.append(oc)
logger.info(f"[工况信息格式3/5] 解析到第{len(conditions)}条记录: name='{oc.name}', monitorAt='{oc.monitorAt}'")
# 只处理第一个匹配的表格
if conditions:
break
logger.info(f"[工况信息格式3/5] 共解析到 {len(conditions)} 条工况信息")
return conditions