| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499 |
- # Copyright (c) Opendatalab. All rights reserved.
- """噪声检测记录解析模块 v2 - 独立版本,不依赖OCR"""
- from typing import Optional, List
- import re
- from ..utils.logging_config import get_logger
- from ..models.data_models import NoiseDetectionRecord, WeatherData, NoiseData
- from .table_parser import extract_table_with_rowspan_colspan, parse_operational_conditions, parse_operational_conditions_opstatus
- logger = get_logger("pdf_converter_v2.parser.noise")
- def clean_project_field(project: str) -> str:
- """清理project字段:如果包含"检测依据",删除"检测依据"及其后面的所有字符
- 同时清理末尾的标点符号(逗号、句号、分号等)
-
- Args:
- project: 原始project字段值
-
- Returns:
- 清理后的project字段值
- """
- if not project:
- return project
-
- # 查找"检测/监测/检查依据"的位置
- match = re.search(r'(检测|监测|检查)依据', project)
- if match:
- project = project[:match.start()].strip()
- logger.debug(f"[噪声检测] 清理project字段,删除'{match.group(0)}'及之后内容: {project}")
-
- # 清理末尾的标点符号(逗号、句号、分号、冒号等)
- project = re.sub(r'[,。;:,.;:]+$', '', project).strip()
-
- return project
- def correct_address_ocr_errors(address: str) -> str:
- """纠正address字段中的常见OCR识别错误
-
- 常见错误模式:
- 1. "厂界外lm" -> "厂界外1m" (手写的"1m"被识别为"lm")
- 2. "住户17" -> "住户1F" (手写的"1F"被识别为"17")
- 3. "住户47" -> "住户4F" (手写的"4F"被识别为"47")
- 4. "T界" -> "厂界" (手写的"厂"被识别为"T")
- 5. "群星木业17" -> "群星木业1F"
- 6. "东海花园137" -> "东海花园13F" (手写的"13F"被识别为"137")
-
- Args:
- address: 原始address字段值
-
- Returns:
- 纠正后的address字段值
- """
- if not address:
- return address
-
- original_address = address
-
- # 1. 纠正 "厂界外lm" -> "厂界外1m"
- # 匹配模式:厂界外 + lm(可能是手写的"1m"被识别为"lm")
- address = re.sub(r'厂界外lm\b', '厂界外1m', address)
-
- # 2. 纠正 "外lm" -> "外1m"(在"外"后面,如"五洲国际建材中心外lm")
- address = re.sub(r'外lm\b', '外1m', address)
-
- # 3. 纠正 "T界"/"t界" -> "厂界"(手写“厂”容易被识别为T/t,且后面可能紧跟其他汉字)
- address = re.sub(r'[Tt]界', '厂界', address)
-
- # 4. 纠正楼层号识别错误:数字+7 -> 数字+F
- # 模式:地址末尾的数字+7组合,很可能是楼层号(如1F、2F、13F等)
- # 根据实际案例:
- # - "住户17" -> "住户1F" (1楼)
- # - "住户47" -> "住户4F" (4楼)
- # - "群星木业17" -> "群星木业1F" (1楼)
- # - "群星木业47" -> "群星木业4F" (4楼)
- # - "东海花园17" -> "东海花园1F" (1楼)
- # - "东海花园137" -> "东海花园13F" (13楼)
- # - "东海花园177" -> "东海花园17F" (17楼)
- # - "东海花园217" -> "东海花园21F" (21楼)
- # - "卓维商务楼17" -> "卓维商务楼1F" (1楼)
- # - "卓维商务楼47" -> "卓维商务楼4F" (4楼)
-
- # 策略:先处理较长的模式(两位数+7),再处理较短的模式(单位数+7)
- # 这样可以避免误判,例如"137"应该被识别为"13F"而不是"1F"
-
- # 4.1 先处理两位数+7的情况(如137 -> 13F, 177 -> 17F, 217 -> 21F)
- # 匹配模式:地址末尾是两位数+7,且十位是1-2,个位是0-9
- address = re.sub(r'([1-2][0-9])7\b', r'\1F', address)
-
- # 4.2 再处理单位数+7的情况(如17 -> 1F, 27 -> 2F, 97 -> 9F)
- # 注意:这个规则在两位数规则之后执行,所以"137"已经被替换为"13F",不会再次匹配
- address = re.sub(r'([1-9])7\b', r'\1F', address)
-
- # 如果地址被修改了,记录日志
- if address != original_address:
- logger.info(f"[噪声检测] 纠正address字段OCR错误: '{original_address}' -> '{address}'")
-
- return address
- def _mark_auto_weather_default(record: NoiseDetectionRecord, weather_obj: Optional[WeatherData] = None) -> None:
- """标记天气字段使用了默认填充值"""
- setattr(record, "_auto_weather_default_used", True)
- if weather_obj is not None:
- weather_obj._auto_filled_weather = True
- def normalize_standard_text(text: str) -> str:
- """标准字段中可能包含数学/LaTeX格式,需先清理"""
- if not text:
- return text
-
- # 去掉美元符号和常见LaTeX指令(例如 \mathrm、\left、\right、\cdot 等)
- text = text.replace("$", "")
- text = re.sub(r"\\(mathrm|left|right|cdot|cdots|ldots|frac|overline|underline|mathbf|mathbf|mathit|mathsf|mathtt|mathcal)\b", "", text)
- # 删除其他未知的反斜杠命令,保留紧跟其后的文本
- text = re.sub(r"\\[a-zA-Z]+", "", text)
- # 去掉多余的大括号
- text = re.sub(r"[{}]", "", text)
- # 合并多余空白
- text = re.sub(r"\s+", " ", text).strip()
- return text
- def extract_standard_references(text: str) -> str:
- """解析检测/监测依据,支持包含数学格式的文本"""
- if not text:
- return ""
-
- text = normalize_standard_text(text.strip())
- text = text.replace("☐", "□").replace("■", "□")
- text = re.sub(r'□其他[::]?$', '', text).strip()
- gb_standards = re.findall(r'GB\s*\d+[-\.]?\d*[-\.]?\d*', text)
- if gb_standards:
- return " ".join(gb_standards)
- return re.sub(r'□\s*', '', text).strip()
- def _normalize_weather_text(weather_text: str) -> str:
- """标准化气象字段文本,插入缺失的分隔符,移除HTML"""
- if not weather_text:
- return weather_text
-
- text = weather_text
- text = re.sub(r'<[^>]+>', ' ', text) # 移除HTML标签
- text = text.replace(" ", " ")
- text = text.replace(".", ".")
- text = text.replace(",", " ")
- text = text.replace(":", ":")
- text = text.replace("℃C", "℃")
-
- # 为不同字段增加缺失的空格,避免如 "℃湿度" 无法拆分
- text = re.sub(r'([℃°C])\s*湿度', r'\1 湿度', text)
- text = re.sub(r'([℃°C])\s*风速', r'\1 风速', text)
- text = re.sub(r'(%RH)\s*风速', r'\1 风速', text, flags=re.IGNORECASE)
- text = re.sub(r'(%RH)\s*风速', r'%RH 风速', text)
- text = re.sub(r'(m/s)\s*风向', r'\1 风向', text, flags=re.IGNORECASE)
- text = re.sub(r'(M/S)\s*风向', r'm/s 风向', text)
- text = re.sub(r'风速([0-9])', r'风速 \1', text)
-
- # 保证冒号后有空格,便于分段
- text = re.sub(r'(日期|天气|温度|湿度|风速|风向)\s*[::]', r'\1: ', text)
-
- # 合并多余空白
- text = re.sub(r'\s+', ' ', text).strip()
- return text
- def parse_weather_from_text(weather_text: str, record: NoiseDetectionRecord) -> None:
- """从文本中解析天气数据,支持多条记录
-
- 文本格式示例:
- 日期:2025.9.23 天气 多云 温度26.7-27.4℃湿度66.6-67.4%RH 风速0.9-1.0 m/s 风向东偏北 日期:2025.9.24 天气 多云 温度24.5-28.6℃湿度65.3-67.1%RH 风速1.4-1.5 m/s 风向东偏北
- """
- weather_text = _normalize_weather_text(weather_text)
- if not weather_text or "日期" not in weather_text:
- return
-
- # 直接使用分段解析方式,因为第一个正则表达式太复杂且容易出错
- # 首先尝试分段解析,按日期分段,然后逐字段提取
- date_pattern = r'日期[::]\s*([\d.\-]+)'
- weather_pattern_simple = r'天气\s+([^\s温度湿度风速风向日期]+)'
- # 温度模式:温度后跟数字,直到遇到"℃"或"湿度"
- temp_pattern = r'温度\s*([0-9.\-]+)[℃°C]?'
- # 湿度模式:湿度后跟数字,直到遇到"%RH"或"风速"
- humidity_pattern = r'湿度\s*([0-9.\-]+)[%RH]?'
- # 风速模式:风速后跟数字和"m/s",支持~符号(如2.1~3.1)
- wind_speed_pattern = r'风速\s*([0-9.~\-]+)\s*m/s'
- # 风向模式:风向后跟方向描述,直到遇到"日期"、"气候条件"、"气候特征"或文本结束
- # 注意:不能排除"风"字,否则"南风"只能匹配到"南"
- # 需要处理"气候条件"或"气候特征"紧跟在风向后的情况(如"西北气候条件"、"西北气候特征")
- # 注意:字符类中不排除空格,因为风向值可能包含空格(如"东偏北"),空格会在前瞻断言中处理
- # 前瞻断言支持有空格或直接遇到关键词的情况
- wind_dir_pattern = r'风向\s*([^日期温度湿度风速气候条件气候特征<>]+?)(?=\s*(?:日期|温度|湿度|风速|气候条件|气候特征|<)|$)'
-
- # 找到所有日期位置,然后为每个日期解析一条记录
- dates = list(re.finditer(date_pattern, weather_text))
- if not dates:
- logger.warning(f"[噪声检测] 未找到日期信息: {weather_text[:100]}")
- return
-
- weather_found = False
- for idx, date_match in enumerate(dates):
- date_start = date_match.start()
- # 找到下一个日期位置或文本末尾
- if idx + 1 < len(dates):
- next_date_match = dates[idx + 1]
- section_end = next_date_match.start()
- else:
- section_end = len(weather_text)
-
- section = weather_text[date_start:section_end]
- logger.debug(f"[噪声检测] 解析天气段落: {section}")
-
- weather = WeatherData()
- weather.monitorAt = date_match.group(1).strip()
-
- # 提取天气(格式:天气 多云 或 天气多云)
- w_match = re.search(weather_pattern_simple, section)
- if w_match:
- weather_value = w_match.group(1).strip()
- # 如果提取到的值不是字段标签,则认为是天气值
- if weather_value and weather_value not in {"温度", "天气", "天气状况", "天气情况"}:
- weather.weather = weather_value
- logger.debug(f"[噪声检测] 提取到天气: {weather.weather}")
-
- # 提取温度(格式:温度26.7-27.4℃ 或 温度 26.7-27.4℃)
- t_match = re.search(temp_pattern, section)
- if t_match:
- temp = t_match.group(1).strip()
- weather.temp = temp
- logger.debug(f"[噪声检测] 提取到温度: {weather.temp}")
-
- # 提取湿度(格式:湿度66.6-67.4%RH 或 湿度 66.6-67.4%RH)
- h_match = re.search(humidity_pattern, section)
- if h_match:
- humidity = h_match.group(1).strip()
- weather.humidity = humidity
- logger.debug(f"[噪声检测] 提取到湿度: {weather.humidity}")
-
- # 提取风速(格式:风速0.9-1.0 m/s 或 风速 0.9-1.0 m/s,支持~符号但输出时转换为-)
- ws_match = re.search(wind_speed_pattern, section)
- if ws_match:
- wind_speed = ws_match.group(1).strip().replace("~", "-")
- weather.windSpeed = wind_speed
- logger.debug(f"[噪声检测] 提取到风速: {weather.windSpeed}")
-
- # 提取风向(格式:风向东北 或 风向 东北)
- # 注意:风向值不应该包含"日期"关键词,如果匹配到包含"日期"的内容,说明匹配错误
- wd_match = re.search(wind_dir_pattern, section)
- if wd_match:
- wind_dir_value = wd_match.group(1).strip()
- # 如果风向值包含"气候条件"或"气候特征",需要截断(处理"西北气候条件"、"西北气候特征"这种情况)
- if "气候条件" in wind_dir_value:
- wind_dir_value = wind_dir_value.split("气候条件")[0].strip()
- if "气候特征" in wind_dir_value:
- wind_dir_value = wind_dir_value.split("气候特征")[0].strip()
- # 验证风向值:不应该包含"日期"、"温度"、"湿度"、"风速"、"气候条件"、"气候特征"等关键词
- if wind_dir_value and "日期" not in wind_dir_value and "温度" not in wind_dir_value and \
- "湿度" not in wind_dir_value and "风速" not in wind_dir_value and \
- "气候条件" not in wind_dir_value and "气候特征" not in wind_dir_value and \
- not wind_dir_value.startswith("日期") and len(wind_dir_value) < 50: # 风向值不应该太长
- weather.windDirection = wind_dir_value
- logger.debug(f"[噪声检测] 提取到风向: {weather.windDirection}")
- else:
- logger.warning(f"[噪声检测] 风向值验证失败,跳过: {wind_dir_value}")
-
- # weather 为空且其它气象字段有任意一个不为空时,默认填入“晴”
- if not weather.weather.strip() and any([weather.temp, weather.humidity, weather.windSpeed, weather.windDirection]):
- weather.weather = "晴"
- _mark_auto_weather_default(record, weather)
-
- # 如果至少有一个字段不为空,则添加这条记录
- if any([weather.monitorAt, weather.weather, weather.temp, weather.humidity, weather.windSpeed, weather.windDirection]):
- record.weather.append(weather)
- weather_found = True
- logger.info(f"[噪声检测] 解析到天气记录: {weather.to_dict()}")
- else:
- logger.warning(f"[噪声检测] 天气记录字段全为空,跳过: {section}")
-
- # 如果分段解析成功,就不需要继续执行后面的代码
- if weather_found:
- return
-
- # 如果分段解析没有成功,尝试其他方式(向后兼容)
- if not weather_found:
- # 尝试分段解析,更精确地匹配格式:日期:2025.9.23 天气 多云 温度26.7-27.4℃湿度66.6-67.4%RH 风速0.9-1.0 m/s 风向东北
- # 注意:格式中字段和值之间可能没有空格,如"温度26.7-27.4℃"、"湿度66.6-67.4%RH"、"风速0.9-1.0 m/s"
- # 需要在遇到单位符号或下一个字段名时停止匹配
- date_pattern = r'日期[::]\s*([\d.\-]+)'
- weather_pattern_simple = r'天气\s+([^\s温度湿度风速风向日期]+)'
- # 温度模式:温度后跟数字和单位,直到遇到"湿度"或其他字段
- temp_pattern = r'温度\s*([0-9.\-]+)[℃°C]?'
- # 湿度模式:湿度后跟数字和单位,直到遇到"风速"或其他字段
- humidity_pattern = r'湿度\s*([0-9.\-]+)[%RH]?'
- # 风速模式:风速后跟数字和单位,直到遇到"风向"或其他字段,支持~符号(如2.1~3.1)
- wind_speed_pattern = r'风速\s*([0-9.~\-]+)\s*m/s'
- # 风向模式:风向后跟方向描述,直到遇到"日期"、"气候条件"或文本结束
- # 注意:不能排除"风"字,否则"南风"只能匹配到"南"
- # 需要处理"气候条件"紧跟在风向后的情况(如"西北气候条件")
- # 注意:字符类中不排除空格,因为风向值可能包含空格,空格会在前瞻断言中处理
- # 前瞻断言支持有空格或直接遇到关键词的情况
- wind_dir_pattern = r'风向\s*([^日期温度湿度风速气候条件<>]+?)(?=\s*(?:日期|温度|湿度|风速|气候条件|<)|$)'
-
- # 找到所有日期位置,然后为每个日期解析一条记录
- dates = list(re.finditer(date_pattern, weather_text))
- if not dates:
- logger.warning(f"[噪声检测] 未找到日期信息: {weather_text[:100]}")
- return
-
- for idx, date_match in enumerate(dates):
- date_start = date_match.start()
- # 找到下一个日期位置或文本末尾
- if idx + 1 < len(dates):
- next_date_match = dates[idx + 1]
- section_end = next_date_match.start()
- else:
- section_end = len(weather_text)
-
- section = weather_text[date_start:section_end]
- logger.debug(f"[噪声检测] 解析天气段落: {section}")
-
- weather = WeatherData()
- weather.monitorAt = date_match.group(1).strip()
-
- # 提取天气(格式:天气 多云 或 天气多云)
- w_match = re.search(weather_pattern_simple, section)
- if w_match:
- weather_value = w_match.group(1).strip()
- if weather_value and weather_value not in {"温度", "天气", "天气状况", "天气情况"}:
- weather.weather = weather_value
- logger.debug(f"[噪声检测] 提取到天气: {weather.weather}")
-
- # 提取温度(格式:温度26.7-27.4℃ 或 温度 26.7-27.4℃)
- t_match = re.search(temp_pattern, section)
- if t_match:
- temp = t_match.group(1).strip()
- weather.temp = temp
- logger.debug(f"[噪声检测] 提取到温度: {weather.temp}")
-
- # 提取湿度(格式:湿度66.6-67.4%RH 或 湿度 66.6-67.4%RH)
- h_match = re.search(humidity_pattern, section)
- if h_match:
- humidity = h_match.group(1).strip()
- weather.humidity = humidity
- logger.debug(f"[噪声检测] 提取到湿度: {weather.humidity}")
-
- # 提取风速(格式:风速0.9-1.0 m/s 或 风速 0.9-1.0 m/s,支持~符号但输出时转换为-)
- ws_match = re.search(wind_speed_pattern, section)
- if ws_match:
- wind_speed = ws_match.group(1).strip().replace("~", "-")
- weather.windSpeed = wind_speed
- logger.debug(f"[噪声检测] 提取到风速: {weather.windSpeed}")
-
- # 提取风向(格式:风向东北 或 风向 东北)
- # 注意:风向值不应该包含"日期"关键词,如果匹配到包含"日期"的内容,说明匹配错误
- wd_match = re.search(wind_dir_pattern, section)
- if wd_match:
- wind_dir_value = wd_match.group(1).strip()
- # 如果风向值包含"气候条件",需要截断(处理"西北气候条件"这种情况)
- if "气候条件" in wind_dir_value:
- wind_dir_value = wind_dir_value.split("气候条件")[0].strip()
- # 验证风向值:不应该包含"日期"、"温度"、"湿度"、"风速"、"气候条件"等关键词
- if wind_dir_value and "日期" not in wind_dir_value and "温度" not in wind_dir_value and \
- "湿度" not in wind_dir_value and "风速" not in wind_dir_value and \
- "气候条件" not in wind_dir_value and \
- not wind_dir_value.startswith("日期") and len(wind_dir_value) < 50: # 风向值不应该太长
- weather.windDirection = wind_dir_value
- logger.debug(f"[噪声检测] 提取到风向: {weather.windDirection}")
- else:
- logger.warning(f"[噪声检测] 风向值验证失败,跳过: {wind_dir_value}")
-
- # weather 为空且其它气象字段有任意一个不为空时,默认填入"晴"
- if not weather.weather.strip() and any([weather.temp, weather.humidity, weather.windSpeed, weather.windDirection]):
- weather.weather = "晴"
- _mark_auto_weather_default(record, weather)
-
- # 如果至少有一个字段不为空,则添加这条记录
- if any([weather.monitorAt, weather.weather, weather.temp, weather.humidity, weather.windSpeed, weather.windDirection]):
- record.weather.append(weather)
- logger.info(f"[噪声检测] 解析到天气记录(简化模式): {weather.to_dict()}")
- else:
- logger.warning(f"[噪声检测] 天气记录字段全为空,跳过: {section}")
- def parse_header_from_combined_cell(cell_text: str) -> dict:
- """从组合单元格中解析头部信息
-
- 单元格格式示例:
- 项目名称:武汉黄陂路102号南站改造工程竣工验收检测依据:GB 12348-2008 □GB3096-2008 □其他:声级计型号/编号:AY2201 声校准器型号/编号:AY2204 检测前校准值:93.8 dB(A) 检测后校准值:94.0 dB(A)气象条件...
- """
- result = {
- "project": "",
- "standardReferences": "",
- "soundLevelMeterMode": "",
- "soundCalibratorMode": "",
- "calibrationValueBefore": "",
- "calibrationValueAfter": ""
- }
-
- if not cell_text:
- return result
-
- # 检查是否包含任何需要解析的字段
- has_any_field = any(keyword in cell_text for keyword in [
- "项目名称", "检测依据", "监测依据", "检查依据", "声级计型号", "声校准器型号",
- "检测前校准值", "检测后校准值", "声纹计型号", "声级计校准器型号"
- ])
- if not has_any_field:
- return result
-
- # 解析项目名称:项目名称:xxx(后面跟着检测依据或其他字段,可能没有分隔符)
- # 匹配模式:项目名称:xxx(直到检测依据、监测依据、声级计、声校准器、检测前、检测后或气象条件,或到字符串末尾)
- # 注意:项目名称后面可能直接跟着"检测依据"没有分隔符,也可能后面没有其他字段
- project_match = re.search(r'项目名称[::](.+?)(?:检测依据|监测依据|检查依据|声级计|声校准器|检测前|检测后|气象条件|</td>|</tr>|$)', cell_text)
- if project_match:
- result["project"] = project_match.group(1).strip()
- # 如果提取到的项目名称为空,可能是正则表达式匹配到了但内容为空
- if not result["project"]:
- # 尝试更简单的匹配:项目名称:后面直到行尾或换行
- project_match2 = re.search(r'项目名称[::]([^<]+?)(?:</td>|</tr>|$)', cell_text)
- if project_match2:
- result["project"] = project_match2.group(1).strip()
- # 清理project字段,删除"检测依据"及之后的内容(防止正则表达式没有完全匹配到的情况)
- result["project"] = clean_project_field(result["project"])
-
- # 解析检测依据:检测依据:GB 12348-2008 □GB3096-2008 □其他:声级计...
- # 也可能格式为:检测依据:xxx或监测依据:xxx
- # 注意:检测依据后面可能跟着"□其他:",需要截断到"声级计"或"声校准器"或"检测前"或"检测后"或"气象条件"
- standard_match = re.search(r'(?:检测|监测|检查)依据[::](.+?)(?:声级计|声校准器|检测前|检测后|气象条件)', cell_text)
- if standard_match:
- standard_text = extract_standard_references(standard_match.group(1))
- if standard_text:
- result["standardReferences"] = standard_text
- else:
- # 如果第一个正则没有匹配到,尝试更宽松的匹配:匹配到行尾或下一个字段
- standard_match2 = re.search(r'(?:检测|监测|检查)依据[::]([^声级计声校准器检测前检测后气象条件]+?)(?:声级计|声校准器|检测前|检测后|气象条件|$)', cell_text)
- if standard_match2:
- standard_text = extract_standard_references(standard_match2.group(1))
- if standard_text:
- result["standardReferences"] = standard_text
-
- # 解析声级计型号/编号:声级计型号/编号:AY2201 或 声级计型号:AY2201 或 声级计型号/编号:AWA628+/AY2249
- # 支持包含+号和斜杠的型号,如 AWA628+/AY2249
- sound_meter_match = re.search(r'声级计型号[/::]?(?:编号)?[::]\s*([A-Z0-9+/]+)', cell_text)
- if sound_meter_match:
- result["soundLevelMeterMode"] = sound_meter_match.group(1).strip()
-
- # 解析声校准器型号/编号:声校准器型号/编号:AY2204 或 声校准器型号:AY2204
- # 支持包含+号和斜杠的型号
- calibrator_match = re.search(r'声校准器型号[/::]?(?:编号)?[::]\s*([A-Z0-9+/]+)', cell_text)
- if calibrator_match:
- result["soundCalibratorMode"] = calibrator_match.group(1).strip()
-
- # 解析检测前校准值:检测前校准值:93.8 dB(A)
- before_cal_match = re.search(r'检测前校准值[::]\s*([0-9.]+)\s*dB\(A\)', cell_text)
- if before_cal_match:
- cal_value = before_cal_match.group(1).strip()
- result["calibrationValueBefore"] = f"{cal_value} dB(A)"
- else:
- # 如果没有单位,只提取数值
- before_cal_match2 = re.search(r'检测前校准值[::]\s*([0-9.]+)', cell_text)
- if before_cal_match2:
- result["calibrationValueBefore"] = before_cal_match2.group(1).strip()
-
- # 解析检测后校准值:检测后校准值:94.0 dB(A)
- after_cal_match = re.search(r'检测后校准值[::]\s*([0-9.]+)\s*dB\(A\)', cell_text)
- if after_cal_match:
- cal_value = after_cal_match.group(1).strip()
- result["calibrationValueAfter"] = f"{cal_value} dB(A)"
- else:
- # 如果没有单位,只提取数值
- after_cal_match2 = re.search(r'检测后校准值[::]\s*([0-9.]+)', cell_text)
- if after_cal_match2:
- result["calibrationValueAfter"] = after_cal_match2.group(1).strip()
-
- return result
- def parse_noise_detection_record(markdown_content: str, first_page_image: Optional = None, output_dir: Optional[str] = None) -> NoiseDetectionRecord:
- """解析噪声检测记录 - v2版本不依赖OCR,只从markdown内容解析"""
- record = NoiseDetectionRecord()
-
- # 首先提取Markdown关键词补充(优先级高)
- md_keywords_comment_match = re.search(r'<!--\s*Markdown关键词补充:(.*?)-->', markdown_content, re.DOTALL)
- if md_keywords_comment_match:
- keywords_text = md_keywords_comment_match.group(1)
- logger.info("[噪声检测] 发现Markdown关键词补充,开始提取(优先级高)")
-
- # 提取项目名称
- project_match = re.search(r'项目名称[::]([^\n]+)', keywords_text)
- if project_match:
- record.project = clean_project_field(project_match.group(1).strip())
- logger.debug(f"[噪声检测] 从Markdown关键词补充提取到项目名称: {record.project}")
-
- # 提取检测依据
- standard_match = re.search(r'(?:检测|监测|检查)依据[::]([^\n]+)', keywords_text)
- if standard_match:
- record.standardReferences = extract_standard_references(standard_match.group(1))
- logger.debug(f"[噪声检测] 从Markdown关键词补充提取到检测依据: {record.standardReferences}")
-
- # 提取声级计型号/编号
- sound_meter_match = re.search(r'声级计型号/编号[::]([^\n]+)', keywords_text)
- if sound_meter_match:
- record.soundLevelMeterMode = sound_meter_match.group(1).strip()
- logger.debug(f"[噪声检测] 从Markdown关键词补充提取到声级计型号: {record.soundLevelMeterMode}")
-
- # 提取声校准器型号/编号
- calibrator_match = re.search(r'声校准器型号/编号[::]([^\n]+)', keywords_text)
- if calibrator_match:
- record.soundCalibratorMode = calibrator_match.group(1).strip()
- logger.debug(f"[噪声检测] 从Markdown关键词补充提取到声校准器型号: {record.soundCalibratorMode}")
-
- # 提取检测前校准值
- before_cal_match = re.search(r'检测前校准值[::]([^\n]+)', keywords_text)
- if before_cal_match:
- record.calibrationValueBefore = before_cal_match.group(1).strip()
- logger.debug(f"[噪声检测] 从Markdown关键词补充提取到检测前校准值: {record.calibrationValueBefore}")
-
- # 提取检测后校准值
- after_cal_match = re.search(r'检测后校准值[::]([^\n]+)', keywords_text)
- if after_cal_match:
- record.calibrationValueAfter = after_cal_match.group(1).strip()
- logger.debug(f"[噪声检测] 从Markdown关键词补充提取到检测后校准值: {record.calibrationValueAfter}")
-
- # 提取天气信息
- weather_lines = re.findall(r'日期[::]([^\n]+)', keywords_text)
- for weather_line in weather_lines:
- weather = WeatherData()
- # 解析天气行:日期:xxx 天气:xxx 温度:xxx 湿度:xxx 风速:xxx 风向:xxx
- date_match = re.search(r'日期[::]\s*([\d.\-]+)', weather_line)
- if date_match:
- weather.monitorAt = date_match.group(1).strip()
-
- weather_match = re.search(r'天气[::]\s*([^\s温度]+)', weather_line)
- if weather_match:
- weather.weather = weather_match.group(1).strip()
-
- temp_match = re.search(r'温度[::]\s*([0-9.\-]+)', weather_line)
- if temp_match:
- weather.temp = temp_match.group(1).strip()
-
- humidity_match = re.search(r'湿度[::]\s*([0-9.\-]+)', weather_line)
- if humidity_match:
- weather.humidity = humidity_match.group(1).strip()
-
- wind_speed_match = re.search(r'风速[::]\s*([0-9.~\-]+)', weather_line)
- if wind_speed_match:
- weather.windSpeed = wind_speed_match.group(1).strip().replace("~", "-")
-
- # 注意:不能排除"风"字,否则"南风"只能匹配到"南"
- # 使用非贪婪匹配,匹配到下一个字段名或行尾,需要处理"气候条件"
- wind_dir_match = re.search(r'风向[::]\s*([^\s日期温度湿度气候条件]+?)(?=\s*(?:日期|温度|湿度|风速|气候条件)|$)', weather_line)
- if wind_dir_match:
- wind_dir_value = wind_dir_match.group(1).strip()
- # 如果风向值包含"气候条件",需要截断(处理"西北气候条件"这种情况)
- if "气候条件" in wind_dir_value:
- wind_dir_value = wind_dir_value.split("气候条件")[0].strip()
- # 验证风向值:不应该包含"日期"、"温度"、"湿度"、"风速"、"气候条件"等关键词
- if wind_dir_value and "日期" not in wind_dir_value and "温度" not in wind_dir_value and \
- "湿度" not in wind_dir_value and "风速" not in wind_dir_value and \
- "气候条件" not in wind_dir_value and \
- not wind_dir_value.startswith("日期") and len(wind_dir_value) < 50: # 风向值不应该太长
- weather.windDirection = wind_dir_value
- else:
- logger.warning(f"[噪声检测] 风向值验证失败,跳过: {wind_dir_value}")
-
- # 如果天气为空但其他字段有值,默认为"晴"
- if not weather.weather or not weather.weather.strip():
- if any([weather.temp, weather.humidity, weather.windSpeed, weather.windDirection]):
- weather.weather = "晴"
- _mark_auto_weather_default(record, weather)
- _mark_auto_weather_default(record, weather)
-
- # 如果至少有一个字段不为空,添加到记录(即使monitorAt为空也先添加,后续会从表格中补充)
- if any([weather.weather, weather.temp, weather.humidity, weather.windSpeed, weather.windDirection]):
- record.weather.append(weather)
- logger.debug(f"[噪声检测] 从Markdown关键词补充提取到天气信息: {weather.to_dict()}")
-
- # 然后提取OCR关键词补充(优先级低,只在字段为空时补充)
- ocr_keywords_comment_match = re.search(r'<!--\s*OCR关键词补充:(.*?)-->', markdown_content, re.DOTALL)
- if ocr_keywords_comment_match:
- keywords_text = ocr_keywords_comment_match.group(1)
- logger.info("[噪声检测] 发现OCR关键词补充,开始提取(优先级低,仅在字段为空时补充)")
-
- # 提取项目名称(仅在字段为空时)
- project_match = re.search(r'项目名称[::]([^\n]+)', keywords_text)
- if project_match and (not record.project or not record.project.strip()):
- record.project = clean_project_field(project_match.group(1).strip())
- logger.debug(f"[噪声检测] 从OCR关键词补充提取到项目名称: {record.project}")
-
- # 提取检测依据(仅在字段为空时)
- standard_match = re.search(r'(?:检测|监测|检查)依据[::]([^\n]+)', keywords_text)
- if standard_match and (not record.standardReferences or not record.standardReferences.strip()):
- record.standardReferences = extract_standard_references(standard_match.group(1))
- logger.debug(f"[噪声检测] 从OCR关键词补充提取到检测依据: {record.standardReferences}")
-
- # 提取声级计型号/编号(仅在字段为空时)
- sound_meter_match = re.search(r'声级计型号/编号[::]([^\n]+)', keywords_text)
- if sound_meter_match and (not record.soundLevelMeterMode or not record.soundLevelMeterMode.strip()):
- record.soundLevelMeterMode = sound_meter_match.group(1).strip()
- logger.debug(f"[噪声检测] 从OCR关键词补充提取到声级计型号: {record.soundLevelMeterMode}")
-
- # 提取声校准器型号/编号(仅在字段为空时)
- calibrator_match = re.search(r'声校准器型号/编号[::]([^\n]+)', keywords_text)
- if calibrator_match and (not record.soundCalibratorMode or not record.soundCalibratorMode.strip()):
- record.soundCalibratorMode = calibrator_match.group(1).strip()
- logger.debug(f"[噪声检测] 从OCR关键词补充提取到声校准器型号: {record.soundCalibratorMode}")
-
- # 提取检测前校准值(仅在字段为空时)
- before_cal_match = re.search(r'检测前校准值[::]([^\n]+)', keywords_text)
- if before_cal_match and (not record.calibrationValueBefore or not record.calibrationValueBefore.strip()):
- record.calibrationValueBefore = before_cal_match.group(1).strip()
- logger.debug(f"[噪声检测] 从OCR关键词补充提取到检测前校准值: {record.calibrationValueBefore}")
-
- # 提取检测后校准值(仅在字段为空时)
- after_cal_match = re.search(r'检测后校准值[::]([^\n]+)', keywords_text)
- if after_cal_match and (not record.calibrationValueAfter or not record.calibrationValueAfter.strip()):
- record.calibrationValueAfter = after_cal_match.group(1).strip()
- logger.debug(f"[噪声检测] 从OCR关键词补充提取到检测后校准值: {record.calibrationValueAfter}")
-
- # 提取天气信息(仅在MD天气信息中没有对应日期或字段为空时补充)
- weather_lines = re.findall(r'日期[::]([^\n]+)', keywords_text)
- for weather_line in weather_lines:
- ocr_weather = WeatherData()
- # 解析天气行:日期:xxx 天气:xxx 温度:xxx 湿度:xxx 风速:xxx 风向:xxx
- date_match = re.search(r'日期[::]\s*([\d.\-]+)', weather_line)
- if date_match:
- ocr_weather.monitorAt = date_match.group(1).strip()
-
- weather_match = re.search(r'天气[::]\s*([^\s温度]+)', weather_line)
- if weather_match:
- ocr_weather.weather = weather_match.group(1).strip()
-
- temp_match = re.search(r'温度[::]\s*([0-9.\-]+)', weather_line)
- if temp_match:
- ocr_weather.temp = temp_match.group(1).strip()
-
- humidity_match = re.search(r'湿度[::]\s*([0-9.\-]+)', weather_line)
- if humidity_match:
- ocr_weather.humidity = humidity_match.group(1).strip()
-
- wind_speed_match = re.search(r'风速[::]\s*([0-9.~\-]+)', weather_line)
- if wind_speed_match:
- ocr_weather.windSpeed = wind_speed_match.group(1).strip().replace("~", "-")
-
- # 注意:不能排除"风"字,否则"南风"只能匹配到"南"
- # 使用非贪婪匹配,匹配到下一个字段名或行尾,需要处理"气候条件"
- wind_dir_match = re.search(r'风向[::]\s*([^\s日期温度湿度气候条件]+?)(?=\s*(?:日期|温度|湿度|风速|气候条件)|$)', weather_line)
- if wind_dir_match:
- wind_dir_value = wind_dir_match.group(1).strip()
- # 如果风向值包含"气候条件",需要截断(处理"西北气候条件"这种情况)
- if "气候条件" in wind_dir_value:
- wind_dir_value = wind_dir_value.split("气候条件")[0].strip()
- # 验证风向值:不应该包含"日期"、"温度"、"湿度"、"风速"、"气候条件"等关键词
- if wind_dir_value and "日期" not in wind_dir_value and "温度" not in wind_dir_value and \
- "湿度" not in wind_dir_value and "风速" not in wind_dir_value and \
- "气候条件" not in wind_dir_value and \
- not wind_dir_value.startswith("日期") and len(wind_dir_value) < 50: # 风向值不应该太长
- ocr_weather.windDirection = wind_dir_value
- else:
- logger.warning(f"[噪声检测] 风向值验证失败,跳过: {wind_dir_value}")
-
- # 如果天气为空但其他字段有值,默认为"晴"
- if not ocr_weather.weather or not ocr_weather.weather.strip():
- if any([ocr_weather.temp, ocr_weather.humidity, ocr_weather.windSpeed, ocr_weather.windDirection]):
- ocr_weather.weather = "晴"
- _mark_auto_weather_default(record, ocr_weather)
-
- # 检查是否已存在相同日期的MD天气记录
- if ocr_weather.monitorAt:
- ocr_date = ocr_weather.monitorAt.strip().rstrip('.')
- found_md_weather = None
- for md_weather in record.weather:
- md_date = md_weather.monitorAt.strip().rstrip('.') if md_weather.monitorAt else ""
- if md_date == ocr_date:
- found_md_weather = md_weather
- break
-
- if found_md_weather:
- # 如果找到MD天气记录,只在字段为空时用OCR补充
- if not found_md_weather.weather and ocr_weather.weather:
- found_md_weather.weather = ocr_weather.weather
- logger.debug(f"[噪声检测] 从OCR补充MD天气记录的天气字段: {found_md_weather.weather}")
- if not found_md_weather.temp and ocr_weather.temp:
- found_md_weather.temp = ocr_weather.temp
- logger.debug(f"[噪声检测] 从OCR补充MD天气记录的温度字段: {found_md_weather.temp}")
- if not found_md_weather.humidity and ocr_weather.humidity:
- found_md_weather.humidity = ocr_weather.humidity
- logger.debug(f"[噪声检测] 从OCR补充MD天气记录的湿度字段: {found_md_weather.humidity}")
- if not found_md_weather.windSpeed and ocr_weather.windSpeed:
- found_md_weather.windSpeed = ocr_weather.windSpeed
- logger.debug(f"[噪声检测] 从OCR补充MD天气记录的风速字段: {found_md_weather.windSpeed}")
- if not found_md_weather.windDirection and ocr_weather.windDirection:
- found_md_weather.windDirection = ocr_weather.windDirection
- logger.debug(f"[噪声检测] 从OCR补充MD天气记录的风向字段: {found_md_weather.windDirection}")
- else:
- # 如果没有找到MD天气记录,且OCR天气信息有值,则添加OCR天气记录
- if any([ocr_weather.weather, ocr_weather.temp, ocr_weather.humidity, ocr_weather.windSpeed, ocr_weather.windDirection]):
- record.weather.append(ocr_weather)
- logger.debug(f"[噪声检测] 从OCR关键词补充添加天气信息(MD中无对应日期): {ocr_weather.to_dict()}")
- else:
- # 如果OCR天气信息没有日期,但有其他字段,也添加(后续会从表格中补充日期)
- if any([ocr_weather.weather, ocr_weather.temp, ocr_weather.humidity, ocr_weather.windSpeed, ocr_weather.windDirection]):
- record.weather.append(ocr_weather)
- logger.debug(f"[噪声检测] 从OCR关键词补充添加天气信息(无日期): {ocr_weather.to_dict()}")
-
- # 保存OCR提取的天气信息(用于后续与表格解析结果合并)
- ocr_weather_list = record.weather.copy() if record.weather else []
- # 清空record.weather,让表格解析重新填充
- record.weather = []
-
- # 使用支持rowspan和colspan的函数提取表格,因为噪声检测表有复杂的表头结构
- tables = extract_table_with_rowspan_colspan(markdown_content)
-
- if not tables:
- logger.warning(f"[噪声检测] 未能提取出任何表格内容")
- return record
- first_table = tables[0]
-
- # 首先尝试从组合单元格中解析头部信息(这种情况是多个字段都在一个单元格中,或者单个字段也在同一单元格中)
- # 同时也支持字段名和值在不同单元格的情况(新格式)
- header_extracted = False
- weather_extracted = False
- for row_idx, row in enumerate(first_table):
- # 先尝试从同一单元格解析(旧格式)
- for cell in row:
- # 检查单元格是否包含头部字段的关键词(放宽条件,支持单个字段的情况)
- # 如果单元格包含字段名和冒号,说明值就在同一单元格中
- has_header_field = any(keyword in cell for keyword in [
- "项目名称", "检测依据", "监测依据", "检查依据", "声级计型号", "声校准器型号",
- "检测前校准值", "检测后校准值", "声纹计型号", "声级计校准器型号"
- ])
- has_colon = ":" in cell or ":" in cell
-
- if has_header_field and has_colon:
- logger.debug(f"[噪声检测] 发现包含字段信息的单元格,尝试解析: {cell[:100]}...")
- # 清理HTML标签,只保留文本内容
- cell_clean = re.sub(r'<[^>]+>', '', cell).strip()
- parsed_header = parse_header_from_combined_cell(cell_clean)
-
- # 更新字段(如果解析到值)
- if parsed_header["project"] and not record.project:
- record.project = clean_project_field(parsed_header["project"])
- header_extracted = True
- logger.debug(f"[噪声检测] 从单元格解析到项目名称: {record.project}")
- if parsed_header["standardReferences"] and not record.standardReferences:
- record.standardReferences = parsed_header["standardReferences"]
- logger.debug(f"[噪声检测] 从单元格解析到检测依据: {record.standardReferences}")
- if parsed_header["soundLevelMeterMode"] and not record.soundLevelMeterMode:
- record.soundLevelMeterMode = parsed_header["soundLevelMeterMode"]
- logger.debug(f"[噪声检测] 从单元格解析到声级计型号: {record.soundLevelMeterMode}")
- if parsed_header["soundCalibratorMode"] and not record.soundCalibratorMode:
- record.soundCalibratorMode = parsed_header["soundCalibratorMode"]
- logger.debug(f"[噪声检测] 从单元格解析到声校准器型号: {record.soundCalibratorMode}")
- if parsed_header["calibrationValueBefore"] and not record.calibrationValueBefore:
- record.calibrationValueBefore = parsed_header["calibrationValueBefore"]
- logger.debug(f"[噪声检测] 从单元格解析到检测前校准值: {record.calibrationValueBefore}")
- if parsed_header["calibrationValueAfter"] and not record.calibrationValueAfter:
- record.calibrationValueAfter = parsed_header["calibrationValueAfter"]
- logger.debug(f"[噪声检测] 从单元格解析到检测后校准值: {record.calibrationValueAfter}")
-
- # 如果单元格中包含气象条件,也从这里解析
- if "气象条件" in cell:
- weather_text = cell
- # 从"气象条件"之后的内容开始解析
- if "气象条件" in weather_text:
- # 提取气象条件部分(从"气象条件"开始到字符串末尾或下一个主要字段)
- weather_section = weather_text.split("气象条件")[-1] if "气象条件" in weather_text else weather_text
- parse_weather_from_text(weather_section, record)
- weather_extracted = True
- logger.info(f"[噪声检测] 从组合单元格解析到天气信息: {len(record.weather)} 条记录")
-
- # 尝试从不同单元格解析(新格式:字段名和值在不同单元格)
- for col_idx, cell in enumerate(row):
- cell_clean = re.sub(r'<[^>]+>', '', cell).strip()
-
- # 声级计型号/编号:在单元格中,值在下一列
- if "声级计型号" in cell_clean and (":" in cell_clean or ":" in cell_clean) and not record.soundLevelMeterMode:
- if col_idx + 1 < len(row) and row[col_idx + 1].strip():
- record.soundLevelMeterMode = row[col_idx + 1].strip()
- header_extracted = True
- logger.debug(f"[噪声检测] 从不同单元格解析到声级计型号: {record.soundLevelMeterMode}")
-
- # 声校准器型号/编号:在单元格中,值在下一列
- if "声校准器型号" in cell_clean and (":" in cell_clean or ":" in cell_clean) and not record.soundCalibratorMode:
- if col_idx + 1 < len(row) and row[col_idx + 1].strip():
- record.soundCalibratorMode = row[col_idx + 1].strip()
- header_extracted = True
- logger.debug(f"[噪声检测] 从不同单元格解析到声校准器型号: {record.soundCalibratorMode}")
-
- # 检测前校准值:在单元格中,值在下一列
- if "检测前校准值" in cell_clean and (":" in cell_clean or ":" in cell_clean) and not record.calibrationValueBefore:
- if col_idx + 1 < len(row) and row[col_idx + 1].strip():
- cal_value = row[col_idx + 1].strip()
- # 如果包含单位,保留;否则添加单位
- if "dB" in cal_value or "dB(A)" in cal_value or "dB(A)" in cal_value:
- record.calibrationValueBefore = cal_value
- else:
- record.calibrationValueBefore = f"{cal_value} dB(A)"
- header_extracted = True
- logger.debug(f"[噪声检测] 从不同单元格解析到检测前校准值: {record.calibrationValueBefore}")
-
- # 检测后校准值:在单元格中,值在下一列
- if "检测后校准值" in cell_clean and (":" in cell_clean or ":" in cell_clean) and not record.calibrationValueAfter:
- if col_idx + 1 < len(row) and row[col_idx + 1].strip():
- cal_value = row[col_idx + 1].strip()
- # 如果包含单位,保留;否则添加单位
- if "dB" in cal_value or "dB(A)" in cal_value or "dB(A)" in cal_value:
- record.calibrationValueAfter = cal_value
- else:
- record.calibrationValueAfter = f"{cal_value} dB(A)"
- header_extracted = True
- logger.debug(f"[噪声检测] 从不同单元格解析到检测后校准值: {record.calibrationValueAfter}")
-
- # 如果已经解析到所有需要的字段,可以提前结束
- if record.project and record.soundLevelMeterMode and record.calibrationValueBefore and record.calibrationValueAfter:
- logger.info(f"[噪声检测] 从单元格成功解析到所有头部信息: project={record.project}, "
- f"soundLevelMeterMode={record.soundLevelMeterMode}, "
- f"calibrationValueBefore={record.calibrationValueBefore}, "
- f"calibrationValueAfter={record.calibrationValueAfter}")
- if not weather_extracted:
- break
-
- # 如果还没有提取到头部信息,使用原来的方法(假设字段分布在不同的单元格中)
- # 但也要尝试从同一单元格中提取(如果单元格包含字段名和冒号)
- if not header_extracted:
- for row in first_table:
- logger.debug(f"[噪声检测][ROW] len={len(row)}, content={row}")
- for i, cell in enumerate(row):
- # 尝试从同一单元格中提取项目名称(如果包含冒号)
- if "项目名称" in cell and (":" in cell or ":" in cell) and not record.project:
- # 使用 parse_header_from_combined_cell 解析
- parsed = parse_header_from_combined_cell(cell)
- if parsed["project"]:
- record.project = clean_project_field(parsed["project"])
- header_extracted = True
- logger.debug(f"[噪声检测] 从单元格 {i} 解析到项目名称: {record.project}")
- break
-
- # 如果同一单元格没有值,尝试从下一个单元格获取(向后兼容)
- if "项目名称" in cell and i + 1 < len(row) and not record.project:
- # 检查下一个单元格是否有内容
- if row[i + 1].strip():
- record.project = clean_project_field(row[i + 1].strip())
- if not record.project.strip():
- logger.error(f"[噪声检测] 项目名称 为空,行数据: {row}")
- else:
- header_extracted = True
- logger.debug(f"[噪声检测] 从单元格 {i+1} 解析到项目名称: {record.project}")
- break
- if any(k in row[0] for k in ["检测依据", "监测依据", "检查依据"]):
- for i, cell in enumerate(row):
- if any(k in cell for k in ["检测依据", "监测依据", "检查依据"]) and i + 1 < len(row):
- candidate_standard = extract_standard_references(row[i + 1])
- if candidate_standard:
- record.standardReferences = candidate_standard
- logger.debug(f"[噪声检测] 从行数据解析到检测依据: {record.standardReferences}")
- else:
- logger.error(f"[噪声检测] 检测/监测依据 为空或无法解析,行数据: {row}")
- break
- # 尝试从同一单元格或下一个单元格提取声级计型号
- for i, cell in enumerate(row):
- if any(k in cell for k in ["声纹计型号", "声级计型号"]) and not record.soundLevelMeterMode:
- # 先尝试从同一单元格提取(如果包含冒号)
- if (":" in cell or ":" in cell):
- parsed = parse_header_from_combined_cell(cell)
- if parsed["soundLevelMeterMode"]:
- record.soundLevelMeterMode = parsed["soundLevelMeterMode"]
- logger.debug(f"[噪声检测] 从单元格 {i} 解析到声级计型号: {record.soundLevelMeterMode}")
- break
- # 如果同一单元格没有值,尝试从下一个单元格获取
- elif i + 1 < len(row) and row[i + 1].strip():
- record.soundLevelMeterMode = row[i + 1].strip()
- if not record.soundLevelMeterMode.strip():
- logger.error(f"[噪声检测] 声级计型号 为空,行数据: {row}")
- else:
- logger.debug(f"[噪声检测] 从单元格 {i+1} 解析到声级计型号: {record.soundLevelMeterMode}")
- break
-
- # 尝试从同一单元格或下一个单元格提取声校准器型号
- for i, cell in enumerate(row):
- if any(k in cell for k in ["声纹准器型号", "声校准器型号", "声级计校准器型号"]) and not record.soundCalibratorMode:
- # 先尝试从同一单元格提取(如果包含冒号)
- if (":" in cell or ":" in cell):
- parsed = parse_header_from_combined_cell(cell)
- if parsed["soundCalibratorMode"]:
- record.soundCalibratorMode = parsed["soundCalibratorMode"]
- logger.debug(f"[噪声检测] 从单元格 {i} 解析到声校准器型号: {record.soundCalibratorMode}")
- break
- # 如果同一单元格没有值,尝试从下一个单元格获取
- elif i + 1 < len(row) and row[i + 1].strip():
- record.soundCalibratorMode = row[i + 1].strip()
- if not record.soundCalibratorMode.strip():
- logger.error(f"[噪声检测] 声级计校准器型号 为空,行数据: {row}")
- else:
- logger.debug(f"[噪声检测] 从单元格 {i+1} 解析到声校准器型号: {record.soundCalibratorMode}")
- break
-
- # 尝试从同一单元格或下一个单元格提取检测前校准值
- for i, cell in enumerate(row):
- if "检测前校准值" in cell and not record.calibrationValueBefore:
- # 先尝试从同一单元格提取(如果包含冒号)
- if (":" in cell or ":" in cell):
- parsed = parse_header_from_combined_cell(cell)
- if parsed["calibrationValueBefore"]:
- record.calibrationValueBefore = parsed["calibrationValueBefore"]
- logger.debug(f"[噪声检测] 从单元格 {i} 解析到检测前校准值: {record.calibrationValueBefore}")
- break
- # 如果同一单元格没有值,尝试从下一个单元格获取
- elif i + 1 < len(row) and row[i + 1].strip():
- record.calibrationValueBefore = row[i + 1].strip()
- if not record.calibrationValueBefore.strip():
- logger.error(f"[噪声检测] 检测前校准值 为空,行数据: {row}")
- else:
- logger.debug(f"[噪声检测] 从单元格 {i+1} 解析到检测前校准值: {record.calibrationValueBefore}")
- break
-
- # 尝试从同一单元格或下一个单元格提取检测后校准值
- for i, cell in enumerate(row):
- if "检测后校准值" in cell and not record.calibrationValueAfter:
- # 先尝试从同一单元格提取(如果包含冒号)
- if (":" in cell or ":" in cell):
- parsed = parse_header_from_combined_cell(cell)
- if parsed["calibrationValueAfter"]:
- record.calibrationValueAfter = parsed["calibrationValueAfter"]
- logger.debug(f"[噪声检测] 从单元格 {i} 解析到检测后校准值: {record.calibrationValueAfter}")
- break
- # 如果同一单元格没有值,尝试从下一个单元格获取
- elif i + 1 < len(row) and row[i + 1].strip():
- record.calibrationValueAfter = row[i + 1].strip()
- if not record.calibrationValueAfter.strip():
- logger.error(f"[噪声检测] 检测后校准值 为空,行数据: {row}")
- else:
- logger.debug(f"[噪声检测] 从单元格 {i+1} 解析到检测后校准值: {record.calibrationValueAfter}")
- break
- # 解析气象条件 - 支持多条记录(如果还没有从组合单元格中提取到天气数据)
- if not weather_extracted:
- # 首先尝试从表格结构中解析(气象条件在第一列,日期在第二列,天气在第三列等)
- for row_idx, row in enumerate(first_table):
- if len(row) < 2:
- continue
-
- # 检查是否是气象条件行(第一列包含"气象条件")
- if "气象条件" in row[0]:
- # 尝试从表格单元格中解析天气信息
- # 格式:气象条件 | 日期:xxx | 天气 | 温度 | xxx | ...
- # 或者:气象条件 | 日期:xxx | 天气 | 多云 | 温度 | xxx | ...
-
- # 找到日期列(包含"日期"的列)
- date_col_idx = -1
- weather_col_idx = -1
- temp_col_idx = -1
-
- for col_idx, cell in enumerate(row):
- if "日期" in cell and ":" in cell:
- date_col_idx = col_idx
- elif cell.strip() == "天气" or "天气" in cell:
- weather_col_idx = col_idx
- elif cell.strip() == "温度" or "温度" in cell:
- temp_col_idx = col_idx
-
- # 如果找到日期列,尝试解析多行天气数据
- if date_col_idx >= 0:
- # 从当前行开始,查找所有包含日期的行
- for check_row_idx in range(row_idx, min(row_idx + 5, len(first_table))): # 最多检查5行
- check_row = first_table[check_row_idx]
- if len(check_row) <= date_col_idx:
- continue
-
- date_cell = check_row[date_col_idx]
- # 检查是否包含日期
- date_match = re.search(r'日期[::]\s*([\d.\-]+)', date_cell)
- if not date_match:
- continue
-
- weather = WeatherData()
- weather.monitorAt = date_match.group(1).strip()
-
- # 在当前行中重新查找列索引(因为不同行的列结构可能不同)
- current_weather_col_idx = -1
- current_temp_col_idx = -1
- for col_idx, cell in enumerate(check_row):
- if cell.strip() == "天气" or "天气" in cell:
- current_weather_col_idx = col_idx
- elif cell.strip() == "温度" or "温度" in cell:
- current_temp_col_idx = col_idx
-
- # 提取天气(在日期列之后查找)
- # 天气值在"天气"标签的下一列(如果下一列不是"温度")
- if current_weather_col_idx >= 0 and len(check_row) > current_weather_col_idx + 1:
- weather_value = check_row[current_weather_col_idx + 1].strip()
- # 如果下一列是天气值(不是"天气"标签,不是"温度"标签,且不是数字),则使用
- if weather_value and weather_value != "天气" and weather_value != "温度" and not re.match(r'^[\d.\-]+$', weather_value):
- weather.weather = weather_value
- else:
- # 尝试从日期列之后查找"天气"标签,然后取下一列
- for col_idx in range(date_col_idx + 1, min(date_col_idx + 5, len(check_row))):
- cell = check_row[col_idx].strip()
- if cell == "天气" and col_idx + 1 < len(check_row):
- # 找到"天气"标签,取下一列的值
- next_cell = check_row[col_idx + 1].strip()
- if next_cell and next_cell != "天气" and next_cell != "温度" and not re.match(r'^[\d.\-]+$', next_cell):
- weather.weather = next_cell
- break
- elif cell and cell != "天气" and cell != "温度" and not re.match(r'^[\d.\-]+$', cell) and col_idx == date_col_idx + 1:
- # 日期列之后的第一列可能是天气值(如果格式正确)
- weather.weather = cell
- break
-
- # 提取温度
- # 温度值在"温度"标签的下一列
- if current_temp_col_idx >= 0 and len(check_row) > current_temp_col_idx + 1:
- temp_value = check_row[current_temp_col_idx + 1].strip()
- # 如果下一列是温度值(包含数字和-),则使用
- if temp_value and re.match(r'[\d.\-]+', temp_value):
- weather.temp = temp_value
- else:
- # 尝试从日期列之后查找"温度"标签,然后取下一列
- for col_idx in range(date_col_idx + 1, min(date_col_idx + 6, len(check_row))):
- cell = check_row[col_idx].strip()
- if cell == "温度" and col_idx + 1 < len(check_row):
- # 找到"温度"标签,取下一列的值
- temp_value = check_row[col_idx + 1].strip()
- if temp_value and re.match(r'[\d.\-]+', temp_value):
- weather.temp = temp_value
- break
- elif "℃" in cell or (re.match(r'[\d.\-]+', cell) and "温度" not in cell):
- # 如果直接找到温度值(包含℃或数字),也使用
- weather.temp = cell.replace("℃", "").strip()
- break
-
- # 提取湿度
- # 注意:新格式中"℃ 湿度"可能在同一个单元格中
- for col_idx, cell in enumerate(check_row):
- if "湿度" in cell:
- # 如果单元格包含"℃ 湿度",湿度值在下一列
- if "℃ 湿度" in cell or ("℃" in cell and "湿度" in cell):
- if col_idx + 1 < len(check_row):
- humidity_value = check_row[col_idx + 1].strip()
- if humidity_value and humidity_value != "湿度":
- weather.humidity = humidity_value.replace("%RH", "").strip()
- break
- elif col_idx + 1 < len(check_row):
- humidity_value = check_row[col_idx + 1].strip()
- if humidity_value and humidity_value != "湿度":
- weather.humidity = humidity_value.replace("%RH", "").strip()
- break
-
- # 提取风速
- # 注意:新格式中"%RH 风速"可能在同一个单元格中
- for col_idx, cell in enumerate(check_row):
- if "风速" in cell:
- # 如果单元格包含"%RH 风速",风速值在下一列
- if "%RH 风速" in cell or ("%RH" in cell and "风速" in cell):
- if col_idx + 1 < len(check_row):
- wind_speed_value = check_row[col_idx + 1].strip()
- if wind_speed_value and wind_speed_value != "风速":
- weather.windSpeed = wind_speed_value.replace("m/s", "").strip()
- break
- elif col_idx + 1 < len(check_row):
- wind_speed_value = check_row[col_idx + 1].strip()
- if wind_speed_value and wind_speed_value != "风速":
- weather.windSpeed = wind_speed_value.replace("m/s", "").strip()
- break
-
- # 提取风向
- for col_idx, cell in enumerate(check_row):
- if "风向" in cell and col_idx + 1 < len(check_row):
- wind_dir_value = check_row[col_idx + 1].strip()
- # 验证风向值:不应该包含"日期"、"温度"、"湿度"、"风速"等关键词
- if wind_dir_value and wind_dir_value != "风向" and \
- "日期" not in wind_dir_value and "温度" not in wind_dir_value and \
- "湿度" not in wind_dir_value and "风速" not in wind_dir_value and \
- not wind_dir_value.startswith("日期") and len(wind_dir_value) < 50:
- weather.windDirection = wind_dir_value
- break
-
- # 如果天气为空但其他字段有值,默认为"晴"
- if not weather.weather or not weather.weather.strip():
- if any([weather.temp, weather.humidity, weather.windSpeed, weather.windDirection]):
- weather.weather = "晴"
- _mark_auto_weather_default(record, weather)
- logger.debug(f"[噪声检测] 天气字段为空,但其他字段有值,默认为'晴': {weather.monitorAt}")
-
- # 如果至少有一个字段不为空,则添加这条记录
- if any([weather.monitorAt, weather.weather, weather.temp, weather.humidity, weather.windSpeed, weather.windDirection]):
- record.weather.append(weather)
- weather_extracted = True
- logger.info(f"[噪声检测] 从表格解析到天气记录: {weather.to_dict()}")
-
- if weather_extracted:
- break
-
- # 如果表格解析失败,尝试文本解析
- if not weather_extracted:
- text = " ".join(row[1:])
- parse_weather_from_text(text, record)
- if record.weather:
- weather_extracted = True
- break
-
- # 将OCR提取的天气信息与表格解析的天气信息进行合并
- # 按顺序匹配风向:第一条OCR天气信息对应第一条表格天气信息,以此类推
- if ocr_weather_list:
- logger.debug(f"[噪声检测] 开始合并OCR和表格解析的天气信息,OCR提取了 {len(ocr_weather_list)} 条,表格解析了 {len(record.weather)} 条")
-
- # 提取OCR风向数组,按顺序匹配到表格解析的天气记录
- ocr_wind_directions = []
- for ocr_weather in ocr_weather_list:
- if ocr_weather.windDirection and ocr_weather.windDirection.strip():
- ocr_wind_directions.append(ocr_weather.windDirection.strip())
- else:
- ocr_wind_directions.append("") # 保持顺序,即使为空
-
- logger.debug(f"[噪声检测] 从OCR提取的风向数组: {ocr_wind_directions}")
-
- # 按顺序将OCR风向填充到表格解析的天气记录中
- for i, table_weather in enumerate(record.weather):
- if i < len(ocr_wind_directions) and ocr_wind_directions[i]:
- if not table_weather.windDirection or not table_weather.windDirection.strip():
- table_weather.windDirection = ocr_wind_directions[i]
- logger.debug(f"[噪声检测] 按顺序填充第{i}条表格天气记录的风向: {table_weather.windDirection}")
-
- # 原有的合并逻辑(用于补充其他字段,如日期、天气、温度等)
- for ocr_weather in ocr_weather_list:
- # 如果OCR提取的天气信息中monitorAt为空,尝试从表格解析的天气信息中匹配
- matched_in_first_branch = False
- if not ocr_weather.monitorAt or not ocr_weather.monitorAt.strip():
- # 根据温度、湿度、风速等字段匹配表格解析的天气信息
- matched = False
- for table_weather in record.weather:
- # 匹配条件:温度、湿度、风速、风向相同或相似
- # 确保返回布尔值,避免空字符串导致类型错误
- temp_match = bool(ocr_weather.temp and table_weather.temp and
- ocr_weather.temp.strip() == table_weather.temp.strip())
- humidity_match = bool(ocr_weather.humidity and table_weather.humidity and
- ocr_weather.humidity.strip() == table_weather.humidity.strip())
- wind_speed_match = bool(ocr_weather.windSpeed and table_weather.windSpeed and
- ocr_weather.windSpeed.strip() == table_weather.windSpeed.strip())
- wind_dir_match = bool(ocr_weather.windDirection and table_weather.windDirection and
- ocr_weather.windDirection.strip() == table_weather.windDirection.strip())
-
- # 如果至少有两个字段匹配,认为这是同一条天气记录
- # 或者如果表格解析的天气记录只有部分字段(如只有windDirection),也尝试合并
- match_count = sum([temp_match, humidity_match, wind_speed_match, wind_dir_match])
- # 如果表格解析的天气记录只有windDirection,也尝试合并(通过日期匹配)
- has_only_wind_dir = (table_weather.windDirection and not table_weather.weather and
- not table_weather.temp and not table_weather.humidity and
- not table_weather.windSpeed)
-
- if (match_count >= 2 and table_weather.monitorAt) or (has_only_wind_dir and table_weather.monitorAt):
- ocr_weather.monitorAt = table_weather.monitorAt
- logger.debug(f"[噪声检测] 从表格解析结果补充OCR天气信息的日期: {ocr_weather.monitorAt}")
- # 将OCR提取的所有字段补充到表格解析的天气信息中
- if not table_weather.weather and ocr_weather.weather:
- table_weather.weather = ocr_weather.weather
- logger.debug(f"[噪声检测] 从OCR补充表格解析的天气: {table_weather.weather}")
- if not table_weather.temp and ocr_weather.temp:
- table_weather.temp = ocr_weather.temp
- logger.debug(f"[噪声检测] 从OCR补充表格解析的温度: {table_weather.temp}")
- if not table_weather.humidity and ocr_weather.humidity:
- table_weather.humidity = ocr_weather.humidity
- logger.debug(f"[噪声检测] 从OCR补充表格解析的湿度: {table_weather.humidity}")
- if not table_weather.windSpeed and ocr_weather.windSpeed:
- table_weather.windSpeed = ocr_weather.windSpeed
- logger.debug(f"[噪声检测] 从OCR补充表格解析的风速: {table_weather.windSpeed}")
- if not table_weather.windDirection and ocr_weather.windDirection:
- table_weather.windDirection = ocr_weather.windDirection
- logger.debug(f"[噪声检测] 从OCR补充表格解析的风向: {table_weather.windDirection}")
- matched = True
- matched_in_first_branch = True # 标记已在第一个分支匹配成功
- break # 匹配成功后立即退出循环,避免继续匹配
-
- # 如果没有匹配到,但OCR天气信息有其他字段,也添加到记录中(日期为空)
- if not matched:
- logger.debug(f"[噪声检测] OCR天气信息未匹配到表格解析结果,保留原信息(日期为空)")
-
- # 如果OCR天气信息有日期,且未在第一个分支匹配成功,检查是否与表格解析的天气信息重复
- if ocr_weather.monitorAt and ocr_weather.monitorAt.strip() and not matched_in_first_branch:
- # 检查是否已存在相同日期的天气记录
- # 处理日期格式不一致的情况(如 205.7.10 vs 2025.7.10)
- ocr_date = ocr_weather.monitorAt.strip()
- # 如果日期格式是 205.7.10,尝试修正为 2025.7.10
- if re.match(r'^205\.', ocr_date): # 匹配 205 开头的日期(OCR识别错误)
- ocr_date_normalized = re.sub(r'^205\.', '2025.', ocr_date)
- elif re.match(r'^20[0-4]\.', ocr_date): # 匹配其他 200-204 开头的日期
- ocr_date_normalized = re.sub(r'^20[0-4]\.', '2025.', ocr_date)
- else:
- ocr_date_normalized = ocr_date
-
- exists = False
- for table_weather in record.weather:
- table_date = table_weather.monitorAt.strip() if table_weather.monitorAt else ""
- # 处理表格日期格式(可能包含末尾的点号,如 "2025.03.28.")
- table_date_clean = table_date.rstrip('.')
- ocr_date_clean = ocr_date.rstrip('.')
- ocr_date_normalized_clean = ocr_date_normalized.rstrip('.')
-
- # 直接比较或比较归一化后的日期(忽略末尾的点号)
- if table_date_clean and (table_date_clean == ocr_date_clean or table_date_clean == ocr_date_normalized_clean):
- exists = True
- # 如果表格解析的天气信息不完整,用OCR信息补充
- if not table_weather.weather and ocr_weather.weather:
- table_weather.weather = ocr_weather.weather
- logger.debug(f"[噪声检测] 从OCR补充表格解析的天气: {table_weather.weather}")
- if not table_weather.temp and ocr_weather.temp:
- table_weather.temp = ocr_weather.temp
- logger.debug(f"[噪声检测] 从OCR补充表格解析的温度: {table_weather.temp}")
- if not table_weather.humidity and ocr_weather.humidity:
- table_weather.humidity = ocr_weather.humidity
- logger.debug(f"[噪声检测] 从OCR补充表格解析的湿度: {table_weather.humidity}")
- if not table_weather.windSpeed and ocr_weather.windSpeed:
- table_weather.windSpeed = ocr_weather.windSpeed
- logger.debug(f"[噪声检测] 从OCR补充表格解析的风速: {table_weather.windSpeed}")
- if not table_weather.windDirection and ocr_weather.windDirection:
- table_weather.windDirection = ocr_weather.windDirection
- logger.debug(f"[噪声检测] 从OCR补充表格解析的风向: {table_weather.windDirection}")
- logger.debug(f"[噪声检测] OCR天气信息与表格解析结果合并: {table_weather.to_dict()}")
- break # 找到匹配的记录后立即退出
-
- # 如果不存在相同日期的记录,且OCR信息完整,添加到记录中
- if not exists and any([ocr_weather.weather, ocr_weather.temp, ocr_weather.humidity,
- ocr_weather.windSpeed, ocr_weather.windDirection]):
- record.weather.append(ocr_weather)
- logger.debug(f"[噪声检测] 添加OCR天气信息到记录: {ocr_weather.to_dict()}")
- elif not matched_in_first_branch and any([ocr_weather.weather, ocr_weather.temp, ocr_weather.humidity,
- ocr_weather.windSpeed, ocr_weather.windDirection]):
- # 如果OCR天气信息没有日期但有其他字段,且未在第一个分支匹配成功,也添加到记录中
- record.weather.append(ocr_weather)
- logger.debug(f"[噪声检测] 添加OCR天气信息到记录(无日期): {ocr_weather.to_dict()}")
-
- # 最终去重和合并:按日期分组,合并相同日期的记录,补齐空白字段
- if record.weather:
- logger.debug(f"[噪声检测] 合并前天气记录数: {len(record.weather)}")
- deduplicated_weather = {}
- for weather in record.weather:
- date_key = weather.monitorAt.strip() if weather.monitorAt else ""
- if not date_key:
- # 如果没有日期,跳过(不应该出现,但为了安全)
- continue
-
- # 处理日期格式不一致的情况(如 205.7.10 vs 2025.7.10)
- if re.match(r'^205\.', date_key):
- date_key = re.sub(r'^205\.', '2025.', date_key)
- elif re.match(r'^20[0-4]\.', date_key):
- date_key = re.sub(r'^20[0-4]\.', '2025.', date_key)
-
- if date_key not in deduplicated_weather:
- # 创建新的天气记录
- merged_weather = WeatherData()
- merged_weather.monitorAt = date_key
- deduplicated_weather[date_key] = merged_weather
- else:
- merged_weather = deduplicated_weather[date_key]
-
- # 合并字段:如果当前记录的字段有值且合并记录的字段为空,则补齐
- if not merged_weather.weather and weather.weather:
- merged_weather.weather = weather.weather
- if not merged_weather.temp and weather.temp:
- merged_weather.temp = weather.temp
- if not merged_weather.humidity and weather.humidity:
- merged_weather.humidity = weather.humidity
- if not merged_weather.windSpeed and weather.windSpeed:
- merged_weather.windSpeed = weather.windSpeed
- if not merged_weather.windDirection and weather.windDirection:
- merged_weather.windDirection = weather.windDirection
-
- # 更新record.weather为去重后的列表
- record.weather = list(deduplicated_weather.values())
- logger.debug(f"[噪声检测] 合并后天气记录数: {len(record.weather)}")
- for weather in record.weather:
- logger.debug(f"[噪声检测] 最终天气记录: {weather.to_dict()}")
- for table in tables:
- # 首先识别表头,找到各列的索引
- code_idx = -1
- address_idx = -1
- source_idx = -1
- dayMonitorAt_idx = -1
- dayMonitorValue_idx = -1
- dayMonitorBackgroundValue_idx = -1
- nightMonitorAt_idx = -1
- nightMonitorValue_idx = -1
- nightMonitorBackgroundValue_idx = -1
- remark_idx = -1
-
- header_start_row = -1
- # 查找表头行(通常包含"编号"、"测点位置"、"昼间"、"夜间"等关键词)
- for row_idx, row in enumerate(table):
- row_text = " ".join(row).lower()
- # 检查是否是表头行
- if ("编号" in row_text or "测点位置" in row_text or "测点" in row_text) and \
- ("昼间" in row_text or "夜间" in row_text or "测量值" in row_text or "检测时间" in row_text):
- header_start_row = row_idx
- logger.debug(f"[噪声检测] 找到表头行: 第{row_idx}行, 内容: {row}")
-
- # 在第一行表头中查找列索引
- for col_idx, cell in enumerate(row):
- cell_lower = cell.lower().strip()
- if "编号" in cell:
- code_idx = col_idx
- elif "测点位置" in cell or "测点" in cell:
- address_idx = col_idx
- elif "主要声源" in cell or "声源" in cell:
- source_idx = col_idx
- elif "昼间" in cell and ("检测时间" in cell or "时间" in cell):
- dayMonitorAt_idx = col_idx
- elif "昼间" in cell and ("测量值" in cell or "测量" in cell):
- dayMonitorValue_idx = col_idx
- elif "昼间" in cell and ("背景值" in cell or "背景" in cell):
- dayMonitorBackgroundValue_idx = col_idx
- elif "夜间" in cell and ("检测时间" in cell or "时间" in cell):
- nightMonitorAt_idx = col_idx
- elif "夜间" in cell and ("测量值" in cell or "测量" in cell):
- nightMonitorValue_idx = col_idx
- elif "夜间" in cell and ("背景值" in cell or "背景" in cell):
- nightMonitorBackgroundValue_idx = col_idx
- elif "备注" in cell:
- remark_idx = col_idx
-
- # 如果第一行表头没有找到所有列,检查下一行(如果是两行表头)
- if row_idx + 1 < len(table):
- next_row = table[row_idx + 1]
- next_row_text = " ".join(next_row).lower()
- # 如果是第二行表头(通常是详细的列名)
- if "检测时间" in next_row_text or "测量值" in next_row_text or "背景值" in next_row_text:
- logger.debug(f"[噪声检测] 找到第二行表头: 第{row_idx + 1}行, 内容: {next_row}")
- # 检查第一行表头,找到"昼间"和"夜间"的列范围
- day_start_col = -1
- day_end_col = -1
- night_start_col = -1
- night_end_col = -1
-
- for col_idx, cell in enumerate(row):
- cell_lower = cell.lower().strip()
- if "昼间" in cell_lower:
- day_start_col = col_idx
- # 查找昼间结束位置(通常是下一个非空单元格或"夜间"开始)
- for next_col in range(col_idx + 1, len(row)):
- if "夜间" in row[next_col].lower() or row[next_col].strip():
- day_end_col = next_col - 1
- break
- if day_end_col == -1:
- day_end_col = len(row) - 1
- elif "夜间" in cell_lower:
- night_start_col = col_idx
- # 查找夜间结束位置
- for next_col in range(col_idx + 1, len(row)):
- if "备注" in row[next_col].lower() or (next_col == len(row) - 1):
- night_end_col = next_col - 1
- break
- if night_end_col == -1:
- night_end_col = len(row) - 1
-
- # 在第二行表头中查找列索引
- for col_idx, cell in enumerate(next_row):
- cell_lower = cell.lower().strip()
- if "检测时间" in cell or "时间" in cell:
- # 根据列位置判断是昼间还是夜间
- if day_start_col >= 0 and day_start_col <= col_idx <= day_end_col and dayMonitorAt_idx == -1:
- dayMonitorAt_idx = col_idx
- elif night_start_col >= 0 and night_start_col <= col_idx <= night_end_col and nightMonitorAt_idx == -1:
- nightMonitorAt_idx = col_idx
- elif dayMonitorAt_idx == -1:
- dayMonitorAt_idx = col_idx
- elif nightMonitorAt_idx == -1:
- nightMonitorAt_idx = col_idx
- elif "测量值" in cell or "测量" in cell:
- if day_start_col >= 0 and day_start_col <= col_idx <= day_end_col and dayMonitorValue_idx == -1:
- dayMonitorValue_idx = col_idx
- elif night_start_col >= 0 and night_start_col <= col_idx <= night_end_col and nightMonitorValue_idx == -1:
- nightMonitorValue_idx = col_idx
- elif dayMonitorValue_idx == -1:
- dayMonitorValue_idx = col_idx
- elif nightMonitorValue_idx == -1:
- nightMonitorValue_idx = col_idx
- elif "背景值" in cell or "背景" in cell:
- if day_start_col >= 0 and day_start_col <= col_idx <= day_end_col and dayMonitorBackgroundValue_idx == -1:
- dayMonitorBackgroundValue_idx = col_idx
- elif night_start_col >= 0 and night_start_col <= col_idx <= night_end_col and nightMonitorBackgroundValue_idx == -1:
- nightMonitorBackgroundValue_idx = col_idx
- elif dayMonitorBackgroundValue_idx == -1:
- dayMonitorBackgroundValue_idx = col_idx
- elif nightMonitorBackgroundValue_idx == -1:
- nightMonitorBackgroundValue_idx = col_idx
-
- # 如果仍然没有找到某些列,使用默认顺序
- if code_idx == -1:
- code_idx = 0
- if address_idx == -1:
- address_idx = 1
- if source_idx == -1:
- source_idx = 2
- if dayMonitorAt_idx == -1:
- dayMonitorAt_idx = 3
- if dayMonitorValue_idx == -1:
- dayMonitorValue_idx = 4
- if dayMonitorBackgroundValue_idx == -1:
- dayMonitorBackgroundValue_idx = 5
- if nightMonitorAt_idx == -1:
- nightMonitorAt_idx = 6
- if nightMonitorValue_idx == -1:
- nightMonitorValue_idx = 7
- if nightMonitorBackgroundValue_idx == -1:
- nightMonitorBackgroundValue_idx = 8
- if remark_idx == -1:
- remark_idx = 9
-
- logger.info(f"[噪声检测] 列索引映射: 编号={code_idx}, 测点位置={address_idx}, 主要声源={source_idx}, "
- f"昼间检测时间={dayMonitorAt_idx}, 昼间测量值={dayMonitorValue_idx}, 昼间背景值={dayMonitorBackgroundValue_idx}, "
- f"夜间检测时间={nightMonitorAt_idx}, 夜间测量值={nightMonitorValue_idx}, 夜间背景值={nightMonitorBackgroundValue_idx}, "
- f"备注={remark_idx}")
- break
-
- # 如果找到了表头,从表头之后开始解析数据行
- data_start_row = header_start_row + 2 if header_start_row >= 0 and header_start_row + 1 < len(table) and \
- any(k in " ".join(table[header_start_row + 1]).lower() for k in ["检测时间", "测量值", "背景值"]) else \
- (header_start_row + 1 if header_start_row >= 0 else 0)
-
- # 解析数据行
- for row_idx in range(data_start_row, len(table)):
- row = table[row_idx]
- # 跳过空行和表头行
- if not row or len(row) < 3:
- continue
-
- # 检查是否是数据行(第一列应该是编号,通常是N1、N2或M1、M2等格式)
- first_cell = row[0].strip() if len(row) > 0 else ""
- if not first_cell or first_cell in ["编号", "备注"] or not (first_cell[0].upper() in ['N', 'M'] and first_cell[1:].isdigit()):
- # 如果不是标准编号格式,也可能是有编号但格式不同,继续检查
- if not (first_cell and (first_cell[0].isalnum() or first_cell.startswith('N') or first_cell.startswith('M'))):
- continue
-
- logger.debug(f"[噪声检测] 解析数据行 {row_idx}: {row}")
- nd = NoiseData()
-
- # 使用识别的列索引来提取数据
- if code_idx >= 0 and code_idx < len(row):
- nd.code = row[code_idx].strip()
- if address_idx >= 0 and address_idx < len(row):
- raw_address = row[address_idx].strip()
- # 纠正address字段中的常见OCR识别错误
- nd.address = correct_address_ocr_errors(raw_address)
- if source_idx >= 0 and source_idx < len(row):
- nd.source = row[source_idx].strip()
- if dayMonitorAt_idx >= 0 and dayMonitorAt_idx < len(row):
- nd.dayMonitorAt = row[dayMonitorAt_idx].strip()
- if dayMonitorValue_idx >= 0 and dayMonitorValue_idx < len(row):
- nd.dayMonitorValue = row[dayMonitorValue_idx].strip()
- if dayMonitorBackgroundValue_idx >= 0 and dayMonitorBackgroundValue_idx < len(row):
- nd.dayMonitorBackgroundValue = row[dayMonitorBackgroundValue_idx].strip()
- if nightMonitorAt_idx >= 0 and nightMonitorAt_idx < len(row):
- nd.nightMonitorAt = row[nightMonitorAt_idx].strip()
- if nightMonitorValue_idx >= 0 and nightMonitorValue_idx < len(row):
- nd.nightMonitorValue = row[nightMonitorValue_idx].strip()
- if nightMonitorBackgroundValue_idx >= 0 and nightMonitorBackgroundValue_idx < len(row):
- nd.nightMonitorBackgroundValue = row[nightMonitorBackgroundValue_idx].strip()
- if remark_idx >= 0 and remark_idx < len(row):
- nd.remark = row[remark_idx].strip()
-
- # 验证数据有效性(至少应该有编号和测点位置)
- if nd.code and nd.address:
- logger.info(f"[噪声检测] 解析到数据: {nd.to_dict()}")
- record.noise.append(nd)
- else:
- logger.warning(f"[噪声检测] 跳过无效数据行: {row}")
-
- # 矫正编号:按照数据顺序重新分配编号为 N1, N2, N3...
- for idx, nd in enumerate(record.noise, start=1):
- original_code = nd.code
- nd.code = f"N{idx}"
- if original_code != nd.code:
- logger.info(f"[噪声检测] 编号矫正: {original_code} -> {nd.code}")
-
- # 解析工况信息
- # 优先使用opStatus格式解析(附件 工况及工程信息),如果失败则使用旧格式
- if "附件" in markdown_content and "工况" in markdown_content:
- operational_conditions = parse_operational_conditions_opstatus(markdown_content)
- if operational_conditions:
- logger.info(f"[噪声检测] 使用opStatus格式解析到 {len(operational_conditions)} 条工况信息")
- record.operationalConditions = operational_conditions
- else:
- # 如果opStatus格式解析失败,尝试旧格式
- operational_conditions = parse_operational_conditions(markdown_content)
- record.operationalConditions = operational_conditions
- else:
- operational_conditions = parse_operational_conditions(markdown_content)
- record.operationalConditions = operational_conditions
-
- # v2版本不依赖OCR,只从markdown内容解析
- # 如果某些字段为空,会在日志中记录警告,但不进行OCR补充识别
-
- return record
|