| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256 |
- from pathlib import Path
- from typing import List, Tuple, Dict, Any, Optional, Literal
- import re
- import json
- import logging
- import sys
- import pandas as pd
- # 尝试使用统一的日志系统
- try:
- from ..utils.logging_config import get_logger
- logger = get_logger("pdf_converter_v2.utils.table_extractor")
- except (ImportError, ValueError):
- # 如果无法导入,使用标准 logging
- logger = logging.getLogger(__name__)
- # 确保日志系统正确配置
- if not logger.handlers:
- handler = logging.StreamHandler(sys.stderr)
- handler.setFormatter(logging.Formatter('%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)d - %(message)s'))
- logger.addHandler(handler)
- logger.setLevel(logging.INFO)
- try:
- import pdfplumber
- _PDFPLUMBER_AVAILABLE = True
- except ImportError as e:
- _PDFPLUMBER_AVAILABLE = False
- _PDFPLUMBER_IMPORT_ERROR = str(e)
- except Exception as e:
- # 捕获其他可能的导入错误
- _PDFPLUMBER_AVAILABLE = False
- _PDFPLUMBER_IMPORT_ERROR = str(e)
- else:
- _PDFPLUMBER_IMPORT_ERROR = None
- # 文档类型 -> 表头规则
- TABLE_TYPE_RULES: Dict[str, List[dict]] = {
- # 结算报告类
- "settlementReport": [
- {
- "name": "审定结算汇总表",
- "keywords": ["序号", "审计内容", "送审金额(含税)", "审定金额(含税)", "审定金额(不含税)", "增减金额", "备注"],
- "match_mode": "all",
- },
- {
- "name": "合同执行情况",
- "keywords": ["施工单位", "中标通知书金额", "中标通知书编号", "合同金额", "结算送审金额", "差额"],
- "match_mode": "all",
- },
- {
- "name": "赔偿合同",
- "keywords": ["合同对方", "赔偿事项", "合同金额", "结算送审金额", "差额"],
- "match_mode": "all",
- },
- {
- "name": "物资采购合同1",
- "keywords": ["物料名称", "合同数量", "施工图数量", "单价(不含税)", "差额"],
- "match_mode": "all",
- },
- {
- "name": "物资采购合同2",
- "keywords": ["物料名称", "合同金额(不含税)", "入账金额", "差额", "备注"],
- "match_mode": "all",
- },
- {
- "name": "其他服务类合同",
- "keywords": ["服务商", "中标通知书", "合同金额", "送审金额", "结算金额"],
- "match_mode": "all",
- },
- ],
- # 初设评审类
- "designReview": [
- {
- "name": "初设评审的概算投资",
- "keywords": ["序号", "工程名称", "建设规模", "静态投资", "其中:建设场地征用及清理费", "动态投资"],
- "match_mode": "all",
- },
- {
- "name": "初设评审的概算投资明细",
- "keywords": ["序号", "工程或费用名称", "建筑工程费", "设备购置费", "安装工程费", "其他费用", "合计"],
- "match_mode": "all",
- },
- {
- "name": "初设评审的概算投资费用",
- "keywords": ["序号", "工程或费用名称", "费用金额", "各项占静态投资", "单位投资"],
- "match_mode": "all",
- },
- ],
- # 决算报告类
- "finalAccount": [
- {
- "name": "单项工程投资完成情况",
- "keywords": ["费用项目", "概算金额", "决算金额", "审定金额", "增值税额", "超", "节"],
- "match_mode": "all",
- },
- ],
- }
- EXCLUDE_RULES: List[str] = []
- # 是否启用跨页合并
- ENABLE_MERGE_CROSS_PAGE_TABLES: bool = True
- def extract_table_title_from_page(page, table_bbox: tuple, max_lines: int = 10) -> str:
- """
- 从 PDF 页面中提取表格上方的文本作为表格标题。
- 优先查找包含工程名称的行(如 "XXX变电站新建工程总概算表")。
-
- Args:
- page: pdfplumber page 对象
- table_bbox: 表格的边界框 (x0, top, x1, bottom)
- max_lines: 最多向上搜索的行数
-
- Returns:
- str: 表格标题,如果未找到则返回空字符串
- """
- if not table_bbox:
- return ""
-
- table_top = table_bbox[1] # 表格顶部 y 坐标
- page_width = page.width
-
- # 在表格上方区域搜索文本(向上搜索到页面顶部)
- search_top = 0 # 从页面顶部开始搜索
- search_bottom = table_top - 5 # 留一点边距
-
- if search_bottom <= search_top:
- return ""
-
- # 提取表格上方区域的文本
- crop_box = (0, search_top, page_width, search_bottom)
- try:
- cropped = page.within_bbox(crop_box)
- text = cropped.extract_text() or ""
- except Exception:
- return ""
-
- if not text.strip():
- return ""
-
- # 取所有行
- lines = [line.strip() for line in text.strip().split('\n') if line.strip()]
- if not lines:
- return ""
-
- # 排除的关键词(这些不是工程名称)
- exclude_keywords = ["工程规模", "金额单位", "建设规模", "附表", "附件"]
-
- def is_valid_title(line: str) -> bool:
- """检查是否是有效的工程名称标题"""
- # 排除包含排除关键词的行
- if any(kw in line for kw in exclude_keywords):
- return False
- # 必须包含工程相关关键词
- if not any(kw in line for kw in ["工程", "概算表", "估算表"]):
- return False
- return True
-
- # 优先查找包含 "总概算表" 的行(最可能是工程名称)
- for line in reversed(lines):
- if "总概算表" in line and is_valid_title(line):
- # 进一步检查是否包含具体工程类型
- if any(kw in line for kw in ["变电站", "线路", "间隔", "kV", "KV", "千伏"]):
- return line.strip()
-
- # 次优先:查找包含 "概算表" 但不是 "总概算表" 的行
- for line in reversed(lines):
- if "概算表" in line and "总概算表" not in line and is_valid_title(line):
- if any(kw in line for kw in ["变电站", "线路", "间隔", "kV", "KV", "千伏"]):
- return line.strip()
-
- # 再次优先:查找包含 "变电站"/"线路"/"间隔" + "工程" 的行
- for line in reversed(lines):
- if is_valid_title(line):
- if any(kw in line for kw in ["变电站", "线路", "间隔"]) and "工程" in line:
- return line.strip()
-
- # 查找包含电压等级 + "工程" 的行
- for line in reversed(lines):
- if is_valid_title(line):
- if any(kw in line for kw in ["kV", "KV", "千伏", "kv"]) and "工程" in line:
- return line.strip()
-
- # 最后:返回任何包含工程关键词的有效行
- for line in reversed(lines):
- if is_valid_title(line):
- return line.strip()
-
- return ""
- def extract_tables_with_pdfplumber(
- pdf_path: str,
- pages: str = "all",
- extract_titles: bool = False,
- ) -> List[Tuple[int, pd.DataFrame, tuple, str]]:
- """
- 使用 pdfplumber 提取 PDF 中的表格。
- Args:
- pdf_path: PDF 文件路径
- pages: 页码范围,如 "all" 或 "1-5,7,9-10"
- extract_titles: 是否提取表格标题
- Returns:
- List[Tuple[int, pd.DataFrame, tuple, str]]: [(页码, DataFrame, bbox, title), ...]
- 如果 extract_titles=False,title 为空字符串
- """
- # 运行时再次尝试导入,因为模块加载时可能失败但运行时环境可能不同
- global _PDFPLUMBER_AVAILABLE, pdfplumber, _PDFPLUMBER_IMPORT_ERROR
- if not _PDFPLUMBER_AVAILABLE:
- try:
- import pdfplumber
- _PDFPLUMBER_AVAILABLE = True
- _PDFPLUMBER_IMPORT_ERROR = None
- except ImportError as e:
- error_msg = f"pdfplumber 库未安装,无法提取表格(请安装 pdfplumber)"
- if _PDFPLUMBER_IMPORT_ERROR:
- error_msg += f"\n模块加载时导入错误: {_PDFPLUMBER_IMPORT_ERROR}"
- error_msg += f"\n运行时导入错误: {e}"
- raise RuntimeError(error_msg)
- except Exception as e:
- error_msg = f"pdfplumber 库导入失败: {e}"
- if _PDFPLUMBER_IMPORT_ERROR:
- error_msg += f"\n模块加载时导入错误: {_PDFPLUMBER_IMPORT_ERROR}"
- raise RuntimeError(error_msg)
- tables_data: List[Tuple[int, pd.DataFrame, tuple, str]] = []
- with pdfplumber.open(pdf_path) as pdf:
- total_pages = len(pdf.pages)
- logger.info(f"[pdfplumber] 打开 PDF,共 {total_pages} 页")
-
- # 确定要处理的页面
- if pages == "all":
- pages_to_process = pdf.pages
- logger.info(f"[pdfplumber] 处理所有页面: 1-{total_pages}")
- else:
- # 解析页面范围(如 "1-5,7,9-10")
- page_numbers: List[int] = []
- for part in pages.split(","):
- part = part.strip()
- if not part:
- continue
- if "-" in part:
- start, end = map(int, part.split("-"))
- page_numbers.extend(range(start, end + 1))
- else:
- page_numbers.append(int(part))
- pages_to_process = [
- pdf.pages[i - 1] for i in page_numbers if 0 < i <= len(pdf.pages)
- ]
- logger.info(f"[pdfplumber] 处理指定页面: {page_numbers}")
- # 提取每一页的表格
- for page in pages_to_process:
- page_num = page.page_number
- table_settings = {
- "vertical_strategy": "lines",
- "horizontal_strategy": "lines",
- "intersection_tolerance": 3,
- "min_words_vertical": 1,
- "min_words_horizontal": 1,
- }
- tables = page.find_tables(table_settings=table_settings)
- page_table_count = 0
- for table in tables:
- table_data = table.extract()
- if table_data and len(table_data) > 0:
- df = pd.DataFrame(table_data)
- bbox = table.bbox # (x0, top, x1, bottom)
- # 提取表格标题
- title = ""
- if extract_titles:
- title = extract_table_title_from_page(page, bbox)
- if title:
- logger.debug(f"[pdfplumber] 页面 {page_num}: 表格标题: {title}")
- tables_data.append((page_num, df, bbox, title))
- page_table_count += 1
-
- if page_table_count > 0:
- logger.debug(f"[pdfplumber] 页面 {page_num}: 提取到 {page_table_count} 个表格")
- logger.info(f"[pdfplumber] 提取完成: 共提取到 {len(tables_data)} 个表格")
- return tables_data
- def check_table_header(table_df: pd.DataFrame, rule: dict) -> Tuple[bool, str]:
- """
- 检查表格是否匹配指定的表头规则(固定匹配:必须包含所有关键词)
- 处理表头换行(多行表头)并对关键字做去空格匹配。
- """
- if table_df.empty:
- return False, ""
- rule_name = rule.get("name", "未知规则")
- if rule_name in EXCLUDE_RULES:
- return False, ""
- # 默认从第一行开始
- start_row = 0
- # 合并前几行作为完整表头(通常表头可能占 1-3 行)
- header_rows_to_check = min(3, len(table_df) - start_row)
- header_text_parts: List[str] = []
- for row_idx in range(start_row, start_row + header_rows_to_check):
- if row_idx >= len(table_df):
- break
- row = table_df.iloc[row_idx].astype(str).str.strip()
- for cell in row:
- cell_text = str(cell).strip()
- if cell_text and cell_text.lower() not in ["nan", "none", ""]:
- # 将单元格内的换行符替换为空格(处理 xlsx 中的换行)
- cell_text = cell_text.replace("\n", " ").replace("\r", " ")
- header_text_parts.append(cell_text)
- # 合并所有表头文本
- header_text = " ".join(header_text_parts)
- # 规范空白:多个空格合并
- header_text = re.sub(r"\s+", " ", header_text).strip()
- # 去掉所有空白,用于处理“中标通知\n书金额”这类情况
- header_text_no_space = re.sub(r"\s+", "", header_text)
- keywords = rule.get("keywords", [])
- match_mode = rule.get("match_mode", "all")
- if not keywords:
- return False, ""
- if match_mode == "all":
- all_match = True
- for keyword in keywords:
- keyword_no_space = re.sub(r"\s+", "", keyword)
- if keyword in header_text or keyword_no_space in header_text_no_space:
- continue
- all_match = False
- break
- if all_match:
- return True, rule_name
- elif match_mode == "any":
- for keyword in keywords:
- keyword_no_space = re.sub(r"\s+", "", keyword)
- if keyword in header_text or keyword_no_space in header_text_no_space:
- return True, rule_name
- return False, ""
- def is_likely_header_only(table_df: pd.DataFrame, min_data_rows: int = 1) -> bool:
- """
- 判断表格是否可能只包含表头(数据行很少)。
- """
- if table_df.empty:
- return True
- header_rows = min(3, len(table_df))
- return len(table_df) <= header_rows + min_data_rows
- def has_similar_structure(table1_df: pd.DataFrame, table2_df: pd.DataFrame, tolerance: int = 1) -> bool:
- """
- 判断两个表格是否有相似的结构(列数接近)。
- """
- if table1_df.empty or table2_df.empty:
- return False
- cols1 = len(table1_df.columns)
- cols2 = len(table2_df.columns)
- return abs(cols1 - cols2) <= tolerance
- def _merge_all_tables(
- tables: List[Tuple[int, pd.DataFrame, int]],
- ) -> List[Tuple[int, pd.DataFrame, int]]:
- """
- 合并所有跨页表格(不进行过滤):
- - 只处理"表头在当前页,内容在下一页"的典型情况;
- - 严格限制只合并相邻页,且列结构相似。
- """
- if not ENABLE_MERGE_CROSS_PAGE_TABLES or not tables:
- return tables
- # page -> [(idx, df)]
- page_map: Dict[int, List[Tuple[int, pd.DataFrame]]] = {}
- for orig_idx, df, page in tables:
- page_map.setdefault(page, []).append((orig_idx, df))
- merged: List[Tuple[int, pd.DataFrame, int]] = []
- processed: set[int] = set()
- sorted_pages = sorted(page_map.keys())
- for page in sorted_pages:
- current_list = page_map[page]
- for orig_idx, df in current_list:
- if orig_idx in processed:
- continue
- current_df = df
- did_merge = False
- # 情况:当前表格只有表头,尝试合并下一页
- if is_likely_header_only(current_df):
- next_page = page + 1
- if next_page in page_map:
- for next_orig_idx, next_df in page_map[next_page]:
- if next_orig_idx in processed:
- continue
- if not has_similar_structure(current_df, next_df):
- continue
- # 合并:保留当前页表头,拼接下一页数据
- header_rows = min(3, len(current_df))
- header_df = current_df.iloc[:header_rows].copy()
- next_data_df = next_df.copy()
- # 对齐列数
- if len(header_df.columns) != len(next_data_df.columns):
- if len(next_data_df.columns) < len(header_df.columns):
- for _ in range(len(next_data_df.columns), len(header_df.columns)):
- next_data_df[len(next_data_df.columns)] = ""
- merged_df = pd.concat([header_df, next_data_df], ignore_index=True)
- merged.append((orig_idx, merged_df, page))
- processed.add(orig_idx)
- processed.add(next_orig_idx)
- did_merge = True
- break
- if not did_merge and orig_idx not in processed:
- merged.append((orig_idx, current_df, page))
- processed.add(orig_idx)
- return merged
- def _merge_cross_page_tables(
- tables: List[Tuple[int, pd.DataFrame, str, int, str]],
- header_rules: List[dict],
- ) -> List[Tuple[int, pd.DataFrame, str, int, str]]:
- """
- 简化版跨页合并逻辑(用于已匹配规则的表格):
- - 只处理"表头在当前页,内容在下一页"的典型情况;
- - 严格限制只合并相邻页,且列结构相似;
- - 如果下一页第一行看起来像新的表头,则不合并。
-
- 元组格式: (orig_idx, df, rule_name, page, title)
- """
- if not ENABLE_MERGE_CROSS_PAGE_TABLES or not tables:
- return tables
- # page -> [(idx, df, rule_name, title)]
- page_map: Dict[int, List[Tuple[int, pd.DataFrame, str, str]]] = {}
- for orig_idx, df, rule_name, page, title in tables:
- page_map.setdefault(page, []).append((orig_idx, df, rule_name, title))
- merged: List[Tuple[int, pd.DataFrame, str, int, str]] = []
- processed: set[int] = set()
- sorted_pages = sorted(page_map.keys())
- for page in sorted_pages:
- current_list = page_map[page]
- for orig_idx, df, rule_name, title in current_list:
- if orig_idx in processed:
- continue
- current_df = df
- did_merge = False
- # 情况:当前表格只有表头,尝试合并下一页
- if is_likely_header_only(current_df):
- next_page = page + 1
- if next_page in page_map:
- for next_orig_idx, next_df, next_rule_name, next_title in page_map[next_page]:
- if next_orig_idx in processed:
- continue
- if rule_name and next_rule_name and rule_name != next_rule_name:
- continue
- if not has_similar_structure(current_df, next_df):
- continue
- # 判断下一页第一行是否像表头
- next_first_row_text = ""
- if not next_df.empty:
- next_first_row_text = " ".join(
- next_df.iloc[0].astype(str).str.strip().tolist()
- )
- keyword_count = 0
- if header_rules and rule_name:
- for rule in header_rules:
- if rule.get("name") == rule_name:
- kws = rule.get("keywords", [])
- keyword_count = sum(
- 1 for kw in kws if kw in next_first_row_text
- )
- break
- # 如果下一页第一行包含较多关键词,认为是新表头,不合并
- if keyword_count >= 2:
- continue
- # 合并:保留当前页表头,拼接下一页数据
- header_rows = min(3, len(current_df))
- header_df = current_df.iloc[:header_rows].copy()
- next_data_df = next_df.copy()
- # 对齐列数
- if len(header_df.columns) != len(next_data_df.columns):
- if len(next_data_df.columns) < len(header_df.columns):
- for _ in range(len(next_data_df.columns), len(header_df.columns)):
- next_data_df[len(next_data_df.columns)] = ""
- merged_df = pd.concat([header_df, next_data_df], ignore_index=True)
- merged.append((orig_idx, merged_df, rule_name, page, title))
- processed.add(orig_idx)
- processed.add(next_orig_idx)
- did_merge = True
- break
- if not did_merge and orig_idx not in processed:
- merged.append((orig_idx, current_df, rule_name, page, title))
- processed.add(orig_idx)
- return merged
- def _fix_broken_cells(table_df: pd.DataFrame, header_row_count: int = 1) -> pd.DataFrame:
- """
- 修复被错误分割的单元格(一个单元格的内容被识别成多行)
-
- 检测规则:
- 1. 如果某一行的前N列有内容,但后面的列大部分为空(超过50%)
- 2. 且上一行对应列有内容,则认为当前行是上一行的延续,需要合并
-
- Args:
- table_df: 表格DataFrame
- header_row_count: 表头行数,跳过表头不处理
-
- Returns:
- pd.DataFrame: 修复后的表格
- """
- if table_df.empty or len(table_df) <= header_row_count:
- return table_df
-
- df = table_df.copy()
- rows_to_remove = []
-
- # 从表头后开始检查
- for i in range(header_row_count, len(df)):
- if i <= header_row_count:
- continue # 跳过第一行数据
-
- # 获取当前行
- current_row = df.iloc[i]
- prev_row = df.iloc[i-1]
-
- # 统计有内容的列和空列
- non_empty_cols = []
- empty_cols = []
-
- for j in range(len(df.columns)):
- val = current_row.iloc[j]
- val_str = str(val).strip()
- is_empty = (val is None or pd.isna(val) or not val_str or
- val_str.lower() in ['nan', 'none', '', '0', '0.0'])
-
- if is_empty:
- empty_cols.append(j)
- else:
- non_empty_cols.append(j)
-
- # 如果没有非空列,跳过
- if not non_empty_cols:
- continue
-
- # 计算空列比例
- empty_ratio = len(empty_cols) / len(df.columns) if len(df.columns) > 0 else 0
-
- # 如果空列超过50%,可能是被截断的行
- if empty_ratio > 0.5:
- # 检查上一行对应位置是否有内容
- can_merge = True
- merge_cols = []
-
- for col_idx in non_empty_cols:
- prev_val = prev_row.iloc[col_idx]
- prev_val_str = str(prev_val).strip()
- curr_val = current_row.iloc[col_idx]
- curr_val_str = str(curr_val).strip()
-
- # 如果上一行对应列有内容,可以合并
- if prev_val_str and prev_val_str.lower() not in ['nan', 'none', '', '0', '0.0']:
- merge_cols.append(col_idx)
- else:
- # 如果上一行对应列为空,但当前行有内容,可能是新行,不合并
- # 但如果只有前几列有内容,且都是文本(不是数字),可能是延续
- if col_idx < len(df.columns) * 0.5: # 前50%的列
- # 检查是否是文本(不是纯数字)
- if not curr_val_str.replace('.', '').replace('-', '').isdigit():
- merge_cols.append(col_idx)
- else:
- can_merge = False
- break
- else:
- can_merge = False
- break
-
- if can_merge and merge_cols:
- # 合并每个非空列到上一行对应的列
- for col_idx in merge_cols:
- prev_val = str(prev_row.iloc[col_idx]).strip()
- curr_val = str(current_row.iloc[col_idx]).strip()
- # 合并(移除换行符)
- merged_val = prev_val + curr_val.replace('\n', '').replace('\r', '')
- df.iloc[i-1, col_idx] = merged_val
-
- rows_to_remove.append(i)
- logger.debug(f"[跨行合并] 合并行 {i} 到行 {i-1},列: {merge_cols}")
-
- # 删除已合并的行
- if rows_to_remove:
- df = df.drop(rows_to_remove).reset_index(drop=True)
- logger.info(f"[跨行合并] 修复了 {len(rows_to_remove)} 个被错误分割的单元格(跨行)")
-
- return df
- def _format_header_text(cell_val: str) -> str:
- """
- 格式化表头文本:移除换行符和多余空格,用于匹配。
-
- Args:
- cell_val: 原始单元格值
-
- Returns:
- 格式化后的文本(移除所有空格,用于精确匹配)
- """
- if not cell_val or str(cell_val).lower() in ['nan', 'none', '']:
- return ""
- # 清理换行符,直接移除
- text = str(cell_val).strip().replace('\n', '').replace('\r', '')
- # 移除所有空格,用于匹配
- text = re.sub(r'\s+', '', text)
- return text
- def _detect_header_rows(df: pd.DataFrame, header_row_idx: int, header_keywords: List[str]) -> int:
- """
- 智能检测表头行数,避免把数据行当作表头。
-
- Args:
- df: 数据框
- header_row_idx: 已识别的表头行索引
- header_keywords: 表头关键词列表
-
- Returns:
- 表头行数(从 header_row_idx 开始)
- """
- header_rows_to_check = 1
-
- # 检查后续行是否是表头的延续
- for i in range(header_row_idx + 1, min(header_row_idx + 3, len(df))):
- row = df.iloc[i]
- row_text = " ".join(row.astype(str).str.strip().tolist())
-
- # 检查是否包含表头关键词
- keyword_count = sum(1 for kw in header_keywords if kw in row_text)
-
- # 如果包含关键词,进一步判断
- if keyword_count >= 2: # 至少包含2个关键词才可能是表头
- # 检查是否主要是数字(如果是,则不是表头)
- numeric_count = sum(1 for cell in row if str(cell).strip().replace('.', '').replace('-', '').isdigit())
- numeric_ratio = numeric_count / len(row) if len(row) > 0 else 0
-
- # 检查是否包含明显的公司名称、人名等(如果是,则不是表头)
- has_company_name = any(
- '公司' in str(cell) or '有限' in str(cell) or '股份' in str(cell) or
- '工程' in str(cell) or '集团' in str(cell) or '局' in str(cell)
- for cell in row
- )
-
- # 如果数字占比小于30%且不包含公司名称,可能是表头
- if numeric_ratio < 0.3 and not has_company_name:
- header_rows_to_check += 1
- else:
- # 包含公司名称或数字占比高,是数据行,停止
- break
- else:
- # 关键词太少,不是表头,停止
- break
-
- return header_rows_to_check
- def parse_settlement_summary_table(df: pd.DataFrame) -> List[Dict[str, Any]]:
- """
- 解析"审定结算汇总表",提取数据并生成 JSON 格式。
-
- 返回格式:
- [{
- "No": int, # 序号
- "name": str, # 项目名称(审计内容)
- "settledVerifiedTaxExclusiveInvestment": float, # 结算审定不含税投资(元,两位小数)
- "settledVerifiedTaxInclusiveInvestment": float, # 结算审定含税投资(元,两位小数)
- }, ...]
- """
- if df.empty:
- return []
-
- # 尝试识别表头行(通常在前几行)
- header_row_idx = None
- for i in range(min(3, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- # 检查是否包含表头关键词
- if any(kw in row_text for kw in ["序号", "审计内容", "审定金额(含税)", "审定金额(不含税)"]):
- header_row_idx = i
- break
-
- if header_row_idx is None:
- # 如果没有找到明确的表头,假设第一行是表头
- header_row_idx = 0
-
- # 先修复跨行单元格(在识别表头之前)
- df = _fix_broken_cells(df, header_row_count=header_row_idx + 1)
-
- # 合并前几行作为表头(处理多行表头的情况)
- header_keywords = ["序号", "审计内容", "送审金额", "审定金额", "增减金额", "备注"]
- header_rows_to_check = _detect_header_rows(df, header_row_idx, header_keywords)
- header_texts = [] # 每列的合并文本
- num_cols = len(df.columns)
-
- # 打印原始表头行(用于调试)
- logger.info(f"[审定结算汇总表] 原始表头行 (从第 {header_row_idx + 1} 行开始,共检查 {header_rows_to_check} 行):")
- for row_idx in range(header_row_idx, min(header_row_idx + header_rows_to_check, len(df))):
- row_data = [str(df.iloc[row_idx, col_idx]).strip() for col_idx in range(num_cols)]
- logger.info(f"[审定结算汇总表] 第 {row_idx + 1} 行: {row_data}")
-
- for col_idx in range(num_cols):
- col_text_parts = []
- for row_idx in range(header_row_idx, header_row_idx + header_rows_to_check):
- if row_idx < len(df):
- cell_val = str(df.iloc[row_idx, col_idx]).strip()
- if cell_val and cell_val.lower() not in ['nan', 'none', '']:
- # 格式化表头文本(移除换行符和空格)
- formatted_text = _format_header_text(cell_val)
- if formatted_text:
- col_text_parts.append(formatted_text)
- # 合并该列的所有表头文本(已经是格式化后的,无空格)
- merged_text = ''.join(col_text_parts)
- header_texts.append(merged_text)
-
- logger.info(f"[审定结算汇总表] 合并后的表头文本(已格式化): {header_texts}")
-
- col_no = None # 序号列
- col_name = None # 审计内容列(项目名称)
- col_tax_exclusive = None # 审定金额(不含税)列
- col_tax_inclusive = None # 审定金额(含税)列
-
- for idx, header_text in enumerate(header_texts):
- cell_lower = header_text.lower()
- if "序号" in header_text or "no" in cell_lower:
- col_no = idx
- elif "审计内容" in header_text or "项目名称" in header_text or "name" in cell_lower:
- col_name = idx
- elif "审定金额(不含税)" in header_text or ("不含税" in header_text and "审定" in header_text):
- col_tax_exclusive = idx
- elif "审定金额(含税)" in header_text or ("审定金额" in header_text and "含税" in header_text):
- col_tax_inclusive = idx
-
- # 如果关键列未找到,尝试通过位置推断
- if col_no is None:
- col_no = 0
- if col_name is None:
- col_name = 1
-
- # 如果金额列未找到,尝试从后往前找(通常金额列在表格右侧)
- if col_tax_exclusive is None or col_tax_inclusive is None:
- for idx in range(len(header_texts) - 1, -1, -1):
- header_text = header_texts[idx]
- if "不含税" in header_text and col_tax_exclusive is None:
- col_tax_exclusive = idx
- elif "含税" in header_text and "审定" in header_text and col_tax_inclusive is None:
- col_tax_inclusive = idx
- if col_tax_exclusive is not None and col_tax_inclusive is not None:
- break
-
- logger.info(f"[审定结算汇总表] 列识别: 序号={col_no}, 项目名称={col_name}, 不含税={col_tax_exclusive}, 含税={col_tax_inclusive}")
-
- # 从数据行开始解析(跳过表头行)
- data_rows = df.iloc[header_row_idx + 1:].reset_index(drop=True)
-
- result = []
-
- def parse_number(value: Any) -> str:
- """解析数字,返回字符串保留原始精度"""
- if pd.isna(value):
- return "0"
- value_str = str(value).strip()
- # 移除常见的非数字字符(保留小数点、负号)
- value_str = re.sub(r'[^\d.\-]', '', value_str)
- if not value_str or value_str == '-':
- return "0"
- return value_str
-
- for idx, row in data_rows.iterrows():
- # 跳过空行
- if row.isna().all():
- continue
-
- # 提取各列数据
- no_val = row.iloc[col_no] if col_no is not None and col_no < len(row) else None
- name_val = row.iloc[col_name] if col_name is not None and col_name < len(row) else None
- tax_exclusive_val = row.iloc[col_tax_exclusive] if col_tax_exclusive is not None and col_tax_exclusive < len(row) else None
- tax_inclusive_val = row.iloc[col_tax_inclusive] if col_tax_inclusive is not None and col_tax_inclusive < len(row) else None
-
- # 解析序号
- no = None
- no_str = ""
- if no_val is not None and not pd.isna(no_val):
- no_str = str(no_val).strip()
- if no_str:
- try:
- no = int(float(no_str))
- except (ValueError, TypeError):
- pass
-
- # 跳过序号为空的行(这些通常是合计、其中等说明行)
- if not no_str or pd.isna(no_val):
- continue
-
- # 解析项目名称(审计内容),清理换行符
- name = str(name_val).strip() if name_val is not None and not pd.isna(name_val) else ""
- # 清理换行符,替换为空格
- name = name.replace('\n', '').replace('\r', '') # 直接移除换行符,不替换为空格
- # 清理多余空格
- name = re.sub(r'\s+', ' ', name).strip()
-
- # 跳过空行
- if not name or name == "":
- continue
-
- # 判断是否为合计行(合计行需要跳过)
- is_total = any(kw in name for kw in ["合计", "总计", "总计", "合计金额"])
- if is_total:
- continue
-
- # 解析投资金额
- settled_verified_tax_exclusive = parse_number(tax_exclusive_val)
- settled_verified_tax_inclusive = parse_number(tax_inclusive_val)
-
- # 添加到结果
- result.append({
- "No": no if no is not None else idx + 1,
- "name": name,
- "settledVerifiedTaxExclusiveInvestment": settled_verified_tax_exclusive,
- "settledVerifiedTaxInclusiveInvestment": settled_verified_tax_inclusive,
- })
-
- return result
- def parse_contract_execution_table(df: pd.DataFrame) -> List[Dict[str, Any]]:
- """
- 解析"合同执行情况"表,提取数据并生成 JSON 格式。
-
- 返回格式:
- [{
- "No": int, # 序号
- "constructionUnit": str, # 施工单位
- "bidNoticeAmount": float, # 中标通知书金额(元,两位小数)
- "bidNoticeNo": str, # 中标通知书编号
- "contractAmount": float, # 合同金额(元,两位小数)
- "settlementSubmittedAmount": float, # 结算送审金额(元,两位小数)
- "differenceAmount": float, # 差额(元,两位小数)
- }, ...]
- """
- if df.empty:
- return []
-
- # 尝试识别表头行(通常在前几行)
- header_row_idx = None
- for i in range(min(3, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- # 检查是否包含表头关键词
- if any(kw in row_text for kw in ["序号", "施工单位", "中标通知书金额", "中标通知书编号", "合同金额", "结算送审金额", "差额"]):
- header_row_idx = i
- break
-
- if header_row_idx is None:
- # 如果没有找到明确的表头,假设第一行是表头
- header_row_idx = 0
-
- # 先修复跨行单元格(在识别表头之前)
- df = _fix_broken_cells(df, header_row_count=header_row_idx + 1)
-
- # 合并前几行作为表头(处理多行表头的情况)
- # 智能判断表头行数:表头行通常不包含纯数字(除了序号列)
- header_rows_to_check = 1
- for i in range(header_row_idx + 1, min(header_row_idx + 3, len(df))):
- row = df.iloc[i]
- # 检查这一行是否像表头(包含关键词且不全是数字)
- row_text = " ".join(row.astype(str).str.strip().tolist())
- has_keywords = any(kw in row_text for kw in ["施工单位", "中标", "合同", "结算", "送审", "差额"])
- # 如果包含关键词且看起来不像数据行(不全是数字),则可能是表头的一部分
- if has_keywords:
- # 检查是否主要是数字(如果是,则不是表头)
- numeric_count = sum(1 for cell in row if str(cell).strip().replace('.', '').replace('-', '').isdigit())
- if numeric_count < len(row) * 0.5: # 如果数字占比小于50%,可能是表头
- header_rows_to_check += 1
- else:
- break
- else:
- break
-
- header_texts = [] # 每列的合并文本
- num_cols = len(df.columns)
-
- # 打印原始表头行(用于调试)
- logger.info(f"[合同执行情况] 原始表头行 (从第 {header_row_idx + 1} 行开始,共检查 {header_rows_to_check} 行):")
- for row_idx in range(header_row_idx, min(header_row_idx + header_rows_to_check, len(df))):
- row_data = [str(df.iloc[row_idx, col_idx]).strip() for col_idx in range(num_cols)]
- logger.info(f"[合同执行情况] 第 {row_idx + 1} 行: {row_data}")
-
- for col_idx in range(num_cols):
- col_text_parts = []
- for row_idx in range(header_row_idx, header_row_idx + header_rows_to_check):
- if row_idx < len(df):
- cell_val = str(df.iloc[row_idx, col_idx]).strip()
- if cell_val and cell_val.lower() not in ['nan', 'none', '']:
- # 格式化表头文本(移除换行符和空格)
- formatted_text = _format_header_text(cell_val)
- if formatted_text:
- col_text_parts.append(formatted_text)
- # 合并该列的所有表头文本(已经是格式化后的,无空格)
- merged_text = ''.join(col_text_parts)
- header_texts.append(merged_text)
-
- logger.info(f"[合同执行情况] 合并后的表头文本(已格式化): {header_texts}")
-
- col_no = None # 序号列
- col_construction_unit = None # 施工单位列
- col_bid_notice_amount = None # 中标通知书金额列
- col_bid_notice_no = None # 中标通知书编号列
- col_contract_amount = None # 合同金额列
- col_settlement_submitted = None # 结算送审金额列
- col_difference = None # 差额列
-
- for idx, header_text in enumerate(header_texts):
- cell_lower = header_text.lower()
- # header_text 已经是格式化后的(无空格),直接使用
-
- if "序号" in header_text or "no" in cell_lower:
- col_no = idx
- elif "施工单位" in header_text:
- col_construction_unit = idx
- elif "中标" in header_text and "金额" in header_text:
- col_bid_notice_amount = idx
- elif "中标" in header_text and "编号" in header_text:
- col_bid_notice_no = idx
- elif ("合同金额" in header_text or
- ("合同" in header_text and "金额" in header_text and
- "结算" not in header_text and "送审" not in header_text)):
- col_contract_amount = idx
- elif ("结算送审金额" in header_text or
- ("送审" in header_text and "金额" in header_text) or
- ("结算" in header_text and "送审" in header_text)):
- col_settlement_submitted = idx
- elif "差额" in header_text:
- col_difference = idx
-
- # 如果关键列未找到,尝试通过位置推断
- if col_no is None:
- col_no = 0
- if col_construction_unit is None:
- col_construction_unit = 1
-
- # 如果金额列未找到,尝试从后往前找(通常金额列在表格右侧)
- # 同时检查列名中是否包含关键词的部分匹配
- if col_bid_notice_amount is None:
- for idx, header_text in enumerate(header_texts):
- if "中标" in header_text and "金额" in header_text:
- col_bid_notice_amount = idx
- break
-
- if col_bid_notice_no is None:
- for idx, header_text in enumerate(header_texts):
- if "中标" in header_text and "编号" in header_text:
- col_bid_notice_no = idx
- break
-
- if col_contract_amount is None:
- # 尝试更灵活的匹配:合同金额可能在"合同"和"金额"分开的列中
- for idx, header_text in enumerate(header_texts):
- # header_text 已经是格式化后的(无空格),直接使用
- # 检查是否包含"合同"和"金额",且不包含"结算"、"送审"等
- if ("合同" in header_text and "金额" in header_text and
- "结算" not in header_text and "送审" not in header_text):
- col_contract_amount = idx
- break
-
- if col_settlement_submitted is None:
- # 尝试更灵活的匹配:结算送审金额可能在"结算"、"送审"、"金额"分开的列中
- for idx, header_text in enumerate(header_texts):
- # header_text 已经是格式化后的(无空格),直接使用
- # 检查是否包含"送审"和"金额",或者"结算"和"送审"
- if (("送审" in header_text and "金额" in header_text) or
- ("结算" in header_text and "送审" in header_text) or
- "结算送审金额" in header_text):
- col_settlement_submitted = idx
- break
-
- if col_difference is None:
- for idx, header_text in enumerate(header_texts):
- if "差额" in header_text:
- col_difference = idx
- break
-
- logger.info(f"[合同执行情况] 列识别: 序号={col_no}, 施工单位={col_construction_unit}, "
- f"中标金额={col_bid_notice_amount}, 中标编号={col_bid_notice_no}, "
- f"合同金额={col_contract_amount}, 送审金额={col_settlement_submitted}, 差额={col_difference}")
-
- # 从数据行开始解析(跳过表头行)
- data_rows = df.iloc[header_row_idx + 1:].reset_index(drop=True)
-
- result = []
-
- def parse_number(value: Any) -> str:
- """解析数字,返回字符串保留原始精度"""
- if pd.isna(value):
- return "0"
- value_str = str(value).strip()
- # 移除常见的非数字字符(保留小数点、负号)
- value_str = re.sub(r'[^\d.\-]', '', value_str)
- if not value_str or value_str == '-':
- return "0"
- return value_str
-
- for idx, row in data_rows.iterrows():
- # 跳过空行
- if row.isna().all():
- continue
-
- # 提取各列数据
- no_val = row.iloc[col_no] if col_no is not None and col_no < len(row) else None
- construction_unit_val = row.iloc[col_construction_unit] if col_construction_unit is not None and col_construction_unit < len(row) else None
- bid_notice_amount_val = row.iloc[col_bid_notice_amount] if col_bid_notice_amount is not None and col_bid_notice_amount < len(row) else None
- bid_notice_no_val = row.iloc[col_bid_notice_no] if col_bid_notice_no is not None and col_bid_notice_no < len(row) else None
- contract_amount_val = row.iloc[col_contract_amount] if col_contract_amount is not None and col_contract_amount < len(row) else None
- settlement_submitted_val = row.iloc[col_settlement_submitted] if col_settlement_submitted is not None and col_settlement_submitted < len(row) else None
- difference_val = row.iloc[col_difference] if col_difference is not None and col_difference < len(row) else None
-
- # 解析序号
- no = None
- no_str = ""
- if no_val is not None and not pd.isna(no_val):
- no_str = str(no_val).strip()
- if no_str:
- try:
- no = int(float(no_str))
- except (ValueError, TypeError):
- pass
-
- # 跳过序号为空的行(这些通常是合计、其中等说明行)
- if not no_str or pd.isna(no_val):
- continue
-
- # 解析施工单位,清理换行符
- construction_unit = str(construction_unit_val).strip() if construction_unit_val is not None and not pd.isna(construction_unit_val) else ""
- # 清理换行符,直接移除
- construction_unit = construction_unit.replace('\n', '').replace('\r', '')
- # 清理多余空格
- construction_unit = re.sub(r'\s+', ' ', construction_unit).strip()
-
- # 跳过空行
- if not construction_unit or construction_unit == "":
- continue
-
- # 判断是否为合计行(合计行需要跳过)
- is_total = any(kw in construction_unit for kw in ["合计", "总计", "总计", "合计金额"])
- if is_total:
- continue
-
- # 解析中标通知书编号,清理换行符
- bid_notice_no = str(bid_notice_no_val).strip() if bid_notice_no_val is not None and not pd.isna(bid_notice_no_val) else ""
- bid_notice_no = bid_notice_no.replace('\n', '').replace('\r', '')
- bid_notice_no = re.sub(r'\s+', ' ', bid_notice_no).strip()
-
- # 解析金额(保留两位小数)
- bid_notice_amount = parse_number(bid_notice_amount_val)
- contract_amount = parse_number(contract_amount_val)
- settlement_submitted_amount = parse_number(settlement_submitted_val)
- difference_amount = parse_number(difference_val)
-
- # 添加到结果
- result.append({
- "No": no if no is not None else idx + 1,
- "constructionUnit": construction_unit,
- "bidNoticeAmount": bid_notice_amount,
- "bidNoticeNo": bid_notice_no,
- "contractAmount": contract_amount,
- "settlementSubmittedAmount": settlement_submitted_amount,
- "differenceAmount": difference_amount,
- })
-
- return result
- def parse_compensation_contract_table(df: pd.DataFrame) -> List[Dict[str, Any]]:
- """
- 解析"赔偿合同"表,提取数据并生成 JSON 格式。
-
- 返回格式:
- [{
- "No": int, # 序号
- "contractCounterpartyName": str, # 合同对方名称
- "compensationItem": str, # 赔偿事项
- "contractAmount": float, # 合同金额(元,两位小数)
- "settlementSubmittedAmount": float, # 结算送审金额(元,两位小数)
- "differenceAmount": float, # 差额(元,两位小数)
- }, ...]
- """
- if df.empty:
- return []
-
- # 尝试识别表头行(通常在前几行)
- header_row_idx = None
- for i in range(min(3, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- # 检查是否包含表头关键词
- if any(kw in row_text for kw in ["序号", "合同对方", "赔偿事项", "合同金额", "结算送审金额", "差额"]):
- header_row_idx = i
- break
-
- if header_row_idx is None:
- # 如果没有找到明确的表头,假设第一行是表头
- header_row_idx = 0
-
- # 先修复跨行单元格(在识别表头之前)
- df = _fix_broken_cells(df, header_row_count=header_row_idx + 1)
-
- # 合并前几行作为表头(处理多行表头的情况)
- header_keywords = ["序号", "合同对方", "赔偿事项", "合同金额", "结算送审金额", "差额"]
- header_rows_to_check = _detect_header_rows(df, header_row_idx, header_keywords)
- header_texts = [] # 每列的合并文本
- num_cols = len(df.columns)
-
- # 打印原始表头行(用于调试)
- logger.info(f"[赔偿合同] 原始表头行 (从第 {header_row_idx + 1} 行开始,共检查 {header_rows_to_check} 行):")
- for row_idx in range(header_row_idx, min(header_row_idx + header_rows_to_check, len(df))):
- row_data = [str(df.iloc[row_idx, col_idx]).strip() for col_idx in range(num_cols)]
- logger.info(f"[赔偿合同] 第 {row_idx + 1} 行: {row_data}")
-
- for col_idx in range(num_cols):
- col_text_parts = []
- for row_idx in range(header_row_idx, header_row_idx + header_rows_to_check):
- if row_idx < len(df):
- cell_val = str(df.iloc[row_idx, col_idx]).strip()
- if cell_val and cell_val.lower() not in ['nan', 'none', '']:
- # 格式化表头文本(移除换行符和空格)
- formatted_text = _format_header_text(cell_val)
- if formatted_text:
- col_text_parts.append(formatted_text)
- # 合并该列的所有表头文本(已经是格式化后的,无空格)
- merged_text = ''.join(col_text_parts)
- header_texts.append(merged_text)
-
- logger.info(f"[赔偿合同] 合并后的表头文本(已格式化): {header_texts}")
-
- col_no = None # 序号列
- col_counterparty_name = None # 合同对方名称列
- col_compensation_item = None # 赔偿事项列
- col_contract_amount = None # 合同金额列
- col_settlement_submitted = None # 结算送审金额列
- col_difference = None # 差额列
-
- for idx, header_text in enumerate(header_texts):
- cell_lower = header_text.lower()
- if "序号" in header_text or "no" in cell_lower:
- col_no = idx
- elif "合同对方" in header_text:
- col_counterparty_name = idx
- elif "赔偿事项" in header_text:
- col_compensation_item = idx
- elif "合同金额" in header_text and "结算" not in header_text:
- col_contract_amount = idx
- elif "结算送审金额" in header_text or ("送审金额" in header_text and "结算" in header_text):
- col_settlement_submitted = idx
- elif "差额" in header_text:
- col_difference = idx
-
- # 如果关键列未找到,尝试通过位置推断
- if col_no is None:
- col_no = 0
- if col_counterparty_name is None:
- col_counterparty_name = 1
- if col_compensation_item is None:
- col_compensation_item = 2
-
- # 如果金额列未找到,尝试从后往前找(通常金额列在表格右侧)
- if col_contract_amount is None or col_settlement_submitted is None or col_difference is None:
- for idx in range(len(header_texts) - 1, -1, -1):
- header_text = header_texts[idx]
- if "差额" in header_text and col_difference is None:
- col_difference = idx
- elif "送审" in header_text and "金额" in header_text and col_settlement_submitted is None:
- col_settlement_submitted = idx
- elif "合同金额" in header_text and col_contract_amount is None:
- col_contract_amount = idx
- if col_contract_amount is not None and col_settlement_submitted is not None and col_difference is not None:
- break
-
- logger.info(f"[赔偿合同] 列识别: 序号={col_no}, 合同对方={col_counterparty_name}, "
- f"赔偿事项={col_compensation_item}, 合同金额={col_contract_amount}, "
- f"送审金额={col_settlement_submitted}, 差额={col_difference}")
-
- # 从数据行开始解析(跳过表头行)
- data_rows = df.iloc[header_row_idx + 1:].reset_index(drop=True)
-
- result = []
-
- def parse_number(value: Any) -> str:
- """解析数字,返回字符串保留原始精度"""
- if pd.isna(value):
- return "0"
- value_str = str(value).strip()
- # 移除常见的非数字字符(保留小数点、负号)
- value_str = re.sub(r'[^\d.\-]', '', value_str)
- if not value_str or value_str == '-':
- return "0"
- return value_str
-
- for idx, row in data_rows.iterrows():
- # 跳过空行
- if row.isna().all():
- continue
-
- # 提取各列数据
- no_val = row.iloc[col_no] if col_no is not None and col_no < len(row) else None
- counterparty_name_val = row.iloc[col_counterparty_name] if col_counterparty_name is not None and col_counterparty_name < len(row) else None
- compensation_item_val = row.iloc[col_compensation_item] if col_compensation_item is not None and col_compensation_item < len(row) else None
- contract_amount_val = row.iloc[col_contract_amount] if col_contract_amount is not None and col_contract_amount < len(row) else None
- settlement_submitted_val = row.iloc[col_settlement_submitted] if col_settlement_submitted is not None and col_settlement_submitted < len(row) else None
- difference_val = row.iloc[col_difference] if col_difference is not None and col_difference < len(row) else None
-
- # 解析序号
- no = None
- no_str = ""
- if no_val is not None and not pd.isna(no_val):
- no_str = str(no_val).strip()
- if no_str:
- try:
- no = int(float(no_str))
- except (ValueError, TypeError):
- pass
-
- # 跳过序号为空的行(这些通常是合计、其中等说明行)
- if not no_str or pd.isna(no_val):
- continue
-
- # 解析合同对方名称,清理换行符
- counterparty_name = str(counterparty_name_val).strip() if counterparty_name_val is not None and not pd.isna(counterparty_name_val) else ""
- counterparty_name = counterparty_name.replace('\n', '').replace('\r', '')
- counterparty_name = re.sub(r'\s+', ' ', counterparty_name).strip()
-
- # 跳过空行
- if not counterparty_name or counterparty_name == "":
- continue
-
- # 判断是否为合计行(合计行需要跳过)
- is_total = any(kw in counterparty_name for kw in ["合计", "总计", "总计", "合计金额"])
- if is_total:
- continue
-
- # 解析赔偿事项,清理换行符
- compensation_item = str(compensation_item_val).strip() if compensation_item_val is not None and not pd.isna(compensation_item_val) else ""
- compensation_item = compensation_item.replace('\n', '').replace('\r', '')
- compensation_item = re.sub(r'\s+', ' ', compensation_item).strip()
-
- # 解析金额(保留两位小数)
- contract_amount = parse_number(contract_amount_val)
- settlement_submitted_amount = parse_number(settlement_submitted_val)
- difference_amount = parse_number(difference_val)
-
- # 添加到结果
- result.append({
- "No": no if no is not None else idx + 1,
- "contractCounterpartyName": counterparty_name,
- "compensationItem": compensation_item,
- "contractAmount": contract_amount,
- "settlementSubmittedAmount": settlement_submitted_amount,
- "differenceAmount": difference_amount,
- })
-
- return result
- def parse_material_purchase_contract1_table(df: pd.DataFrame) -> List[Dict[str, Any]]:
- """
- 解析"物资采购合同1"表,提取数据并生成 JSON 格式。
-
- 返回格式:
- [{
- "No": int, # 序号
- "materialName": str, # 物料名称
- "contractQuantity": float, # 合同数量
- "drawingQuantity": float, # 施工图数量
- "unitPriceExcludingTax": float, # 单价(不含税)(元,两位小数)
- "differenceAmount": float, # 差额(元,两位小数)
- }, ...]
- """
- if df.empty:
- return []
-
- # 尝试识别表头行(通常在前几行)
- header_row_idx = None
- for i in range(min(3, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- # 检查是否包含表头关键词
- if any(kw in row_text for kw in ["序号", "物料名称", "合同数量", "施工图数量", "单价(不含税)", "差额"]):
- header_row_idx = i
- break
-
- if header_row_idx is None:
- # 如果没有找到明确的表头,假设第一行是表头
- header_row_idx = 0
-
- # 先修复跨行单元格(在识别表头之前)
- df = _fix_broken_cells(df, header_row_count=header_row_idx + 1)
-
- # 合并前几行作为表头(处理多行表头的情况)
- header_keywords = ["序号", "物料名称", "合同数量", "施工图数量", "单价", "差额"]
- header_rows_to_check = _detect_header_rows(df, header_row_idx, header_keywords)
- header_texts = [] # 每列的合并文本
- num_cols = len(df.columns)
-
- # 打印原始表头行(用于调试)
- logger.info(f"[物资采购合同1] 原始表头行 (从第 {header_row_idx + 1} 行开始,共检查 {header_rows_to_check} 行):")
- for row_idx in range(header_row_idx, min(header_row_idx + header_rows_to_check, len(df))):
- row_data = [str(df.iloc[row_idx, col_idx]).strip() for col_idx in range(num_cols)]
- logger.info(f"[物资采购合同1] 第 {row_idx + 1} 行: {row_data}")
-
- for col_idx in range(num_cols):
- col_text_parts = []
- for row_idx in range(header_row_idx, header_row_idx + header_rows_to_check):
- if row_idx < len(df):
- cell_val = str(df.iloc[row_idx, col_idx]).strip()
- if cell_val and cell_val.lower() not in ['nan', 'none', '']:
- # 格式化表头文本(移除换行符和空格)
- formatted_text = _format_header_text(cell_val)
- if formatted_text:
- col_text_parts.append(formatted_text)
- # 合并该列的所有表头文本(已经是格式化后的,无空格)
- merged_text = ''.join(col_text_parts)
- header_texts.append(merged_text)
-
- logger.info(f"[物资采购合同1] 合并后的表头文本(已格式化): {header_texts}")
-
- col_no = None # 序号列
- col_material_name = None # 物料名称列
- col_contract_quantity = None # 合同数量列
- col_drawing_quantity = None # 施工图数量列
- col_unit_price = None # 单价(不含税)列
- col_difference = None # 差额列
-
- for idx, header_text in enumerate(header_texts):
- cell_lower = header_text.lower()
- if "序号" in header_text or "no" in cell_lower:
- col_no = idx
- elif "物料名称" in header_text:
- col_material_name = idx
- elif "合同数量" in header_text:
- col_contract_quantity = idx
- elif "施工图数量" in header_text:
- col_drawing_quantity = idx
- elif "单价" in header_text and "不含税" in header_text:
- col_unit_price = idx
- elif "差额" in header_text:
- col_difference = idx
-
- # 如果关键列未找到,尝试通过位置推断
- if col_no is None:
- col_no = 0
- if col_material_name is None:
- col_material_name = 1
-
- # 如果数量列未找到,尝试查找
- if col_contract_quantity is None:
- for idx, header_text in enumerate(header_texts):
- if "合同" in header_text and "数量" in header_text:
- col_contract_quantity = idx
- break
-
- if col_drawing_quantity is None:
- for idx, header_text in enumerate(header_texts):
- if "施工图" in header_text and "数量" in header_text:
- col_drawing_quantity = idx
- break
-
- # 如果金额列未找到,尝试从后往前找(通常金额列在表格右侧)
- if col_unit_price is None:
- for idx, header_text in enumerate(header_texts):
- if "单价" in header_text:
- col_unit_price = idx
- break
-
- if col_difference is None:
- for idx in range(len(header_texts) - 1, -1, -1):
- if "差额" in header_texts[idx]:
- col_difference = idx
- break
-
- logger.info(f"[物资采购合同1] 列识别: 序号={col_no}, 物料名称={col_material_name}, "
- f"合同数量={col_contract_quantity}, 施工图数量={col_drawing_quantity}, "
- f"单价={col_unit_price}, 差额={col_difference}")
-
- # 从数据行开始解析(跳过表头行)
- data_rows = df.iloc[header_row_idx + 1:].reset_index(drop=True)
-
- result = []
-
- def parse_number(value: Any) -> str:
- """解析数字,返回字符串保留原始精度"""
- if pd.isna(value):
- return "0"
- value_str = str(value).strip()
- # 移除常见的非数字字符(保留小数点、负号)
- value_str = re.sub(r'[^\d.\-]', '', value_str)
- if not value_str or value_str == '-':
- return "0"
- return value_str
-
- for idx, row in data_rows.iterrows():
- # 跳过空行
- if row.isna().all():
- continue
-
- # 提取各列数据
- no_val = row.iloc[col_no] if col_no is not None and col_no < len(row) else None
- material_name_val = row.iloc[col_material_name] if col_material_name is not None and col_material_name < len(row) else None
- contract_quantity_val = row.iloc[col_contract_quantity] if col_contract_quantity is not None and col_contract_quantity < len(row) else None
- drawing_quantity_val = row.iloc[col_drawing_quantity] if col_drawing_quantity is not None and col_drawing_quantity < len(row) else None
- unit_price_val = row.iloc[col_unit_price] if col_unit_price is not None and col_unit_price < len(row) else None
- difference_val = row.iloc[col_difference] if col_difference is not None and col_difference < len(row) else None
-
- # 解析序号
- no = None
- no_str = ""
- if no_val is not None and not pd.isna(no_val):
- no_str = str(no_val).strip()
- if no_str:
- try:
- no = int(float(no_str))
- except (ValueError, TypeError):
- pass
-
- # 跳过序号为空的行(这些通常是合计、其中等说明行)
- if not no_str or pd.isna(no_val):
- continue
-
- # 解析物料名称,清理换行符
- material_name = str(material_name_val).strip() if material_name_val is not None and not pd.isna(material_name_val) else ""
- material_name = material_name.replace('\n', '').replace('\r', '')
- material_name = re.sub(r'\s+', ' ', material_name).strip()
-
- # 跳过空行
- if not material_name or material_name == "":
- continue
-
- # 判断是否为合计行(合计行需要跳过)
- is_total = any(kw in material_name for kw in ["合计", "总计", "总计", "合计金额"])
- if is_total:
- continue
-
- # 解析数量(合同数量和施工图数量可能是整数或小数)
- contract_quantity = parse_number(contract_quantity_val)
- drawing_quantity = parse_number(drawing_quantity_val)
-
- # 解析金额(保留两位小数)
- unit_price_excluding_tax = parse_number(unit_price_val)
- difference_amount = parse_number(difference_val)
-
- # 添加到结果
- result.append({
- "No": no if no is not None else idx + 1,
- "materialName": material_name,
- "contractQuantity": contract_quantity,
- "drawingQuantity": drawing_quantity,
- "unitPriceExcludingTax": unit_price_excluding_tax,
- "differenceAmount": difference_amount,
- })
-
- return result
- def parse_material_purchase_contract2_table(df: pd.DataFrame) -> List[Dict[str, Any]]:
- """
- 解析"物资采购合同2"表,提取数据并生成 JSON 格式。
-
- 返回格式:
- [{
- "No": int, # 序号
- "materialName": str, # 物料名称
- "contractAmount": float, # 合同金额(元,两位小数)
- "bookedAmount": float, # 入账金额(元,两位小数)
- "differenceAmount": float, # 差额(元,两位小数)
- "remark": str, # 备注
- }, ...]
- """
- if df.empty:
- return []
-
- # 尝试识别表头行(通常在前几行)
- header_row_idx = None
- for i in range(min(3, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- # 检查是否包含表头关键词
- if any(kw in row_text for kw in ["序号", "物料名称", "合同金额(不含税)", "入账金额", "差额", "备注"]):
- header_row_idx = i
- break
-
- if header_row_idx is None:
- # 如果没有找到明确的表头,假设第一行是表头
- header_row_idx = 0
-
- # 先修复跨行单元格(在识别表头之前)
- df = _fix_broken_cells(df, header_row_count=header_row_idx + 1)
-
- # 合并前几行作为表头(处理多行表头的情况)
- header_keywords = ["序号", "物料名称", "合同金额", "入账金额", "差额", "备注"]
- header_rows_to_check = _detect_header_rows(df, header_row_idx, header_keywords)
- header_texts = [] # 每列的合并文本
- num_cols = len(df.columns)
-
- # 打印原始表头行(用于调试)
- logger.info(f"[物资采购合同2] 原始表头行 (从第 {header_row_idx + 1} 行开始,共检查 {header_rows_to_check} 行):")
- for row_idx in range(header_row_idx, min(header_row_idx + header_rows_to_check, len(df))):
- row_data = [str(df.iloc[row_idx, col_idx]).strip() for col_idx in range(num_cols)]
- logger.info(f"[物资采购合同2] 第 {row_idx + 1} 行: {row_data}")
-
- for col_idx in range(num_cols):
- col_text_parts = []
- for row_idx in range(header_row_idx, header_row_idx + header_rows_to_check):
- if row_idx < len(df):
- cell_val = str(df.iloc[row_idx, col_idx]).strip()
- if cell_val and cell_val.lower() not in ['nan', 'none', '']:
- # 格式化表头文本(移除换行符和空格)
- formatted_text = _format_header_text(cell_val)
- if formatted_text:
- col_text_parts.append(formatted_text)
- # 合并该列的所有表头文本(已经是格式化后的,无空格)
- merged_text = ''.join(col_text_parts)
- header_texts.append(merged_text)
-
- logger.info(f"[物资采购合同2] 合并后的表头文本(已格式化): {header_texts}")
-
- col_no = None # 序号列
- col_material_name = None # 物料名称列
- col_contract_amount = None # 合同金额(不含税)列
- col_booked_amount = None # 入账金额列
- col_difference = None # 差额列
- col_remark = None # 备注列
-
- for idx, header_text in enumerate(header_texts):
- cell_lower = header_text.lower()
- if "序号" in header_text or "no" in cell_lower:
- col_no = idx
- elif "物料名称" in header_text:
- col_material_name = idx
- elif "合同金额" in header_text and "不含税" in header_text:
- col_contract_amount = idx
- elif "入账金额" in header_text:
- col_booked_amount = idx
- elif "差额" in header_text:
- col_difference = idx
- elif "备注" in header_text:
- col_remark = idx
-
- # 如果关键列未找到,尝试通过位置推断
- if col_no is None:
- col_no = 0
- if col_material_name is None:
- col_material_name = 1
-
- # 如果金额列未找到,尝试查找
- if col_contract_amount is None:
- for idx, header_text in enumerate(header_texts):
- if "合同金额" in header_text:
- col_contract_amount = idx
- break
-
- if col_booked_amount is None:
- for idx, header_text in enumerate(header_texts):
- if "入账" in header_text and "金额" in header_text:
- col_booked_amount = idx
- break
-
- # 如果差额列未找到,尝试从后往前找(通常差额列在表格右侧)
- if col_difference is None:
- for idx in range(len(header_texts) - 1, -1, -1):
- if "差额" in header_texts[idx]:
- col_difference = idx
- break
-
- # 备注列通常在最后
- if col_remark is None:
- for idx in range(len(header_texts) - 1, -1, -1):
- if "备注" in header_texts[idx]:
- col_remark = idx
- break
-
- logger.info(f"[物资采购合同2] 列识别: 序号={col_no}, 物料名称={col_material_name}, "
- f"合同金额={col_contract_amount}, 入账金额={col_booked_amount}, "
- f"差额={col_difference}, 备注={col_remark}")
-
- # 从数据行开始解析(跳过表头行)
- data_rows = df.iloc[header_row_idx + 1:].reset_index(drop=True)
-
- result = []
-
- def parse_number(value: Any) -> str:
- """解析数字,返回字符串保留原始精度"""
- if pd.isna(value):
- return "0"
- value_str = str(value).strip()
- # 移除常见的非数字字符(保留小数点、负号)
- value_str = re.sub(r'[^\d.\-]', '', value_str)
- if not value_str or value_str == '-':
- return "0"
- return value_str
-
- for idx, row in data_rows.iterrows():
- # 跳过空行
- if row.isna().all():
- continue
-
- # 提取各列数据
- no_val = row.iloc[col_no] if col_no is not None and col_no < len(row) else None
- material_name_val = row.iloc[col_material_name] if col_material_name is not None and col_material_name < len(row) else None
- contract_amount_val = row.iloc[col_contract_amount] if col_contract_amount is not None and col_contract_amount < len(row) else None
- booked_amount_val = row.iloc[col_booked_amount] if col_booked_amount is not None and col_booked_amount < len(row) else None
- difference_val = row.iloc[col_difference] if col_difference is not None and col_difference < len(row) else None
- remark_val = row.iloc[col_remark] if col_remark is not None and col_remark < len(row) else None
-
- # 解析序号
- no = None
- no_str = ""
- if no_val is not None and not pd.isna(no_val):
- no_str = str(no_val).strip()
- if no_str:
- try:
- no = int(float(no_str))
- except (ValueError, TypeError):
- pass
-
- # 跳过序号为空的行(这些通常是合计、其中等说明行)
- if not no_str or pd.isna(no_val):
- continue
-
- # 解析物料名称,清理换行符
- material_name = str(material_name_val).strip() if material_name_val is not None and not pd.isna(material_name_val) else ""
- material_name = material_name.replace('\n', '').replace('\r', '')
- material_name = re.sub(r'\s+', ' ', material_name).strip()
-
- # 跳过空行
- if not material_name or material_name == "":
- continue
-
- # 判断是否为合计行(合计行需要跳过)
- is_total = any(kw in material_name for kw in ["合计", "总计", "总计", "合计金额"])
- if is_total:
- continue
-
- # 解析金额(保留两位小数)
- contract_amount = parse_number(contract_amount_val)
- booked_amount = parse_number(booked_amount_val)
- difference_amount = parse_number(difference_val)
-
- # 解析备注,清理换行符
- remark = str(remark_val).strip() if remark_val is not None and not pd.isna(remark_val) else ""
- remark = remark.replace('\n', '').replace('\r', '')
- remark = re.sub(r'\s+', ' ', remark).strip()
-
- # 添加到结果
- result.append({
- "No": no if no is not None else idx + 1,
- "materialName": material_name,
- "contractAmount": contract_amount,
- "bookedAmount": booked_amount,
- "differenceAmount": difference_amount,
- "remark": remark,
- })
-
- return result
- def parse_other_service_contract_table(df: pd.DataFrame) -> List[Dict[str, Any]]:
- """
- 解析"其他服务类合同"表,提取数据并生成 JSON 格式。
-
- 返回格式:
- [{
- "No": int, # 序号
- "serviceProvider": str, # 服务商
- "bidNotice": str, # 中标通知书
- "contractAmount": float, # 合同金额(元,两位小数)
- "submittedAmount": float, # 送审金额(元,两位小数)
- "settlementAmount": float, # 结算金额(元,两位小数)
- }, ...]
- """
- if df.empty:
- return []
-
- # 尝试识别表头行(通常在前几行)
- header_row_idx = None
- for i in range(min(3, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- # 检查是否包含表头关键词
- if any(kw in row_text for kw in ["序号", "服务商", "中标通知书", "合同金额", "送审金额", "结算金额"]):
- header_row_idx = i
- break
-
- if header_row_idx is None:
- # 如果没有找到明确的表头,假设第一行是表头
- header_row_idx = 0
-
- # 先修复跨行单元格(在识别表头之前)
- df = _fix_broken_cells(df, header_row_count=header_row_idx + 1)
-
- # 合并前几行作为表头(处理多行表头的情况)
- header_keywords = ["序号", "服务商", "中标通知书", "合同金额", "送审金额", "结算金额"]
- header_rows_to_check = _detect_header_rows(df, header_row_idx, header_keywords)
- header_texts = [] # 每列的合并文本
- num_cols = len(df.columns)
-
- # 打印原始表头行(用于调试)
- logger.info(f"[其他服务类合同] 原始表头行 (从第 {header_row_idx + 1} 行开始,共检查 {header_rows_to_check} 行):")
- for row_idx in range(header_row_idx, min(header_row_idx + header_rows_to_check, len(df))):
- row_data = [str(df.iloc[row_idx, col_idx]).strip() for col_idx in range(num_cols)]
- logger.info(f"[其他服务类合同] 第 {row_idx + 1} 行: {row_data}")
-
- for col_idx in range(num_cols):
- col_text_parts = []
- for row_idx in range(header_row_idx, header_row_idx + header_rows_to_check):
- if row_idx < len(df):
- cell_val = str(df.iloc[row_idx, col_idx]).strip()
- if cell_val and cell_val.lower() not in ['nan', 'none', '']:
- # 格式化表头文本(移除换行符和空格)
- formatted_text = _format_header_text(cell_val)
- if formatted_text:
- col_text_parts.append(formatted_text)
- # 合并该列的所有表头文本(已经是格式化后的,无空格)
- merged_text = ''.join(col_text_parts)
- header_texts.append(merged_text)
-
- logger.info(f"[其他服务类合同] 合并后的表头文本(已格式化): {header_texts}")
-
- col_no = None # 序号列
- col_service_provider = None # 服务商列
- col_bid_notice = None # 中标通知书列
- col_contract_amount = None # 合同金额列
- col_submitted_amount = None # 送审金额列
- col_settlement_amount = None # 结算金额列
-
- for idx, header_text in enumerate(header_texts):
- cell_lower = header_text.lower()
- if "序号" in header_text or "no" in cell_lower:
- col_no = idx
- elif "服务商" in header_text:
- col_service_provider = idx
- elif "中标通知书" in header_text:
- col_bid_notice = idx
- elif "合同金额" in header_text and "送审" not in header_text and "结算" not in header_text:
- col_contract_amount = idx
- elif "送审金额" in header_text:
- col_submitted_amount = idx
- elif "结算金额" in header_text:
- col_settlement_amount = idx
-
- # 如果关键列未找到,尝试通过位置推断
- if col_no is None:
- col_no = 0
- if col_service_provider is None:
- col_service_provider = 1
-
- # 如果金额列未找到,尝试查找
- if col_contract_amount is None:
- for idx, header_text in enumerate(header_texts):
- if "合同" in header_text and "金额" in header_text:
- col_contract_amount = idx
- break
-
- if col_submitted_amount is None:
- for idx, header_text in enumerate(header_texts):
- if "送审" in header_text and "金额" in header_text:
- col_submitted_amount = idx
- break
-
- if col_settlement_amount is None:
- for idx, header_text in enumerate(header_texts):
- if "结算" in header_text and "金额" in header_text:
- col_settlement_amount = idx
- break
-
- # 如果中标通知书列未找到,尝试查找
- if col_bid_notice is None:
- for idx, header_text in enumerate(header_texts):
- if "中标" in header_text and "通知" in header_text:
- col_bid_notice = idx
- break
-
- logger.info(f"[其他服务类合同] 列识别: 序号={col_no}, 服务商={col_service_provider}, "
- f"中标通知书={col_bid_notice}, 合同金额={col_contract_amount}, "
- f"送审金额={col_submitted_amount}, 结算金额={col_settlement_amount}")
-
- # 从数据行开始解析(跳过表头行)
- data_rows = df.iloc[header_row_idx + 1:].reset_index(drop=True)
-
- result = []
-
- def parse_number(value: Any) -> str:
- """解析数字,返回字符串保留原始精度"""
- if pd.isna(value):
- return "0"
- value_str = str(value).strip()
- # 移除常见的非数字字符(保留小数点、负号)
- value_str = re.sub(r'[^\d.\-]', '', value_str)
- if not value_str or value_str == '-':
- return "0"
- return value_str
-
- for idx, row in data_rows.iterrows():
- # 跳过空行
- if row.isna().all():
- continue
-
- # 提取各列数据
- no_val = row.iloc[col_no] if col_no is not None and col_no < len(row) else None
- service_provider_val = row.iloc[col_service_provider] if col_service_provider is not None and col_service_provider < len(row) else None
- bid_notice_val = row.iloc[col_bid_notice] if col_bid_notice is not None and col_bid_notice < len(row) else None
- contract_amount_val = row.iloc[col_contract_amount] if col_contract_amount is not None and col_contract_amount < len(row) else None
- submitted_amount_val = row.iloc[col_submitted_amount] if col_submitted_amount is not None and col_submitted_amount < len(row) else None
- settlement_amount_val = row.iloc[col_settlement_amount] if col_settlement_amount is not None and col_settlement_amount < len(row) else None
-
- # 解析序号
- no = None
- no_str = ""
- if no_val is not None and not pd.isna(no_val):
- no_str = str(no_val).strip()
- if no_str:
- try:
- no = int(float(no_str))
- except (ValueError, TypeError):
- pass
-
- # 跳过序号为空的行(这些通常是合计、其中等说明行)
- if not no_str or pd.isna(no_val):
- continue
-
- # 解析服务商,清理换行符
- service_provider = str(service_provider_val).strip() if service_provider_val is not None and not pd.isna(service_provider_val) else ""
- service_provider = service_provider.replace('\n', '').replace('\r', '')
- service_provider = re.sub(r'\s+', ' ', service_provider).strip()
-
- # 跳过空行
- if not service_provider or service_provider == "":
- continue
-
- # 判断是否为合计行(合计行需要跳过)
- is_total = any(kw in service_provider for kw in ["合计", "总计", "总计", "合计金额"])
- if is_total:
- continue
-
- # 解析中标通知书,清理换行符
- bid_notice = str(bid_notice_val).strip() if bid_notice_val is not None and not pd.isna(bid_notice_val) else ""
- bid_notice = bid_notice.replace('\n', '').replace('\r', '')
- bid_notice = re.sub(r'\s+', ' ', bid_notice).strip()
-
- # 解析金额(保留两位小数)
- contract_amount = parse_number(contract_amount_val)
- submitted_amount = parse_number(submitted_amount_val)
- settlement_amount = parse_number(settlement_amount_val)
-
- # 添加到结果
- result.append({
- "No": no if no is not None else idx + 1,
- "serviceProvider": service_provider,
- "bidNotice": bid_notice,
- "contractAmount": contract_amount,
- "submittedAmount": submitted_amount,
- "settlementAmount": settlement_amount,
- })
-
- return result
- def parse_design_review_table(df: pd.DataFrame) -> List[Dict[str, Any]]:
- """
- 解析 designReview 类型的表格,提取数据并生成结构化的 JSON 格式。
-
- 返回格式:
- [{
- "name": str, # 大类名称(如"变电工程"、"线路工程")
- "Level": 0, # 大类层级
- "staticInvestment": float, # 静态投资总计
- "dynamicInvestment": float, # 动态投资总计
- "items": [ # 子项列表
- {
- "No": int, # 序号
- "name": str, # 工程名称
- "Level": 1, # 子项层级
- "staticInvestment": float, # 静态投资
- "dynamicInvestment": float, # 动态投资
- },
- ...
- ]
- }, ...]
- """
- if df.empty:
- return []
-
- # 中文数字映射(用于识别序号格式)
- CHINESE_NUMBERS = {
- '一': 1, '二': 2, '三': 3, '四': 4, '五': 5, '六': 6, '七': 7, '八': 8, '九': 9, '十': 10,
- '十一': 11, '十二': 12, '十三': 13, '十四': 14, '十五': 15, '十六': 16, '十七': 17, '十八': 18, '十九': 19, '二十': 20
- }
-
- def is_category_by_serial(no_str: str) -> bool:
- """
- 通过序号格式判断是否为大类
- 大类:序号为中文数字,如"一"、"二"、"三"(不含括号)
- 子类:序号为带括号的中文数字,如"(一)"、"(二)"、"(三)"
- """
- if not no_str:
- return False
-
- no_str = str(no_str).strip()
-
- # 如果包含括号,是子类
- if '(' in no_str or '(' in no_str or ')' in no_str or ')' in no_str:
- return False
-
- # 检查是否是中文数字(大类)
- # 移除可能的空格和标点
- cleaned = no_str.replace(' ', '').replace('、', '').replace('.', '').replace('。', '')
-
- # 检查是否以中文数字开头
- for chinese_num in CHINESE_NUMBERS.keys():
- if cleaned.startswith(chinese_num):
- return True
-
- return False
-
- # 尝试识别表头行(通常在前几行)
- header_row_idx = None
- for i in range(min(3, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- # 检查是否包含表头关键词
- if any(kw in row_text for kw in ["序号", "工程名称", "建设规模", "静态投资", "动态投资"]):
- header_row_idx = i
- break
-
- if header_row_idx is None:
- # 如果没有找到明确的表头,假设第一行是表头
- header_row_idx = 0
-
- # 从表头行识别列索引
- header_row = df.iloc[header_row_idx].astype(str).str.strip()
-
- col_no = None # 序号列
- col_name = None # 工程名称列
- col_scale = None # 建设规模列(用于判断层级)
- col_static = None # 静态投资列
- col_dynamic = None # 动态投资列
-
- for idx, cell in enumerate(header_row):
- cell_lower = cell.lower()
- if "序号" in cell or "no" in cell_lower:
- col_no = idx
- elif "工程名称" in cell or "name" in cell_lower:
- col_name = idx
- elif "建设规模" in cell or "规模" in cell:
- col_scale = idx
- elif "静态投资" in cell or "static" in cell_lower:
- col_static = idx
- elif "动态投资" in cell or "dynamic" in cell_lower:
- col_dynamic = idx
-
- # 如果关键列未找到,尝试通过位置推断(通常顺序:序号、工程名称、建设规模、静态投资、动态投资)
- if col_no is None:
- col_no = 0
- if col_name is None:
- col_name = 1
- if col_static is None:
- # 从后往前找
- for idx in range(len(df.columns) - 1, -1, -1):
- if "投资" in str(df.iloc[header_row_idx, idx]) or "元" in str(df.iloc[header_row_idx, idx]):
- if col_dynamic is None:
- col_dynamic = idx
- elif col_static is None:
- col_static = idx
- break
-
- # 从数据行开始解析(跳过表头行)
- data_rows = df.iloc[header_row_idx + 1:].reset_index(drop=True)
-
- def parse_number(value: Any) -> str:
- """解析数字,返回字符串保留原始精度"""
- if pd.isna(value):
- return "0"
- value_str = str(value).strip()
- # 移除常见的非数字字符(保留小数点、负号)
- value_str = re.sub(r'[^\d.\-]', '', value_str)
- if not value_str or value_str == '-':
- return "0"
- return value_str
-
- # 先解析所有行
- all_items = []
- for idx, row in data_rows.iterrows():
- # 跳过空行
- if row.isna().all():
- continue
-
- # 提取各列数据
- no_val = row.iloc[col_no] if col_no is not None and col_no < len(row) else None
- name_val = row.iloc[col_name] if col_name is not None and col_name < len(row) else None
- static_val = row.iloc[col_static] if col_static is not None and col_static < len(row) else None
- dynamic_val = row.iloc[col_dynamic] if col_dynamic is not None and col_dynamic < len(row) else None
-
- # 解析序号(保留原始字符串)
- no = None
- no_str = ""
- if no_val is not None and not pd.isna(no_val):
- no_str = str(no_val).strip()
- if no_str:
- # 先尝试解析为数字
- try:
- no = int(float(no_str))
- except (ValueError, TypeError):
- # 如果不是数字,尝试解析中文数字
- cleaned = no_str.replace(' ', '').replace('、', '').replace('.', '').replace('。', '')
- # 移除括号
- cleaned_no_brackets = cleaned.replace('(', '').replace('(', '').replace(')', '').replace(')', '')
- if cleaned_no_brackets in CHINESE_NUMBERS:
- no = CHINESE_NUMBERS[cleaned_no_brackets]
- else:
- # 尝试匹配中文数字前缀
- for chinese_num, num_val in CHINESE_NUMBERS.items():
- if cleaned_no_brackets.startswith(chinese_num):
- no = num_val
- break
-
- # 跳过序号为空的行(这些通常是"其中:"等说明行)
- if not no_str or pd.isna(no_val):
- continue
-
- # 解析工程名称
- name = str(name_val).strip() if name_val is not None and not pd.isna(name_val) else ""
-
- # 跳过空行
- if not name or name == "":
- continue
-
- # 判断是否为合计行(合计行需要跳过)
- is_total = any(kw in name for kw in ["合计", "总计", "总计", "合计金额"])
- if is_total:
- continue
-
- # 解析投资金额
- static_investment = parse_number(static_val)
- dynamic_investment = parse_number(dynamic_val)
-
- # 判断是否为大类:通过序号格式识别
- is_category = is_category_by_serial(no_str)
-
- all_items.append({
- "No": no if no is not None else idx + 1,
- "name": name,
- "isCategory": is_category,
- "staticInvestment": static_investment,
- "dynamicInvestment": dynamic_investment,
- })
-
- # 构建层级结构
- result = []
- current_category = None
-
- for item in all_items:
- if item["isCategory"]:
- # 如果遇到新的大类,先保存之前的大类(如果有)
- if current_category is not None:
- result.append(current_category)
- # 创建新的大类
- current_category = {
- "name": item["name"],
- "Level": 0,
- "staticInvestment": item["staticInvestment"],
- "dynamicInvestment": item["dynamicInvestment"],
- "items": []
- }
- else:
- # 子项,添加到当前大类
- if current_category is not None:
- current_category["items"].append({
- "No": item["No"],
- "name": item["name"],
- "Level": 1,
- "staticInvestment": item["staticInvestment"],
- "dynamicInvestment": item["dynamicInvestment"],
- })
- else:
- # 如果没有大类,作为独立项(不应该发生,但容错处理)
- result.append({
- "name": item["name"],
- "Level": 1,
- "staticInvestment": item["staticInvestment"],
- "dynamicInvestment": item["dynamicInvestment"],
- "items": []
- })
-
- # 保存最后一个大类
- if current_category is not None:
- result.append(current_category)
-
- return result
- def parse_design_review_detail_table(df: pd.DataFrame, table_title: str) -> List[Dict[str, Any]]:
- """
- 解析 designReview 类型的概算投资明细表格(规则2)。
-
- 表头格式:
- 序号 | 工程或费用名称 | 建筑工程费 | 设备购置费 | 安装工程费 | 其他费用 | 合计 | ...
-
- Args:
- df: 表格 DataFrame
- table_title: 表格标题(如"周村 220kV 变电站新建工程总概算表"),用于提取工程名称
-
- 返回格式:
- [{
- "No": int, # 序号
- "Level": int, # 明细等级
- "name": str, # 单项工程名称(从标题提取,如"周村220KV变电站新建工程")
- "projectOrExpenseName": str, # 工程或费用名称
- "constructionProjectCost": float, # 建筑工程费(元)
- "equipmentPurchaseCost": float, # 设备购置费(元)
- "installationProjectCost": float, # 安装工程费(元)
- "otherExpenses": float, # 其他费用(元)
- }, ...]
- """
- if df.empty:
- return []
-
- # 从标题中提取工程名称
- # 标题格式如:"周村 220kV 变电站新建工程总概算表" -> "周村220kV变电站新建工程"
- project_name = table_title
- if project_name:
- # 移除"总概算表"、"概算表"、"估算表"等后缀
- project_name = re.sub(r'(总概算表|概算表|估算表|汇总表)$', '', project_name)
- # 移除多余空格
- project_name = re.sub(r'\s+', '', project_name)
-
- # 中文数字映射(用于判断层级)
- CHINESE_NUMBERS = {
- '一': 1, '二': 2, '三': 3, '四': 4, '五': 5, '六': 6, '七': 7, '八': 8, '九': 9, '十': 10,
- '十一': 11, '十二': 12, '十三': 13, '十四': 14, '十五': 15, '十六': 16, '十七': 17, '十八': 18, '十九': 19, '二十': 20
- }
-
- # 特殊的"其中:"项,应作为 Level 1(独立大类)而非子项
- SPECIAL_LEVEL1_ITEMS = [
- "可抵扣固定资产增值税额",
- ]
-
- def determine_level(no_str: str, expense_name: str = "") -> int:
- """
- 根据序号格式和费用名称判断层级:
- - 中文数字(一、二、三):Level 1(大类)
- - 阿拉伯数字(1、2、3):Level 1(大类)
- - 带括号的数字((1)、(一)):Level 2(子项)
- - 以"其中:"开头的行:Level 2(子项),但特殊项除外
- """
- no_str = str(no_str).strip() if no_str else ""
- expense_name = str(expense_name).strip() if expense_name else ""
-
- # 移除"其中:"前缀用于判断
- name_without_prefix = expense_name
- if expense_name.startswith("其中:"):
- name_without_prefix = expense_name[3:]
- elif expense_name.startswith("其中:"):
- name_without_prefix = expense_name[3:]
-
- # 检查是否为特殊的 Level 1 项
- for special_item in SPECIAL_LEVEL1_ITEMS:
- if special_item in name_without_prefix:
- return 1
-
- # 以"其中:"或"其中:"开头的 -> Level 2(子项)
- if expense_name.startswith("其中:") or expense_name.startswith("其中:"):
- return 2
-
- # 序号为空的情况,默认 Level 1
- if not no_str:
- return 1
-
- # 带括号的 -> Level 2(子项)
- if '(' in no_str or '(' in no_str or ')' in no_str or ')' in no_str:
- return 2
-
- # 中文数字(一、二、三等)-> Level 1(大类)
- cleaned = no_str.replace(' ', '').replace('、', '').replace('.', '').replace('。', '')
- for chinese_num in CHINESE_NUMBERS.keys():
- if cleaned == chinese_num or cleaned.startswith(chinese_num):
- return 1
-
- # 阿拉伯数字(1、2、3等)-> Level 1(大类)
- if re.match(r'^\d+\.?$', cleaned):
- return 1
-
- return 1 # 默认
-
- def parse_number(value: Any) -> str:
- """解析数字,返回字符串保留原始精度"""
- if pd.isna(value):
- return "0"
- value_str = str(value).strip()
- # 移除常见的非数字字符(保留小数点、负号)
- value_str = re.sub(r'[^\d.\-]', '', value_str)
- if not value_str or value_str == '-':
- return "0"
- return value_str
-
- # 识别表头行(可能有多行表头,需要扫描多行来找到列索引)
- # 先找到包含 "序号" 的行作为起始行
- header_start_idx = None
- for i in range(min(5, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- row_text_no_space = row_text.replace(" ", "")
- if "序号" in row_text_no_space:
- header_start_idx = i
- break
-
- if header_start_idx is None:
- header_start_idx = 0
- logger.warning(f"[表格解析] 未找到包含'序号'的表头行,使用第一行作为表头")
-
- # 扫描表头区域(可能是多行表头),收集所有列的关键词
- col_no = None
- col_name = None
- col_construction = None
- col_equipment = None
- col_installation = None
- col_other = None
-
- # 打印前几行表头用于调试
- logger.debug(f"[表格解析] DataFrame 形状: {df.shape}")
- for i in range(min(3, len(df))):
- row_vals = df.iloc[i].astype(str).tolist()
- logger.debug(f"[表格解析] 第{i}行: {row_vals}")
-
- # 扫描前几行表头(多行表头情况)
- header_end_idx = header_start_idx
- for i in range(header_start_idx, min(header_start_idx + 3, len(df))):
- row = df.iloc[i].astype(str).str.strip()
- for idx, cell in enumerate(row):
- # 移除所有空格、换行符等
- cell_no_space = re.sub(r'\s+', '', cell)
- if ("序号" in cell_no_space or cell_no_space == "No") and col_no is None:
- col_no = idx
- elif ("工程或费用名称" in cell_no_space or "费用名称" in cell_no_space) and col_name is None:
- col_name = idx
- # 增强列名匹配 - 使用更宽松的匹配
- elif col_construction is None and any(kw in cell_no_space for kw in ["建筑工程费", "建筑工程", "建筑"]):
- col_construction = idx
- elif col_equipment is None and any(kw in cell_no_space for kw in ["设备购置费", "设备购置", "设备"]):
- col_equipment = idx
- elif col_installation is None and any(kw in cell_no_space for kw in ["安装工程费", "安装工程", "安装"]):
- col_installation = idx
- elif col_other is None and "其他费用" in cell_no_space:
- col_other = idx
-
- # 检查这一行是否像数据行(第一列是数字或中文数字)
- first_cell = row.iloc[0] if len(row) > 0 else ""
- first_cell_clean = re.sub(r'\s+', '', first_cell).replace("、", "")
- chinese_nums = ['一', '二', '三', '四', '五', '六', '七', '八', '九', '十']
- if first_cell_clean and (first_cell_clean.isdigit() or first_cell_clean in chinese_nums):
- # 这一行可能是数据行,表头到此结束
- header_end_idx = i - 1
- break
- header_end_idx = i
-
- # 如果关键列未找到,尝试位置推断
- if col_no is None:
- col_no = 0
- if col_name is None:
- col_name = 1
-
- # 如果费用列未找到,尝试按位置推断
- # 假设顺序:序号(0)、名称(1)、建筑(2)、设备(3)、安装(4)、其他(5)、合计(6)...
- logger.info(f"[表格解析] 列数: {len(df.columns)}, 当前列索引: 建筑={col_construction}, 设备={col_equipment}, 安装={col_installation}, 其他={col_other}")
-
- if col_construction is None or col_equipment is None or col_installation is None or col_other is None:
- num_cols = len(df.columns)
- if num_cols >= 7:
- # 按位置推断
- if col_construction is None:
- col_construction = 2
- if col_equipment is None:
- col_equipment = 3
- if col_installation is None:
- col_installation = 4
- if col_other is None:
- col_other = 5
- logger.info(f"[表格解析] 按位置推断列索引: 建筑={col_construction}, 设备={col_equipment}, 安装={col_installation}, 其他={col_other}")
- else:
- logger.warning(f"[表格解析] 列数不足({num_cols}<7),无法按位置推断费用列")
-
- logger.info(f"[表格解析] 列索引: 序号={col_no}, 名称={col_name}, 建筑={col_construction}, 设备={col_equipment}, 安装={col_installation}, 其他={col_other}")
- logger.info(f"[表格解析] 表头范围: 行 {header_start_idx} - {header_end_idx}")
-
- # 从数据行开始解析(跳过表头行)
- data_rows = df.iloc[header_end_idx + 1:].reset_index(drop=True)
-
- result = []
- for idx, row in data_rows.iterrows():
- # 跳过空行
- if row.isna().all():
- continue
-
- # 提取各列数据
- no_val = row.iloc[col_no] if col_no is not None and col_no < len(row) else None
- name_val = row.iloc[col_name] if col_name is not None and col_name < len(row) else None
- construction_val = row.iloc[col_construction] if col_construction is not None and col_construction < len(row) else None
- equipment_val = row.iloc[col_equipment] if col_equipment is not None and col_equipment < len(row) else None
- installation_val = row.iloc[col_installation] if col_installation is not None and col_installation < len(row) else None
- other_val = row.iloc[col_other] if col_other is not None and col_other < len(row) else None
-
- # 解析序号
- no = None
- no_str = ""
- if no_val is not None and not pd.isna(no_val):
- no_str = str(no_val).strip()
- if no_str:
- try:
- no = int(float(no_str))
- except (ValueError, TypeError):
- # 尝试中文数字
- cleaned = no_str.replace(' ', '').replace('、', '').replace('.', '').replace('。', '')
- cleaned_no_brackets = cleaned.replace('(', '').replace('(', '').replace(')', '').replace(')', '')
- if cleaned_no_brackets in CHINESE_NUMBERS:
- no = CHINESE_NUMBERS[cleaned_no_brackets]
-
- # 解析工程或费用名称(提前解析,用于判断是否为"其中:"行)
- expense_name = str(name_val).strip() if name_val is not None and not pd.isna(name_val) else ""
-
- # 跳过空行
- if not expense_name:
- continue
-
- # 判断是否为"其中:"开头的行(这类行序号通常为空,但需要保留)
- is_sub_item = expense_name.startswith("其中:") or expense_name.startswith("其中:")
-
- # 跳过序号为空或无效的行(但"其中:"行除外)
- if (not no_str or pd.isna(no_val)) and not is_sub_item:
- continue
-
- # 跳过合计行
- if any(kw in expense_name for kw in ["合计", "总计", "小计"]):
- continue
-
- # 判断层级(传入费用名称用于判断"其中:")
- level = determine_level(no_str, expense_name)
-
- # 去除"其中:"前缀
- clean_expense_name = expense_name
- if expense_name.startswith("其中:"):
- clean_expense_name = expense_name[3:]
- elif expense_name.startswith("其中:"):
- clean_expense_name = expense_name[3:]
-
- # 解析费用金额
- construction_cost = parse_number(construction_val)
- equipment_cost = parse_number(equipment_val)
- installation_cost = parse_number(installation_val)
- other_cost = parse_number(other_val)
-
- result.append({
- "No": no if no is not None else idx + 1,
- "Level": level,
- "name": project_name, # 从标题提取的工程名称
- "projectOrExpenseName": clean_expense_name,
- "constructionProjectCost": construction_cost,
- "equipmentPurchaseCost": equipment_cost,
- "installationProjectCost": installation_cost,
- "otherExpenses": other_cost,
- })
-
- logger.info(f"[表格解析] 解析完成: 工程名称={project_name}, 共 {len(result)} 条数据")
- return result
- def parse_design_review_cost_table(df: pd.DataFrame, table_title: str) -> List[Dict[str, Any]]:
- """
- 解析 designReview 类型的概算投资费用表格(规则3)。
-
- 表头格式:
- 序号 | 工程或费用名称 | 费用金额 | 各项占静态投资% | 单位投资万元/km
-
- Args:
- df: 表格 DataFrame
- table_title: 表格标题(如"周村 220kV 变电站新建工程总概算表"),用于提取工程名称
-
- 返回格式:
- [{
- "No": int, # 序号
- "Level": int, # 明细等级
- "name": str, # 单项工程名称(从标题提取)
- "projectOrExpenseName": str, # 工程或费用名称
- "cost": float, # 费用金额(元)
- }, ...]
- """
- if df.empty:
- return []
-
- # 从标题中提取工程名称
- project_name = table_title
- if project_name:
- # 移除"总概算表"、"概算表"、"估算表"等后缀
- project_name = re.sub(r'(总概算表|概算表|估算表|汇总表)$', '', project_name)
- # 移除多余空格
- project_name = re.sub(r'\s+', '', project_name)
-
- # 中文数字映射(用于判断层级)
- CHINESE_NUMBERS = {
- '一': 1, '二': 2, '三': 3, '四': 4, '五': 5, '六': 6, '七': 7, '八': 8, '九': 9, '十': 10,
- '十一': 11, '十二': 12, '十三': 13, '十四': 14, '十五': 15, '十六': 16, '十七': 17, '十八': 18, '十九': 19, '二十': 20
- }
-
- # 特殊的"其中:"项,应作为 Level 1(独立大类)而非子项
- SPECIAL_LEVEL1_ITEMS = [
- "可抵扣固定资产增值税额",
- ]
-
- def determine_level(no_str: str, expense_name: str = "") -> int:
- """
- 根据序号格式和费用名称判断层级:
- - 中文数字(一、二、三):Level 1(大类)
- - 阿拉伯数字(1、2、3):Level 1(大类)
- - 带括号的数字((1)、(一)):Level 2(子项)
- - 以"其中:"开头的行:Level 2(子项),但特殊项除外
- """
- no_str = str(no_str).strip() if no_str else ""
- expense_name = str(expense_name).strip() if expense_name else ""
-
- # 移除"其中:"前缀用于判断
- name_without_prefix = expense_name
- if expense_name.startswith("其中:"):
- name_without_prefix = expense_name[3:]
- elif expense_name.startswith("其中:"):
- name_without_prefix = expense_name[3:]
-
- # 检查是否为特殊的 Level 1 项
- for special_item in SPECIAL_LEVEL1_ITEMS:
- if special_item in name_without_prefix:
- return 1
-
- # 以"其中:"或"其中:"开头的 -> Level 2(子项)
- if expense_name.startswith("其中:") or expense_name.startswith("其中:"):
- return 2
-
- # 序号为空的情况,默认 Level 1
- if not no_str:
- return 1
-
- # 带括号的 -> Level 2(子项)
- if '(' in no_str or '(' in no_str or ')' in no_str or ')' in no_str:
- return 2
-
- # 中文数字(一、二、三等)-> Level 1(大类)
- cleaned = no_str.replace(' ', '').replace('、', '').replace('.', '').replace('。', '')
- for chinese_num in CHINESE_NUMBERS.keys():
- if cleaned == chinese_num or cleaned.startswith(chinese_num):
- return 1
-
- # 阿拉伯数字(1、2、3等)-> Level 1(大类)
- if re.match(r'^\d+\.?$', cleaned):
- return 1
-
- return 1
-
- def parse_number(value: Any) -> str:
- """解析数字,返回字符串保留原始精度"""
- if pd.isna(value):
- return "0"
- value_str = str(value).strip()
- value_str = re.sub(r'[^\d.\-]', '', value_str)
- if not value_str or value_str == '-':
- return "0"
- return value_str
-
- # 识别表头行
- header_row_idx = None
- for i in range(min(5, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- row_text_no_space = row_text.replace(" ", "")
- if "工程或费用名称" in row_text_no_space and "费用金额" in row_text_no_space:
- header_row_idx = i
- break
- elif "序号" in row_text and "费用金额" in row_text:
- header_row_idx = i
- break
-
- if header_row_idx is None:
- header_row_idx = 0
- logger.warning(f"[表格解析] 未找到明确表头,使用第一行作为表头")
-
- # 从表头行识别列索引
- header_row = df.iloc[header_row_idx].astype(str).str.strip()
-
- col_no = None
- col_name = None
- col_cost = None
-
- for idx, cell in enumerate(header_row):
- cell_no_space = cell.replace(" ", "")
- if "序号" in cell_no_space or cell_no_space == "No":
- col_no = idx
- elif "工程或费用名称" in cell_no_space or "费用名称" in cell_no_space:
- col_name = idx
- elif "费用金额" in cell_no_space:
- col_cost = idx
-
- if col_no is None:
- col_no = 0
- if col_name is None:
- col_name = 1
- if col_cost is None:
- col_cost = 2
-
- logger.info(f"[表格解析] 列索引: 序号={col_no}, 名称={col_name}, 费用金额={col_cost}")
-
- # 从数据行开始解析
- data_rows = df.iloc[header_row_idx + 1:].reset_index(drop=True)
-
- result = []
- for idx, row in data_rows.iterrows():
- if row.isna().all():
- continue
-
- no_val = row.iloc[col_no] if col_no is not None and col_no < len(row) else None
- name_val = row.iloc[col_name] if col_name is not None and col_name < len(row) else None
- cost_val = row.iloc[col_cost] if col_cost is not None and col_cost < len(row) else None
-
- # 解析序号
- no = None
- no_str = ""
- if no_val is not None and not pd.isna(no_val):
- no_str = str(no_val).strip()
- if no_str:
- try:
- no = int(float(no_str))
- except (ValueError, TypeError):
- cleaned = no_str.replace(' ', '').replace('、', '').replace('.', '').replace('。', '')
- cleaned_no_brackets = cleaned.replace('(', '').replace('(', '').replace(')', '').replace(')', '')
- if cleaned_no_brackets in CHINESE_NUMBERS:
- no = CHINESE_NUMBERS[cleaned_no_brackets]
-
- # 解析工程或费用名称(提前解析,用于判断是否为"其中:"行)
- expense_name = str(name_val).strip() if name_val is not None and not pd.isna(name_val) else ""
-
- if not expense_name:
- continue
-
- # 判断是否为"其中:"开头的行(这类行序号通常为空,但需要保留)
- is_sub_item = expense_name.startswith("其中:") or expense_name.startswith("其中:")
-
- # 跳过序号为空或无效的行(但"其中:"行除外)
- if (not no_str or pd.isna(no_val)) and not is_sub_item:
- continue
-
- if any(kw in expense_name for kw in ["合计", "总计", "小计"]):
- continue
-
- # 判断层级(传入费用名称用于判断"其中:")
- level = determine_level(no_str, expense_name)
-
- # 去除"其中:"前缀
- clean_expense_name = expense_name
- if expense_name.startswith("其中:"):
- clean_expense_name = expense_name[3:]
- elif expense_name.startswith("其中:"):
- clean_expense_name = expense_name[3:]
-
- cost = parse_number(cost_val)
-
- result.append({
- "No": no if no is not None else idx + 1,
- "Level": level,
- "name": project_name,
- "projectOrExpenseName": clean_expense_name,
- "cost": cost,
- })
-
- logger.info(f"[表格解析] 解析完成: 工程名称={project_name}, 共 {len(result)} 条数据")
- return result
- def _group_items_by_name(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """
- 将平铺的项目列表按 name 字段分组,并按 Level 嵌套:
- - Level 1 的项作为大类
- - Level 2 的项作为大类的子项(放入 items 中)
-
- 输入:
- [
- {"No": "一", "Level": 1, "name": "工程A", "projectOrExpenseName": "主辅生产工程", ...},
- {"No": "(一)", "Level": 2, "name": "工程A", "projectOrExpenseName": "主要生产工程", ...},
- {"No": "(二)", "Level": 2, "name": "工程A", "projectOrExpenseName": "辅助生产工程", ...},
- {"No": "二", "Level": 1, "name": "工程A", "projectOrExpenseName": "其他费用", ...},
- {"No": "", "Level": 2, "name": "工程A", "projectOrExpenseName": "其中:建设场地征用", ...},
- ]
-
- 输出:
- [
- {
- "name": "工程A",
- "items": [
- {
- "No": "一", "Level": 1, "projectOrExpenseName": "主辅生产工程", ...,
- "items": [
- {"No": "(一)", "Level": 2, "projectOrExpenseName": "主要生产工程", ...},
- {"No": "(二)", "Level": 2, "projectOrExpenseName": "辅助生产工程", ...},
- ]
- },
- {
- "No": "二", "Level": 1, "projectOrExpenseName": "其他费用", ...,
- "items": [
- {"No": "", "Level": 2, "projectOrExpenseName": "其中:建设场地征用", ...},
- ]
- }
- ]
- }
- ]
- """
- if not items:
- return []
-
- from collections import OrderedDict
-
- # 第一步:按工程名称(name)分组
- grouped_by_name: OrderedDict[str, List[Dict[str, Any]]] = OrderedDict()
-
- for item in items:
- name = item.get("name", "未知工程")
- if name not in grouped_by_name:
- grouped_by_name[name] = []
- # 复制 item 并移除 name 字段
- item_copy = {k: v for k, v in item.items() if k != "name"}
- grouped_by_name[name].append(item_copy)
-
- # 第二步:在每个工程组内,按 Level 建立父子关系
- result = []
- for name, group_items in grouped_by_name.items():
- nested_items = []
- current_parent = None
-
- for item in group_items:
- level = item.get("Level", 1)
-
- if level == 1:
- # Level 1 是大类,创建新的父项
- item_with_children = dict(item)
- item_with_children["items"] = []
- nested_items.append(item_with_children)
- current_parent = item_with_children
- elif level == 2:
- # Level 2 是子项,放入当前父项的 items 中
- if current_parent is not None:
- # 移除 Level 字段(子项统一在父项下,不需要重复标识)
- child_item = {k: v for k, v in item.items()}
- current_parent["items"].append(child_item)
- else:
- # 没有父项,作为独立项处理
- nested_items.append(item)
- else:
- # 其他 Level,作为独立项
- nested_items.append(item)
-
- result.append({
- "name": name,
- "items": nested_items
- })
-
- return result
- def parse_final_account_table(df: pd.DataFrame, project_name: str, project_no: int) -> List[Dict[str, Any]]:
- """
- 解析 finalAccount 类型的单项工程投资完成情况表格。
-
- 表格结构:
- 费用项目 | 概算金额 | 决算金额(审定金额-不含税) | 增值税额 | 超(-)节(+)支金额 | 超(-)节(+)支率
-
- Args:
- df: 表格 DataFrame
- project_name: 项目名称(从标题提取,如"周村220kV输变电工程变电站新建工程")
- project_no: 项目序号(如1、2、3、4)
-
- 返回格式:
- [{
- "No": int, # 序号(项目序号)
- "name": str, # 项目名称(审计内容)
- "feeName": str, # 费用项目
- "estimatedCost": str, # 概算金额
- "approvedFinalAccountExcludingVat": str, # 决算金额审定不含税
- "vatAmount": str, # 增值税额
- "costVariance": str, # 超节支金额
- "varianceRate": str, # 超节支率
- }, ...]
- """
- if df.empty:
- return []
-
- def parse_number(value: Any) -> str:
- """解析数字,返回字符串保留原始精度"""
- if pd.isna(value):
- return "0"
- value_str = str(value).strip()
- # 移除非数字字符(保留负号和小数点)
- value_str = re.sub(r'[^\d.\-]', '', value_str)
- if not value_str or value_str == '-':
- return "0"
- return value_str
-
- def parse_rate(value: Any) -> str:
- """解析百分比,返回字符串"""
- if pd.isna(value):
- return "0%"
- value_str = str(value).strip()
- # 移除非数字字符(保留负号、小数点和百分号)
- if '%' not in value_str:
- # 提取数字部分并添加百分号
- num_str = re.sub(r'[^\d.\-]', '', value_str)
- if num_str and num_str != '-':
- return f"{num_str}%"
- return "0%"
- return value_str
-
- # 识别表头行
- header_row_idx = None
- for i in range(min(5, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- row_text_no_space = row_text.replace(" ", "")
- # 匹配表头特征
- if "费用项目" in row_text_no_space and "概算金额" in row_text_no_space:
- header_row_idx = i
- break
- elif "概算金额" in row_text_no_space and "决算金额" in row_text_no_space:
- header_row_idx = i
- break
-
- if header_row_idx is None:
- # 检查是否有多行表头(如"决算金额"跨行)
- for i in range(min(3, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- if "1" in row_text and "2" in row_text and "3" in row_text:
- # 这是列序号行,表头在前面
- header_row_idx = i
- break
- if header_row_idx is None:
- header_row_idx = 0
- logger.warning(f"[表格解析] 未找到明确表头,使用第一行作为表头")
-
- # 从表头行识别列索引
- header_row = df.iloc[header_row_idx].astype(str).str.strip()
-
- col_fee_name = None # 费用项目
- col_estimated = None # 概算金额
- col_approved = None # 审定金额(不含税)
- col_vat = None # 增值税额
- col_variance = None # 超节支金额
- col_variance_rate = None # 超节支率
-
- for idx, cell in enumerate(header_row):
- cell_no_space = cell.replace(" ", "")
- if "费用项目" in cell_no_space:
- col_fee_name = idx
- elif "概算金额" in cell_no_space or cell_no_space == "1":
- col_estimated = idx
- elif "审定金额" in cell_no_space or "不含税" in cell_no_space or cell_no_space == "2":
- col_approved = idx
- elif "增值税额" in cell_no_space or "增值税" in cell_no_space or cell_no_space == "3":
- col_vat = idx
- elif "超" in cell_no_space and "节" in cell_no_space and "金额" in cell_no_space:
- col_variance = idx
- elif cell_no_space == "4=1-2-3":
- col_variance = idx
- elif "超" in cell_no_space and "节" in cell_no_space and "率" in cell_no_space:
- col_variance_rate = idx
- elif cell_no_space == "5=4/1":
- col_variance_rate = idx
-
- # 如果无法识别列,使用默认索引(根据OCR输出的表格结构)
- if col_fee_name is None:
- col_fee_name = 0
- if col_estimated is None:
- col_estimated = 1
- if col_approved is None:
- col_approved = 2
- if col_vat is None:
- col_vat = 3
- if col_variance is None:
- col_variance = 4
- if col_variance_rate is None:
- col_variance_rate = 5
-
- logger.info(f"[表格解析] 列索引: 费用项目={col_fee_name}, 概算金额={col_estimated}, "
- f"审定不含税={col_approved}, 增值税={col_vat}, 超节支金额={col_variance}, 超节支率={col_variance_rate}")
-
- # 跳过表头行(可能有多行表头)
- # 找到数据开始行(第一个包含"建筑安装"或"设备购置"的行)
- data_start_idx = header_row_idx + 1
- for i in range(header_row_idx + 1, min(header_row_idx + 5, len(df))):
- row_text = " ".join(df.iloc[i].astype(str).str.strip().tolist())
- if "建筑安装" in row_text or "设备购置" in row_text or "其他费用" in row_text:
- data_start_idx = i
- break
- # 跳过列序号行(如 "1 | 2 | 3 | 4=1-2-3 | 5=4/1")
- if re.match(r'^[\d\s=\-/]+$', row_text.replace(" ", "")):
- continue
-
- # 从数据行开始解析
- data_rows = df.iloc[data_start_idx:].reset_index(drop=True)
-
- result = []
- for idx, row in data_rows.iterrows():
- if row.isna().all():
- continue
-
- fee_name = row.iloc[col_fee_name] if col_fee_name < len(row) else None
- estimated = row.iloc[col_estimated] if col_estimated < len(row) else None
- approved = row.iloc[col_approved] if col_approved < len(row) else None
- vat = row.iloc[col_vat] if col_vat < len(row) else None
- variance = row.iloc[col_variance] if col_variance < len(row) else None
- variance_rate = row.iloc[col_variance_rate] if col_variance_rate < len(row) else None
-
- # 解析费用项目名称
- fee_name_str = str(fee_name).strip() if fee_name is not None and not pd.isna(fee_name) else ""
-
- if not fee_name_str:
- continue
-
- # 跳过合计行
- if any(kw in fee_name_str for kw in ["合计", "总计", "小计"]):
- continue
-
- # 只保留主要费用项目:建筑安装工程、设备购置、其他费用
- valid_fee_names = ["建筑安装工程", "建筑安装", "设备购置", "其他费用"]
- is_valid = any(kw in fee_name_str for kw in valid_fee_names)
- if not is_valid:
- continue
-
- result.append({
- "No": project_no,
- "name": project_name,
- "feeName": fee_name_str,
- "estimatedCost": parse_number(estimated),
- "approvedFinalAccountExcludingVat": parse_number(approved),
- "vatAmount": parse_number(vat),
- "costVariance": parse_number(variance),
- "varianceRate": parse_rate(variance_rate),
- })
-
- logger.info(f"[表格解析] 解析完成: 项目名称={project_name}, 共 {len(result)} 条数据")
- return result
- def parse_settlement_report_tables(
- merged_tables: List[Tuple[int, pd.DataFrame, str, int]]
- ) -> Dict[str, List[Dict[str, Any]]]:
- """
- 解析 settlementReport 类型的所有表格,按表名组织返回。
-
- 返回格式:
- {
- "审定结算汇总表": [...],
- "合同执行情况": [],
- "赔偿合同": [],
- "物资采购合同1": [],
- "物资采购合同2": [],
- "其他服务类合同": [],
- }
- """
- result = {
- "审定结算汇总表": [],
- "合同执行情况": [],
- "赔偿合同": [],
- "物资采购合同1": [],
- "物资采购合同2": [],
- "其他服务类合同": [],
- }
-
- for orig_idx, df, rule_name, page in merged_tables:
- try:
- logger.info(f"[表格解析] 开始解析表格: {rule_name} (页面 {page}, 行数: {len(df)})")
- if rule_name == "审定结算汇总表":
- parsed_data = parse_settlement_summary_table(df)
- if parsed_data:
- result[rule_name] = parsed_data
- logger.info(f"[表格解析] {rule_name}: 解析成功,共 {len(parsed_data)} 条数据")
- elif rule_name == "合同执行情况":
- parsed_data = parse_contract_execution_table(df)
- if parsed_data:
- result[rule_name] = parsed_data
- logger.info(f"[表格解析] {rule_name}: 解析成功,共 {len(parsed_data)} 条数据")
- elif rule_name == "赔偿合同":
- parsed_data = parse_compensation_contract_table(df)
- if parsed_data:
- result[rule_name] = parsed_data
- logger.info(f"[表格解析] {rule_name}: 解析成功,共 {len(parsed_data)} 条数据")
- elif rule_name == "物资采购合同1":
- parsed_data = parse_material_purchase_contract1_table(df)
- if parsed_data:
- result[rule_name] = parsed_data
- logger.info(f"[表格解析] {rule_name}: 解析成功,共 {len(parsed_data)} 条数据")
- elif rule_name == "物资采购合同2":
- parsed_data = parse_material_purchase_contract2_table(df)
- if parsed_data:
- result[rule_name] = parsed_data
- logger.info(f"[表格解析] {rule_name}: 解析成功,共 {len(parsed_data)} 条数据")
- elif rule_name == "其他服务类合同":
- parsed_data = parse_other_service_contract_table(df)
- if parsed_data:
- result[rule_name] = parsed_data
- logger.info(f"[表格解析] {rule_name}: 解析成功,共 {len(parsed_data)} 条数据")
- else:
- logger.warning(f"[表格解析] 未知的表格类型: {rule_name}")
- except Exception as e:
- # 如果解析失败,记录错误但不影响其他表格
- logger.warning(f"[表格解析] 解析 {rule_name} 表格失败: {e}", exc_info=True)
-
- return result
- def extract_and_filter_tables_for_pdf(
- pdf_path: str,
- base_output_dir: str,
- doc_type: Literal["settlementReport", "designReview", "finalAccount"],
- ) -> Dict[str, Any]:
- """
- 从指定 PDF 提取所有表格 + 合并后的表格 + 筛选后的表格,全部落盘。
- 返回结构:
- {
- "tables_root": str,
- "extracted_dir": str,
- "merged_dir": str,
- "filtered_dir": str,
- "all_tables": [
- {"page": int, "index_on_page": int, "excel_path": str}
- ],
- "merged_tables": [
- {"page": int, "index_on_page": int, "excel_path": str}
- ],
- "filtered_tables": [
- {"page": int, "index_on_page": int, "rule_name": str, "excel_path": str}
- ],
- }
- """
- pdf_path_obj = Path(pdf_path)
- base_output = Path(base_output_dir)
- tables_root = base_output / "tables"
- extracted_dir = tables_root / "extracted_tables"
- merged_dir = tables_root / "merged_tables"
- filtered_dir = tables_root / "filtered_tables"
- extracted_dir.mkdir(parents=True, exist_ok=True)
- merged_dir.mkdir(parents=True, exist_ok=True)
- filtered_dir.mkdir(parents=True, exist_ok=True)
- # 立即输出日志,确保日志系统正常工作
- logger.info(f"[表格提取] ========== 开始处理 PDF ==========")
- logger.info(f"[表格提取] PDF 路径: {pdf_path}")
- logger.info(f"[表格提取] 文档类型: {doc_type}")
- logger.info(f"[表格提取] PDF 文件存在: {Path(pdf_path).exists()}")
- if Path(pdf_path).exists():
- logger.info(f"[表格提取] PDF 文件大小: {Path(pdf_path).stat().st_size} bytes")
- logger.info(f"[表格提取] 输出目录: {base_output_dir}")
- # 1. 使用 pdfplumber 从 PDF 提取所有表格(不限制页数)
- # 对于 designReview 类型,启用标题提取(用于识别多个概算表)
- extract_titles = (doc_type == "designReview")
- logger.info("[表格提取] 步骤1: 使用 pdfplumber 提取所有表格...")
- tables_data = extract_tables_with_pdfplumber(str(pdf_path_obj), pages="all", extract_titles=extract_titles)
- logger.info(f"[表格提取] 步骤1完成: 共提取到 {len(tables_data)} 个表格")
- # 2. 保存所有原始表格为 xlsx,命名: table_page{page}_{index}.xlsx
- logger.info("[表格提取] 步骤2: 保存所有原始表格到 extracted_tables...")
- page_table_count: Dict[int, int] = {}
- all_tables_meta: List[Dict[str, Any]] = []
- # 存储表格标题映射(用于后续解析)
- table_titles: Dict[int, str] = {} # orig_idx -> title
- # 给每个表格一个全局索引,方便后续合并/去重
- all_tables: List[Tuple[int, pd.DataFrame, int]] = [] # (orig_idx, df, page)
- for orig_idx, (page, df, _bbox, title) in enumerate(tables_data):
- # 保存标题映射
- if title:
- table_titles[orig_idx] = title
- page_table_count[page] = page_table_count.get(page, 0) + 1
- idx_on_page = page_table_count[page]
- excel_path = extracted_dir / f"table_page{page}_{idx_on_page}.xlsx"
- df.to_excel(str(excel_path), index=False, header=False)
- all_tables_meta.append(
- {
- "page": page,
- "index_on_page": idx_on_page,
- "excel_path": str(excel_path),
- }
- )
- all_tables.append((orig_idx, df.copy(), page))
- logger.info(f"[表格提取] 步骤2完成: 已保存 {len(all_tables)} 个原始表格")
- # 3. 合并所有跨页表格(不进行过滤),保存到 merged_tables
- logger.info("[表格提取] 步骤3: 合并跨页表格...")
- merged_all_tables = _merge_all_tables(all_tables)
- logger.info(f"[表格提取] 步骤3完成: 从 {len(all_tables)} 个原始表格合并为 {len(merged_all_tables)} 个表格")
-
- merged_page_table_count: Dict[int, int] = {}
- merged_meta: List[Dict[str, Any]] = []
-
- logger.info("[表格提取] 步骤3.1: 保存合并后的表格到 merged_tables...")
- for merged_idx, (orig_idx, df, page) in enumerate(merged_all_tables):
- merged_page_table_count[page] = merged_page_table_count.get(page, 0) + 1
- idx_on_page = merged_page_table_count[page]
-
- excel_path = merged_dir / f"table_{merged_idx + 1}.xlsx"
- df.to_excel(str(excel_path), index=False, header=False)
-
- merged_meta.append(
- {
- "page": page,
- "index_on_page": idx_on_page,
- "excel_path": str(excel_path),
- }
- )
- logger.info(f"[表格提取] 步骤3.1完成: 已保存 {len(merged_meta)} 个合并后的表格")
- # 4. 根据 doc_type 选择对应的表头规则
- logger.info(f"[表格提取] 步骤4: 加载表头规则,文档类型: {doc_type}")
- header_rules = TABLE_TYPE_RULES.get(doc_type, [])
- logger.info(f"[表格提取] 步骤4完成: 找到 {len(header_rules)} 个表头规则")
- # 如果没有规则,直接返回(保留 extracted_tables 和 merged_tables)
- if not header_rules:
- logger.warning("[表格提取] 未找到表头规则,跳过筛选步骤")
- return {
- "tables_root": str(tables_root),
- "extracted_dir": str(extracted_dir),
- "merged_dir": str(merged_dir),
- "filtered_dir": str(filtered_dir),
- "all_tables": all_tables_meta,
- "merged_tables": merged_meta,
- "filtered_tables": [],
- }
- # 5. 从合并后的表格中过滤出匹配规则的表格
- logger.info("[表格提取] 步骤5: 从合并后的表格中筛选匹配规则的表格...")
- # 增加 title 字段: (orig_idx, df, rule_name, page, title)
- matched_for_merge: List[Tuple[int, pd.DataFrame, str, int, str]] = []
- for merged_idx, (orig_idx, df, page) in enumerate(merged_all_tables):
- rule_name: Optional[str] = None
- for rule in header_rules:
- is_match, rn = check_table_header(df, rule)
- if is_match:
- rule_name = rn
- # 获取表格标题
- title = table_titles.get(orig_idx, "")
- logger.info(f"[表格提取] 表格 {merged_idx + 1} (页面 {page}) 匹配规则: {rule_name}, 标题: {title}")
- break
- if rule_name:
- title = table_titles.get(orig_idx, "")
- matched_for_merge.append((orig_idx, df.copy(), rule_name, page, title))
- logger.info(f"[表格提取] 步骤5完成: 共匹配到 {len(matched_for_merge)} 个表格")
- # 如果没有匹配到表格,直接返回(保留 extracted_tables 和 merged_tables)
- if not matched_for_merge:
- logger.warning("[表格提取] 未匹配到任何表格,跳过后续处理")
- return {
- "tables_root": str(tables_root),
- "extracted_dir": str(extracted_dir),
- "merged_dir": str(merged_dir),
- "filtered_dir": str(filtered_dir),
- "all_tables": all_tables_meta,
- "merged_tables": merged_meta,
- "filtered_tables": [],
- }
- # 6. 对已匹配规则的表格再次进行跨页合并(处理规则匹配后的特殊情况)
- logger.info("[表格提取] 步骤6: 对已匹配规则的表格进行跨页合并...")
- merged_tables = _merge_cross_page_tables(matched_for_merge, header_rules)
- logger.info(f"[表格提取] 步骤6完成: 跨页合并后剩余 {len(merged_tables)} 个表格")
- # 7. 保存筛选+合并后的表格到 filtered_dir,命名仍然按 page + 序号
- logger.info("[表格提取] 步骤7: 保存筛选+合并后的表格到 filtered_tables...")
- filtered_page_table_count: Dict[int, int] = {}
- filtered_meta: List[Dict[str, Any]] = []
- parsed_data = None
- for orig_idx, df, rule_name, page, title in merged_tables:
- filtered_page_table_count[page] = filtered_page_table_count.get(page, 0) + 1
- idx_on_page = filtered_page_table_count[page]
- excel_path = filtered_dir / f"table_page{page}_{idx_on_page}.xlsx"
- df.to_excel(str(excel_path), index=False, header=False)
- filtered_meta.append(
- {
- "page": page,
- "index_on_page": idx_on_page,
- "rule_name": rule_name,
- "title": title,
- "excel_path": str(excel_path),
- }
- )
- logger.info(f"[表格提取] 步骤7完成: 已保存 {len(filtered_meta)} 个筛选后的表格")
-
- # 8. 根据文档类型解析表格数据
- logger.info(f"[表格提取] 步骤8: 解析表格数据,文档类型: {doc_type}...")
- logger.info(f"[表格提取] 待解析的表格列表: {[(rule_name, page, title) for _, _, rule_name, page, title in merged_tables]}")
- if doc_type == "designReview":
- # 对于 designReview 类型,返回类似 settlementReport 的结构
- # 按规则类型分组:
- # - 初设评审的概算投资(规则1): 嵌套结构
- # - 初设评审的概算投资明细(规则2): 平铺结构,多个表格
- # - 初设评审的概算投资费用(规则3): 平铺结构,多个表格
- parsed_data = {
- "初设评审的概算投资": [],
- "初设评审的概算投资明细": [],
- "初设评审的概算投资费用": [],
- }
-
- for orig_idx, df, rule_name, page, title in merged_tables:
- if rule_name == "初设评审的概算投资":
- # 规则1:嵌套结构
- try:
- logger.info(f"[表格提取] 解析 designReview 表格(规则1): {rule_name} (页面 {page}, 行数: {len(df)})")
- summary_data = parse_design_review_table(df)
- if summary_data:
- parsed_data["初设评审的概算投资"] = summary_data
- logger.info(f"[表格提取] 解析完成: 共 {len(summary_data)} 条数据")
- except Exception as e:
- logger.warning(f"[表格提取] 解析 designReview 表格(规则1)失败: {e}", exc_info=True)
- elif rule_name == "初设评审的概算投资明细":
- # 规则2:使用标题作为工程名称
- try:
- logger.info(f"[表格提取] 解析 designReview 表格(规则2): {rule_name} (页面 {page}, 标题: {title}, 行数: {len(df)})")
- detail_data = parse_design_review_detail_table(df, title)
- if detail_data:
- parsed_data["初设评审的概算投资明细"].extend(detail_data)
- logger.info(f"[表格提取] 解析完成: 共 {len(detail_data)} 条数据")
- except Exception as e:
- logger.warning(f"[表格提取] 解析 designReview 表格(规则2)失败: {e}", exc_info=True)
- elif rule_name == "初设评审的概算投资费用":
- # 规则3:使用标题作为工程名称
- try:
- logger.info(f"[表格提取] 解析 designReview 表格(规则3): {rule_name} (页面 {page}, 标题: {title}, 行数: {len(df)})")
- cost_data = parse_design_review_cost_table(df, title)
- if cost_data:
- parsed_data["初设评审的概算投资费用"].extend(cost_data)
- logger.info(f"[表格提取] 解析完成: 共 {len(cost_data)} 条数据")
- except Exception as e:
- logger.warning(f"[表格提取] 解析 designReview 表格(规则3)失败: {e}", exc_info=True)
-
- # 将规则2和规则3的平铺结果按工程名称分组为嵌套结构
- for rule_key in ["初设评审的概算投资明细", "初设评审的概算投资费用"]:
- if rule_key in parsed_data and parsed_data[rule_key]:
- flat_items = parsed_data[rule_key]
- grouped = _group_items_by_name(flat_items)
- parsed_data[rule_key] = grouped
-
- # 统计解析结果
- logger.info(f"[表格提取] 解析结果统计:")
- total_records = 0
- for table_type, table_data in parsed_data.items():
- if table_data:
- if isinstance(table_data, list):
- # 嵌套结构,统计所有 items
- record_count = sum(len(item.get("items", [])) for item in table_data) if table_data and isinstance(table_data[0], dict) and "items" in table_data[0] else len(table_data)
- else:
- record_count = len(table_data)
- total_records += record_count
- logger.info(f"[表格提取] - {table_type}: {len(table_data)} 个工程,共 {record_count} 条明细")
- else:
- logger.info(f"[表格提取] - {table_type}: 未匹配到数据")
- logger.info(f"[表格提取] 总计: {total_records} 条数据")
-
- elif doc_type == "settlementReport":
- # 对于 settlementReport 类型,解析所有匹配的表格,按表名组织
- try:
- logger.info(f"[表格提取] 解析 settlementReport 表格,共 {len(merged_tables)} 个表格")
- # 转换为4元组格式以兼容现有的 parse_settlement_report_tables 函数
- tables_4tuple = [(orig_idx, df, rule_name, page) for orig_idx, df, rule_name, page, title in merged_tables]
- parsed_data = parse_settlement_report_tables(tables_4tuple)
- # 统计每个表的解析结果
- logger.info(f"[表格提取] 解析结果统计:")
- total_records = 0
- for table_name, table_data in parsed_data.items():
- if table_data:
- record_count = len(table_data)
- total_records += record_count
- logger.info(f"[表格提取] - {table_name}: {record_count} 条数据")
- else:
- logger.info(f"[表格提取] - {table_name}: 未匹配到数据")
- logger.info(f"[表格提取] 总计: {total_records} 条数据")
- except Exception as e:
- logger.warning(f"[表格提取] 解析 settlementReport 表格失败: {e}", exc_info=True)
- logger.info("[表格提取] 步骤8完成: 表格数据解析完成")
- result = {
- "tables_root": str(tables_root),
- "extracted_dir": str(extracted_dir),
- "merged_dir": str(merged_dir),
- "filtered_dir": str(filtered_dir),
- "all_tables": all_tables_meta,
- "merged_tables": merged_meta,
- "filtered_tables": filtered_meta,
- }
-
- # 添加解析后的 JSON 数据
- if parsed_data is not None:
- result["parsed_data"] = parsed_data
- result["parsed_data_json"] = json.dumps(parsed_data, ensure_ascii=False, indent=2)
- logger.info("[表格提取] JSON 数据已生成")
-
- logger.info(f"[表格提取] 处理完成: 原始表格 {len(all_tables_meta)} 个, 合并后 {len(merged_meta)} 个, 筛选后 {len(filtered_meta)} 个")
- return result
|