| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782 |
- from pathlib import Path
- import pandas as pd
- from tqdm import tqdm
- import time
- import re
- from typing import List, Optional, Tuple
- import subprocess
- import os
- # 导入 pdfplumber 用于提取表格
- try:
- import pdfplumber
- PDFPLUMBER_AVAILABLE = True
- except ImportError:
- PDFPLUMBER_AVAILABLE = False
- print("⚠ pdfplumber 未安装,无法提取表格")
- print(" 安装命令: pip install pdfplumber")
- # 导入 PyMuPDF 用于提取文本
- try:
- import fitz # PyMuPDF
- PYMUPDF_AVAILABLE = True
- except ImportError:
- PYMUPDF_AVAILABLE = False
- print("⚠ PyMuPDF 未安装,无法提取表格前的文本")
- print(" 安装命令: pip install PyMuPDF")
- # ==================== 配置区域 ====================
- pdf_path = '/home/hws/workspace/GitLab/Clerk2.5/pdf_converter_v2/3-数据/鄂电司发展〔2024〕124号 国网湖北省电力有限公司关于襄阳连云220千伏输变电工程可行性研究报告的批复.pdf'
- output_dir = Path('extracted_tables') # 原始表格输出目录(包含表格前文本)
- merged_output_dir = Path('merged_tables') # 合并后的表格输出目录(已剔除表格前文本)
- filtered_output_dir = Path('filtered_tables') # 筛选后的表格输出目录
- # CEB 转 PDF 配置
- AUTO_CONVERT_CEB = True # 是否自动尝试转换 CEB 文件
- CEB_CONVERTED_DIR = Path('converted_pdfs') # CEB 转换后的 PDF 存放目录
- # 表头规则配置:根据PDF文件名匹配对应的表头规则(固定匹配模式)
- # 每个规则可以包含多个表头定义,表头必须包含所有关键词才匹配
- TABLE_HEADER_RULES = {
- # 规则1: 结算报告类
- "9-(结算报告)山西晋城周村220kV输变电工程结算审计报告.pdf": [
- {
- "name": "审定结算汇总表",
- "keywords": ["序号", "审计内容", "送审金额(含税)", "审定金额(含税)", "审定金额(不含税)", "增减金额", "备注"],
- "match_mode": "all" # 表头必须包含所有关键词才匹配(固定匹配)
- },
- {
- "name": "合同执行情况",
- "keywords": ["施工单位", "中标通知书金额", "中标通知书编号", "合同金额", "结算送审金额", "差额"],
- "match_mode": "all" # 表头必须包含所有关键词才匹配(固定匹配)
- },
- {
- "name": "赔偿合同",
- "keywords": ["合同对方", "赔偿事项", "合同金额", "结算送审金额", "差额"],
- "match_mode": "all" # 表头必须包含所有关键词才匹配(固定匹配)
- },
- {
- "name": "物资采购合同1",
- "keywords": ["物料名称", "合同数量", "施工图数量", "单价(不含税)", "差额"],
- "match_mode": "all" # 表头必须包含所有关键词才匹配(固定匹配)
- },
- {
- "name": "物资采购合同2",
- "keywords": ["物料名称", "合同金额(不含税)", "入账金额", "差额", "备注"],
- "match_mode": "all" # 表头必须包含所有关键词才匹配(固定匹配)
- },
- {
- "name": "其他服务类合同",
- "keywords": ["服务商", "中标通知书", "合同金额", "送审金额", "结算金额"],
- "match_mode": "all" # 表头必须包含所有关键词才匹配(固定匹配)
- }
- ],
- "4-(初设评审)中电联电力建设技术经济咨询中心技经〔2019〕201号关于山西周村220kV输变电工程初步设计的评审意见.pdf":[
- {
- "name": "初设评审的概算投资",
- "keywords": ["序号", "工程名称", "建设规模", "静态投资", "其中:建设场地征用及清理费", "动态投资"],
- "match_mode": "all" # 表头必须包含所有关键词才匹配(固定匹配)
- },
- ]
- # 可以添加更多文件的规则
- # "其他文件名.pdf": [
- # {Ctrl+Shift+P
- # "name": "表头名称",
- # "keywords": ["关键词1", "关键词2"],
- # "match_mode": "all" # "all" 表示必须包含所有关键词(固定匹配)
- # }
- # ]
- }
- # 是否启用表头过滤(如果为False,则提取所有表格)
- ENABLE_HEADER_FILTER = False
- # 要排除的规则名称列表(如果某个规则匹配了不该匹配的表格,可以在这里排除)
- # 例如: EXCLUDE_RULES = ["物资采购合同2"] 将不会匹配该规则
- EXCLUDE_RULES = []
- # 是否启用表格内多表头检测和分割(如果一个表格中包含多个表头,自动分割)
- # 已禁用此功能,改用基于文本的合并策略
- # ENABLE_SPLIT_MULTI_HEADER_TABLES = True
- # 是否提取表格前的文本(用于更好地识别表格类型)
- EXTRACT_TEXT_BEFORE_TABLE = True
- # 提取表格前多少行文本(用于识别表格标题和上下文)
- TEXT_LINES_BEFORE_TABLE = 5
- # 是否显示所有表格的表头预览(用于帮助确定过滤条件)
- SHOW_HEADER_PREVIEW = True
- # 是否显示匹配的规则名称
- SHOW_MATCHED_RULE_NAME = True
- # 是否启用跨页表格合并(基于表格前文本判断)
- ENABLE_MERGE_CROSS_PAGE_TABLES = True
- # 跨页表格检测的页面范围(检查前后几页)
- CROSS_PAGE_SEARCH_RANGE = 10 # 增加范围以支持长表格
- # 是否从已生成的xlsx文件中过滤(而不是从PDF提取)
- FILTER_FROM_EXISTING_XLSX = True # 设置为True
- XLSX_INPUT_DIR = 'extracted_tables' # 初始读取目录(会在流程中更新为 merged_tables)
- # ==================================================
- def check_table_header(table_df: pd.DataFrame, rule: dict) -> Tuple[bool, str]:
- """
- 检查表格是否匹配指定的表头规则(固定匹配:必须包含所有关键词)
- 支持处理表头换行的情况(合并前几行作为表头)
-
- Args:
- table_df: 表格DataFrame
- rule: 表头规则字典,包含 name, keywords, match_mode
-
- Returns:
- tuple: (是否匹配, 规则名称)
- """
- if table_df.empty:
- return False, ""
-
- rule_name = rule.get("name", "未知规则")
-
- # 检查是否在排除列表中
- if rule_name in EXCLUDE_RULES:
- return False, ""
-
- # 检查第一行是否是文本信息(以 "[表格前文本]" 开头)
- start_row = 0
- if len(table_df) > 0:
- first_cell = str(table_df.iloc[0, 0]).strip()
- if first_cell.startswith("[表格前文本]"):
- start_row = 1 # 跳过第一行
-
- # 处理表头换行:合并前几行作为完整表头(通常表头可能占1-3行)
- # 检查前3行,合并所有非空单元格内容
- header_rows_to_check = min(3, len(table_df) - start_row)
- header_text_parts = []
-
- for row_idx in range(start_row, start_row + header_rows_to_check):
- if row_idx >= len(table_df):
- break
- row = table_df.iloc[row_idx].astype(str).str.strip()
- # 收集该行的所有非空单元格内容
- for cell in row:
- cell_text = str(cell).strip()
- # 过滤掉空值、NaN等
- if cell_text and cell_text.lower() not in ['nan', 'none', '']:
- # 将单元格内的换行符替换为空格(处理xlsx中的换行)
- cell_text = cell_text.replace('\n', ' ').replace('\r', ' ')
- header_text_parts.append(cell_text)
-
- # 合并所有表头文本(去除换行符和多余空格)
- header_text = " ".join(header_text_parts)
- # 进一步清理:将多个连续空格替换为单个空格(包括换行符转换后的空格)
- header_text = re.sub(r'\s+', ' ', header_text).strip()
- # 创建一个无空格的版本用于匹配(处理换行导致的空格问题)
- header_text_no_space = re.sub(r'\s+', '', header_text)
-
- keywords = rule.get("keywords", [])
- match_mode = rule.get("match_mode", "all") # 默认使用 "all"(固定匹配)
-
- if not keywords:
- return False, ""
-
- # 固定匹配:必须包含所有关键词
- # 匹配时同时检查原文本和无空格版本,以处理换行导致的空格问题
- if match_mode == "all":
- all_match = True
- for keyword in keywords:
- # 同时检查原文本和无空格版本
- keyword_no_space = re.sub(r'\s+', '', keyword)
- if keyword in header_text or keyword_no_space in header_text_no_space:
- continue
- else:
- all_match = False
- break
- if all_match:
- return True, rule_name
- elif match_mode == "any":
- # 保留此选项,但不推荐使用(模糊匹配)
- for keyword in keywords:
- keyword_no_space = re.sub(r'\s+', '', keyword)
- if keyword in header_text or keyword_no_space in header_text_no_space:
- return True, rule_name
-
- return False, ""
- def has_table_header(table_df: pd.DataFrame, header_rules: List[dict]) -> Tuple[bool, str]:
- """
- 检查表格是否有表头(匹配任何一个规则)
-
- Args:
- table_df: 表格DataFrame
- header_rules: 表头规则列表
-
- Returns:
- tuple: (是否有表头, 匹配的规则名称)
- """
- for rule in header_rules:
- is_match, rule_name = check_table_header(table_df, rule)
- if is_match:
- return True, rule_name
- return False, ""
- def has_text_before_table(table_df: pd.DataFrame) -> Tuple[bool, str]:
- """
- 检查表格是否有前文本(判断是否是新表格的开始)
-
- Args:
- table_df: 表格DataFrame
-
- Returns:
- tuple: (是否有前文本, 前文本内容)
- """
- if table_df.empty or len(table_df) == 0:
- return False, ""
-
- first_cell = str(table_df.iloc[0, 0]).strip()
- if first_cell.startswith("[表格前文本]"):
- # 提取文本内容(去掉标记)
- text_content = first_cell.replace("[表格前文本]", "").strip()
- return True, text_content
-
- return False, ""
- def remove_text_row(table_df: pd.DataFrame) -> pd.DataFrame:
- """
- 移除表格第一行的文本信息(如果存在)
-
- Args:
- table_df: 表格DataFrame
-
- Returns:
- pd.DataFrame: 移除文本行后的表格
- """
- has_text, _ = has_text_before_table(table_df)
- if has_text:
- return table_df.iloc[1:].reset_index(drop=True)
- return table_df
- def convert_ceb_to_pdf(ceb_path: str) -> Optional[str]:
- """
- 尝试将 CEB 文件转换为 PDF
-
- CEB 是中国电子公文格式,需要专门的工具转换。
- 此函数会检查是否已有转换后的 PDF,如果没有则提示用户手动转换。
-
- Args:
- ceb_path: CEB 文件路径
-
- Returns:
- str: 转换后的 PDF 文件路径,如果转换失败返回 None
- """
- ceb_path = Path(ceb_path)
-
- if not ceb_path.exists():
- print(f"⚠ CEB 文件不存在: {ceb_path}")
- return None
-
- # 创建转换目录
- converted_dir = CEB_CONVERTED_DIR
- converted_dir.mkdir(parents=True, exist_ok=True)
-
- # 生成 PDF 文件名
- pdf_filename = ceb_path.stem + '.pdf'
- pdf_path = converted_dir / pdf_filename
-
- # 如果已经转换过,直接返回
- if pdf_path.exists():
- print(f"✓ 发现已转换的 PDF: {pdf_path}")
- return str(pdf_path)
-
- print(f"\n检测到 CEB 文件: {ceb_path.name}")
- print("=" * 60)
- print("CEB 是中国电子公文格式,需要手动转换为 PDF")
- print("=" * 60)
- print("\n推荐转换方法:")
- print("1. 使用方正 Apabi Reader (官方工具)")
- print(" - 下载: http://www.apabi.com/")
- print(" - 打开 CEB 文件后,选择 '文件' -> '另存为' -> 'PDF'")
- print("\n2. 使用在线转换工具")
- print(" - https://convertio.co/zh/ceb-pdf/")
- print(" - https://www.aconvert.com/cn/document/ceb-to-pdf/")
- print("\n3. 使用 CEB 阅读器导出")
- print(" - 安装任意支持 CEB 格式的阅读器")
- print(" - 打开文件后导出为 PDF")
- print("\n" + "=" * 60)
- print(f"请将转换后的 PDF 文件放到: {converted_dir.absolute()}")
- print(f"文件名: {pdf_filename}")
- print("=" * 60)
-
- return None
- def check_and_convert_file(file_path: str) -> str:
- """
- 检查文件类型,如果是 CEB 则尝试转换为 PDF
-
- Args:
- file_path: 文件路径
-
- Returns:
- str: PDF 文件路径
- """
- file_path = Path(file_path)
-
- # 检查文件扩展名
- if file_path.suffix.lower() == '.ceb':
- if AUTO_CONVERT_CEB:
- pdf_path = convert_ceb_to_pdf(str(file_path))
- if pdf_path:
- return pdf_path
- else:
- print("\n请手动转换 CEB 文件后重新运行脚本")
- exit(1)
- else:
- print(f"⚠ 检测到 CEB 文件,但自动转换已禁用: {file_path}")
- print(" 请设置 AUTO_CONVERT_CEB = True 或手动转换为 PDF")
- exit(1)
-
- return str(file_path)
- def clean_newlines(table_df: pd.DataFrame) -> pd.DataFrame:
- """
- 清理单元格内的换行符
-
- Args:
- table_df: 表格DataFrame
-
- Returns:
- pd.DataFrame: 清理后的表格
- """
- if table_df.empty:
- return table_df
-
- df = table_df.copy()
-
- # 先转换为 object 类型避免 dtype 警告
- df = df.astype(object)
-
- # 遍历所有单元格,移除换行符
- for i in range(len(df)):
- for j in range(len(df.columns)):
- cell_val = df.iloc[i, j]
- if cell_val is not None and not pd.isna(cell_val):
- # 移除换行符
- df.iloc[i, j] = str(cell_val).replace('\n', '').replace('\r', '')
-
- return df
- def merge_broken_rows(table_df: pd.DataFrame, header_rows: int = 1, debug: bool = False) -> pd.DataFrame:
- """
- 合并被截断的行(跨页时一行被分成多行)
-
- 检测规则:如果某一行前N列有内容,但后面的列大部分是空的(超过50%),
- 则认为是上一行的延续,合并到上一行对应的列
-
- Args:
- table_df: 表格DataFrame
- header_rows: 表头行数,跳过表头不处理
- debug: 是否输出调试信息
-
- Returns:
- pd.DataFrame: 合并后的表格
- """
- if table_df.empty or len(table_df) <= header_rows:
- return table_df
-
- df = table_df.copy()
- rows_to_remove = []
-
- if debug:
- print(f" [DEBUG] merge_broken_rows: 检查 {len(df)} 行,跳过前 {header_rows} 行表头")
-
- # 从表头后开始检查
- for i in range(header_rows, len(df)):
- # 统计有内容的列和空列
- non_empty_cols = []
- empty_cols = []
-
- for j in range(len(df.columns)):
- val = df.iloc[i, j]
- val_str = str(val).strip()
- is_empty = val is None or pd.isna(val) or not val_str or val_str.lower() in ['nan', 'none', '']
-
- if is_empty:
- empty_cols.append(j)
- else:
- non_empty_cols.append(j)
-
- # 如果没有非空列,跳过
- if not non_empty_cols:
- continue
-
- # 计算空列比例
- empty_ratio = len(empty_cols) / len(df.columns)
-
- if debug:
- print(f" [DEBUG] 行 {i}: 非空列={non_empty_cols}, 空列比例={empty_ratio:.2f}")
-
- # 如果空列超过50%,且不是第一行数据,认为是被截断的行
- if empty_ratio > 0.5 and i > header_rows:
- # 检查上一行对应位置是否有内容
- can_merge = True
- for col_idx in non_empty_cols:
- prev_val = df.iloc[i-1, col_idx]
- if prev_val is None or pd.isna(prev_val) or str(prev_val).strip() == '':
- can_merge = False
- break
-
- if can_merge:
- if debug:
- print(f" [DEBUG] ✓ 合并行 {i} 到行 {i-1}")
-
- # 合并每个非空列到上一行对应的列
- for col_idx in non_empty_cols:
- prev_val = df.iloc[i-1, col_idx]
- curr_val = df.iloc[i, col_idx]
- df.iloc[i-1, col_idx] = str(prev_val) + str(curr_val)
- if debug:
- print(f" [DEBUG] 列 {col_idx}: '{prev_val}' + '{curr_val}'")
-
- rows_to_remove.append(i)
- else:
- if debug:
- print(f" [DEBUG] ✗ 上一行对应列为空,跳过合并")
- else:
- if debug and empty_ratio > 0:
- print(f" [DEBUG] ✗ 空列比例不足50%或是第一行数据,跳过")
-
- # 删除已合并的行
- if rows_to_remove:
- df = df.drop(rows_to_remove).reset_index(drop=True)
- if debug:
- print(f" ✓ 合并了 {len(rows_to_remove)} 个被截断的行")
- else:
- if debug:
- print(f" ℹ 没有发现需要合并的被截断行")
-
- return df
- def fix_broken_cells(table_df: pd.DataFrame, header_row_count: int = 1) -> pd.DataFrame:
- """
- 修复被错误分割的单元格(一个单元格的内容被识别成多行)
-
- 检测规则:
- 1. 如果某一行的第一列有内容,但其他列全部为空
- 2. 则认为当前行是上一行第一列内容的延续,需要合并
-
- Args:
- table_df: 表格DataFrame
- header_row_count: 表头行数,跳过表头不处理
-
- Returns:
- pd.DataFrame: 修复后的表格
- """
- if table_df.empty or len(table_df) <= header_row_count:
- return table_df
-
- df = table_df.copy()
- rows_to_remove = []
-
- # 从表头后开始检查
- for i in range(header_row_count, len(df)):
- # 获取当前行
- current_row = df.iloc[i]
-
- # 检查第一列是否有内容
- first_col = str(current_row.iloc[0]).strip()
- if not first_col or first_col.lower() in ['nan', 'none', '']:
- continue
-
- # 检查其他列是否全部为空
- other_cols = current_row.iloc[1:]
- all_empty = all(str(val).strip() in ['', 'nan', 'None', 'NaN'] for val in other_cols)
-
- if all_empty and i > header_row_count:
- # 其他列全部为空,合并到上一行的第一列
- prev_first_col = str(df.iloc[i-1, 0]).strip()
- if prev_first_col and prev_first_col.lower() not in ['nan', 'none', '']:
- # 合并到上一行的第一列(移除换行符)
- df.iloc[i-1, 0] = prev_first_col + first_col.replace('\n', '').replace('\r', '')
- rows_to_remove.append(i)
-
- # 删除已合并的行
- if rows_to_remove:
- df = df.drop(rows_to_remove).reset_index(drop=True)
- print(f" 修复了 {len(rows_to_remove)} 个被错误分割的单元格(跨行)")
-
- return df
- def fix_split_cells_across_columns(table_df: pd.DataFrame, header_row_count: int = 1) -> pd.DataFrame:
- """
- 修复被错误分割到多列的单元格
-
- 例如:"3" 和 "工程物资" 被分成两列,应该合并为 "3 工程物资"
-
- 检测规则:
- 1. 如果某一行的前两列都有内容,但第一列内容很短(1-2个字符)
- 2. 且第二列看起来像是第一列的延续(都是文字)
- 3. 则合并这两列
-
- Args:
- table_df: 表格DataFrame
- header_row_count: 表头行数,跳过表头不处理
-
- Returns:
- pd.DataFrame: 修复后的表格
- """
- if table_df.empty or len(table_df) <= header_row_count or len(table_df.columns) < 2:
- return table_df
-
- df = table_df.copy()
- merge_count = 0
-
- # 从表头后开始检查
- for i in range(header_row_count, len(df)):
- first_col = str(df.iloc[i, 0]).strip()
- second_col = str(df.iloc[i, 1]).strip()
-
- # 检查第一列和第二列是否都有内容
- if (first_col and first_col.lower() not in ['nan', 'none', ''] and
- second_col and second_col.lower() not in ['nan', 'none', '']):
-
- # 检查第一列是否很短(1-3个字符,通常是序号或简短文字)
- # 且第三列及之后的列是否有数据(说明这不是正常的多列数据)
- if len(first_col) <= 3:
- # 检查第三列及之后是否有数据
- has_data_after = False
- if len(df.columns) > 2:
- for col_idx in range(2, len(df.columns)):
- val = str(df.iloc[i, col_idx]).strip()
- if val and val.lower() not in ['nan', 'none', '']:
- has_data_after = True
- break
-
- # 如果第三列及之后有数据,说明这可能是被错误分割的
- # 或者如果第一列是纯数字(序号),也可能需要合并
- if has_data_after or first_col.isdigit():
- # 合并第一列和第二列
- df.iloc[i, 1] = first_col + ' ' + second_col
- df.iloc[i, 0] = ''
- merge_count += 1
-
- if merge_count > 0:
- print(f" 修复了 {merge_count} 个被错误分割的单元格(跨列)")
-
- return df
- def remove_empty_columns(table_df: pd.DataFrame, header_row_count: int = 1, empty_threshold: float = 0.8) -> pd.DataFrame:
- """
- 移除空列(整列都是空值或大部分为空)
-
- Args:
- table_df: 表格DataFrame
- header_row_count: 表头行数
- empty_threshold: 空值比例阈值(超过此比例认为是空列)
-
- Returns:
- pd.DataFrame: 移除空列后的表格
- """
- if table_df.empty:
- return table_df
-
- df = table_df.copy()
- cols_to_remove = []
-
- # 检查每一列
- for col_idx in range(len(df.columns)):
- # 获取该列的所有值(包括表头)
- col_values = df.iloc[:, col_idx]
-
- # 计算空值比例(检查整列)
- empty_count = sum(1 for val in col_values if str(val).strip() in ['', 'nan', 'None', 'NaN'])
- empty_ratio = empty_count / len(col_values) if len(col_values) > 0 else 1.0
-
- # 如果空值比例超过阈值,标记为待删除
- if empty_ratio >= empty_threshold:
- cols_to_remove.append(col_idx)
-
- # 删除空列
- if cols_to_remove:
- df = df.drop(df.columns[cols_to_remove], axis=1)
- df.columns = range(len(df.columns)) # 重置列索引
- print(f" 移除了 {len(cols_to_remove)} 个空列")
-
- return df
- def clean_cell_text(table_df: pd.DataFrame) -> pd.DataFrame:
- """
- 清理单元格内的文本(移除换行符等)
-
- Args:
- table_df: 表格DataFrame
-
- Returns:
- pd.DataFrame: 清理后的表格
- """
- if table_df.empty:
- return table_df
-
- df = table_df.copy()
-
- # 先将整个 DataFrame 转换为 object 类型(字符串类型),避免 dtype 警告
- df = df.astype(object)
-
- # 遍历所有单元格,移除换行符
- for i in range(len(df)):
- for j in range(len(df.columns)):
- cell_val = str(df.iloc[i, j])
- if cell_val and cell_val.lower() not in ['nan', 'none', '']:
- # 移除换行符,用空字符串替换
- cleaned_val = cell_val.replace('\n', '').replace('\r', '')
- df.iloc[i, j] = cleaned_val
-
- return df
- def fix_merged_header_cells(table_df: pd.DataFrame, expected_keywords: list = None) -> pd.DataFrame:
- """
- 修复表头中被错误合并的单元格
-
- 例如:"施工图数量 单价(不含税)" 应该分成两列:"施工图数量" 和 "单价(不含税)"
-
- 注意:这个函数会增加列数,但不会自动调整数据行。
- 建议在调用此函数前先移除空列。
-
- Args:
- table_df: 表格DataFrame
- expected_keywords: 期望的关键词列表,用于检测是否需要分割
-
- Returns:
- pd.DataFrame: 修复后的表格
- """
- if table_df.empty or len(table_df) == 0:
- return table_df
-
- df = table_df.copy()
-
- # 检查第一行(表头)是否有需要分割的单元格
- header_row = df.iloc[0]
- cols_to_split = []
-
- for col_idx in range(len(header_row)):
- cell_val = str(header_row.iloc[col_idx]).strip()
-
- # 检查是否包含多个关键词(用空格分隔)
- # 常见的分割模式:包含多个中文词组,且中间有空格
- if ' ' in cell_val and cell_val not in ['nan', 'None', 'NaN', '']:
- # 尝试分割
- parts = cell_val.split(' ')
- # 如果分割后有多个非空部分,且每个部分都有实际内容
- valid_parts = [p.strip() for p in parts if p.strip() and len(p.strip()) > 1]
- if len(valid_parts) >= 2:
- cols_to_split.append((col_idx, valid_parts))
-
- # 如果没有需要分割的列,直接返回
- if not cols_to_split:
- return df
-
- # 从后往前处理,避免索引变化
- for col_idx, parts in reversed(cols_to_split):
- print(f" 分割表头列 {col_idx}: '{header_row.iloc[col_idx]}' -> {parts}")
-
- # 获取原列的数据
- original_col = df.iloc[:, col_idx].tolist()
-
- # 删除原列
- df = df.drop(df.columns[col_idx], axis=1)
-
- # 创建新列并插入
- for part_idx, part in enumerate(parts):
- new_col_data = []
- for row_idx in range(len(original_col)):
- if row_idx == 0:
- # 表头行:使用分割后的部分
- new_col_data.append(part)
- else:
- # 数据行:第一个新列保留原数据,其他新列为空
- if part_idx == 0:
- new_col_data.append(original_col[row_idx])
- else:
- new_col_data.append('')
-
- # 使用唯一的临时列名插入
- temp_col_name = f'temp_col_{col_idx}_{part_idx}'
- df.insert(col_idx + part_idx, temp_col_name, new_col_data)
-
- # 重置列索引
- df.columns = range(len(df.columns))
-
- return df
- def extract_text_before_table(pdf_path: str, page_num: int, table_bbox: tuple, num_lines: int = 5) -> str:
- """
- 提取表格前的文本
-
- Args:
- pdf_path: PDF文件路径
- page_num: 页码(从1开始)
- table_bbox: 表格边界框 (x0, top, x1, bottom),pdfplumber格式
- num_lines: 提取多少行文本
-
- Returns:
- str: 表格前的文本
- """
- if not PYMUPDF_AVAILABLE or not EXTRACT_TEXT_BEFORE_TABLE:
- return ""
-
- try:
- doc = fitz.open(pdf_path)
- page = doc[page_num - 1] # PyMuPDF 页码从0开始
-
- # pdfplumber 的 bbox 格式: (x0, top, x1, bottom),坐标系原点在左上角
- # PyMuPDF 的坐标系也是原点在左上角,可以直接使用
- table_top = table_bbox[1] # top
-
- # 获取页面所有文本块
- blocks = page.get_text("dict")["blocks"]
-
- # 筛选表格上方的文本块
- text_blocks_above = []
- for block in blocks:
- if block["type"] == 0: # 文本块
- block_bottom = block["bbox"][3] # block 的底部
-
- # 如果文本块在表格上方
- if block_bottom < table_top:
- # 提取文本
- block_text = ""
- for line in block["lines"]:
- line_text = ""
- for span in line["spans"]:
- line_text += span["text"]
- block_text += line_text.strip() + "\n"
-
- text_blocks_above.append({
- "y": block["bbox"][1], # 使用 top 坐标排序
- "text": block_text.strip()
- })
-
- # 按 y 坐标排序(从上到下)
- text_blocks_above.sort(key=lambda x: x["y"])
-
- # 取最后 num_lines 行(最接近表格的文本)
- recent_texts = [block["text"] for block in text_blocks_above[-num_lines:]]
-
- doc.close()
-
- return "\n".join(recent_texts)
-
- except Exception as e:
- print(f"⚠ 提取表格前文本失败: {e}")
- return ""
- def extract_tables_with_pdfplumber(pdf_path: str, pages: str = 'all') -> List[Tuple[int, pd.DataFrame, tuple]]:
- """
- 使用 pdfplumber 提取 PDF 中的表格
-
- Args:
- pdf_path: PDF文件路径
- pages: 页面范围,'all' 表示所有页面
-
- Returns:
- List[Tuple[int, pd.DataFrame, tuple]]: [(页码, DataFrame, bbox), ...]
- """
- if not PDFPLUMBER_AVAILABLE:
- raise ImportError("pdfplumber 未安装")
-
- tables_data = []
-
- with pdfplumber.open(pdf_path) as pdf:
- # 确定要处理的页面
- if pages == 'all':
- pages_to_process = pdf.pages
- else:
- # 解析页面范围(如 "1-5,7,9-10")
- page_numbers = []
- for part in pages.split(','):
- if '-' in part:
- start, end = map(int, part.split('-'))
- page_numbers.extend(range(start, end + 1))
- else:
- page_numbers.append(int(part))
- pages_to_process = [pdf.pages[i-1] for i in page_numbers if 0 < i <= len(pdf.pages)]
-
- # 提取每一页的表格
- for page in pages_to_process:
- page_num = page.page_number
-
- # 使用 pdfplumber 的表格识别
- # table_settings 可以调整识别精度
- table_settings = {
- "vertical_strategy": "lines", # 使用线条识别垂直边界
- "horizontal_strategy": "lines", # 使用线条识别水平边界
- "intersection_tolerance": 3, # 交叉点容差
- "min_words_vertical": 1, # 最少垂直单词数
- "min_words_horizontal": 1, # 最少水平单词数
- }
-
- tables = page.find_tables(table_settings=table_settings)
-
- for table in tables:
- # 提取表格数据
- table_data = table.extract()
-
- if table_data and len(table_data) > 0:
- # 转换为 DataFrame
- df = pd.DataFrame(table_data)
-
- # 获取表格边界框
- bbox = table.bbox # (x0, top, x1, bottom)
-
- tables_data.append((page_num, df, bbox))
-
- return tables_data
- def is_likely_header_only(table_df: pd.DataFrame, min_data_rows: int = 2) -> bool:
- """
- 判断表格是否只包含表头(没有数据行或数据行很少)
-
- Args:
- table_df: 表格DataFrame
- min_data_rows: 最少数据行数,少于这个数认为是只有表头
-
- Returns:
- bool: 是否只包含表头
- """
- if table_df.empty:
- return True
-
- # 如果总行数少于等于表头行数(假设表头占1-3行)+ 最小数据行数
- header_rows = min(3, len(table_df))
- return len(table_df) <= header_rows + min_data_rows
- def has_similar_structure(table1_df: pd.DataFrame, table2_df: pd.DataFrame,
- tolerance: int = 1) -> bool:
- """
- 判断两个表格是否有相似的结构(列数相近)
-
- Args:
- table1_df: 第一个表格DataFrame
- table2_df: 第二个表格DataFrame
- tolerance: 允许的列数差异
-
- Returns:
- bool: 是否有相似结构
- """
- if table1_df.empty or table2_df.empty:
- return False
-
- cols1 = len(table1_df.columns)
- cols2 = len(table2_df.columns)
- return abs(cols1 - cols2) <= tolerance
- def merge_cross_page_tables(matched_tables: List[Tuple[int, object]],
- tables: object, header_rules: List[dict]) -> List[Tuple[int, pd.DataFrame, str]]:
- """
- 合并跨页表格
-
- 处理两种情况:
- 1. 表头在上一页,内容在下一页
- 2. 表格在上一页未显示完,下一页继续(包括未匹配到规则的表格)
-
- Args:
- matched_tables: 已匹配的表格列表 [(原始索引, table对象), ...]
- tables: 所有表格对象
- header_rules: 表头规则列表
-
- Returns:
- List[Tuple[int, pd.DataFrame, str]]: 合并后的表格列表 [(原始索引, 合并后的DataFrame, 规则名称), ...]
- """
- if not ENABLE_MERGE_CROSS_PAGE_TABLES:
- # 不启用跨页合并,直接返回原表格
- result = []
- for orig_idx, table in matched_tables:
- result.append((orig_idx, table.df, ""))
- return result
-
- print("\n正在检测和合并跨页表格...")
-
- # 构建页面索引映射:page -> [(table_index, table, rule_name), ...]
- # 同时记录所有表格(包括未匹配的),用于跨页合并
- page_to_tables = {}
- page_to_all_tables = {} # 所有表格(包括未匹配的)
- table_to_rule = {}
- matched_indices = set() # 已匹配的表格索引
-
- # 先记录所有表格
- for i, table in enumerate(tables):
- page = table.page
- if page not in page_to_all_tables:
- page_to_all_tables[page] = []
- page_to_all_tables[page].append((i, table))
-
- # 记录已匹配的表格
- for orig_idx, table in matched_tables:
- page = table.page
- matched_indices.add(orig_idx)
- if page not in page_to_tables:
- page_to_tables[page] = []
-
- # 找到匹配的规则名称
- rule_name = ""
- for rule in header_rules:
- is_match, name = check_table_header(table.df, rule)
- if is_match:
- rule_name = name
- break
-
- page_to_tables[page].append((orig_idx, table, rule_name))
- table_to_rule[orig_idx] = rule_name
-
- merged_results = []
- processed_indices = set()
-
- # 按页面顺序处理
- sorted_pages = sorted(page_to_tables.keys())
-
- for page_idx, current_page in enumerate(sorted_pages):
- for orig_idx, table, rule_name in page_to_tables[current_page]:
- if orig_idx in processed_indices:
- continue
-
- # 检查是否是只有表头的表格(可能在上一页结尾)
- is_header_only = is_likely_header_only(table.df)
- current_df = table.df.copy()
-
- # 情况1: 当前表格只有表头,检查下一页是否有内容
- # 限制:最多只能跨1页(只检查下一页)
- if is_header_only:
- # 只检查下一页(页面号相差1),不检查更多页
- if page_idx + 1 < len(sorted_pages):
- next_page = sorted_pages[page_idx + 1]
-
- # 严格检查:页面号必须相差1
- if abs(next_page - current_page) == 1:
- # 先检查已匹配的表格
- if next_page in page_to_tables:
- for next_orig_idx, next_table, next_rule_name in page_to_tables[next_page]:
- if next_orig_idx in processed_indices:
- continue
-
- # 检查是否有相似结构且规则名称匹配
- if (has_similar_structure(current_df, next_table.df) and
- rule_name == next_rule_name):
-
- # 合并:保留当前表格的表头,添加下一页的数据
- # 假设表头占前3行
- header_rows = min(3, len(current_df))
- header_df = current_df.iloc[:header_rows].copy()
-
- # 下一页的数据(跳过可能的重复表头)
- next_data_df = next_table.df.copy()
- # 如果下一页第一行看起来像表头,跳过
- if len(next_data_df) > 0:
- first_row_text = " ".join(next_data_df.iloc[0].astype(str).str.strip().tolist())
- # 简单判断:如果第一行包含很多关键词,可能是表头
- keyword_count = sum(1 for kw in header_rules[0].get('keywords', []) if kw in first_row_text) if header_rules else 0
- if keyword_count >= 2:
- next_data_df = next_data_df.iloc[1:].copy()
-
- # 合并
- merged_df = pd.concat([header_df, next_data_df], ignore_index=True)
- merged_results.append((orig_idx, merged_df, rule_name))
- processed_indices.add(orig_idx)
- processed_indices.add(next_orig_idx)
- print(f" ✓ 合并跨页表格: 页面 {current_page} (表头) + 页面 {next_page} (内容) -> 规则: {rule_name}")
- break
-
- if orig_idx in processed_indices:
- break
-
- # 如果已匹配的表格没有找到,检查所有表格(包括未匹配的)
- if orig_idx not in processed_indices and next_page in page_to_all_tables:
- for next_orig_idx, next_table in page_to_all_tables[next_page]:
- if next_orig_idx in processed_indices:
- continue
-
- # 检查是否有相似结构(列数相同或相近)
- # 同时检查下一页表格是否匹配相同的规则(防止不同规则的表格被合并)
- next_table_rule_name = ""
- if header_rules:
- for rule in header_rules:
- is_match, name = check_table_header(next_table.df, rule)
- if is_match:
- next_table_rule_name = name
- break
-
- # 只有规则名称匹配且结构相似时,才考虑合并
- if (has_similar_structure(current_df, next_table.df) and
- (rule_name == next_table_rule_name or next_table_rule_name == "")):
- # 检查下一页第一行是否像表头
- next_first_row_text = ""
- if not next_table.df.empty:
- next_first_row_text = " ".join(next_table.df.iloc[0].astype(str).str.strip().tolist())
-
- # 检查是否包含表头关键词
- keyword_count = 0
- if header_rules and rule_name:
- for rule in header_rules:
- if rule.get('name') == rule_name:
- keywords = rule.get('keywords', [])
- keyword_count = sum(1 for kw in keywords if kw in next_first_row_text)
- break
-
- # 如果下一页第一行不像表头(关键词少于2个),认为是表格的继续
- # 但如果下一页匹配了不同的规则,则不合并
- if keyword_count < 2 and (rule_name == next_table_rule_name or next_table_rule_name == ""):
- # 合并:保留当前表格的表头,添加下一页的数据
- header_rows = min(3, len(current_df))
- header_df = current_df.iloc[:header_rows].copy()
- next_data_df = next_table.df.copy()
-
- # 如果列数不同,尝试对齐列
- if len(header_df.columns) != len(next_data_df.columns):
- # 如果下一页列数少,添加空列
- if len(next_data_df.columns) < len(header_df.columns):
- for i in range(len(next_data_df.columns), len(header_df.columns)):
- next_data_df[len(next_data_df.columns)] = ""
-
- merged_df = pd.concat([header_df, next_data_df], ignore_index=True)
- merged_results.append((orig_idx, merged_df, rule_name))
- processed_indices.add(orig_idx)
- processed_indices.add(next_orig_idx)
- print(f" ✓ 合并跨页表格: 页面 {current_page} (表头) + 页面 {next_page} (内容,未匹配) -> 规则: {rule_name}")
- break
-
- if orig_idx in processed_indices:
- break
-
- # 情况2: 当前表格有内容,检查上一页是否有表头
- # 限制:最多只能跨1页(只检查上一页)
- if orig_idx not in processed_indices and not is_header_only:
- # 只检查上一页(页面号相差1),不检查更多页
- if page_idx > 0:
- prev_page = sorted_pages[page_idx - 1]
-
- # 严格检查:页面号必须相差1
- if abs(prev_page - current_page) == 1:
-
- if prev_page in page_to_tables:
- for prev_orig_idx, prev_table, prev_rule_name in page_to_tables[prev_page]:
- if prev_orig_idx in processed_indices:
- continue
-
- # 检查上一页是否只有表头,且结构相似
- if (is_likely_header_only(prev_table.df) and
- has_similar_structure(prev_table.df, current_df) and
- rule_name == prev_rule_name):
-
- # 合并:使用上一页的表头 + 当前页的数据
- header_rows = min(3, len(prev_table.df))
- header_df = prev_table.df.iloc[:header_rows].copy()
-
- # 当前页的数据(跳过可能的重复表头)
- current_data_df = current_df.copy()
- if len(current_data_df) > 0:
- first_row_text = " ".join(current_data_df.iloc[0].astype(str).str.strip().tolist())
- keyword_count = sum(1 for kw in header_rules[0].get('keywords', []) if kw in first_row_text) if header_rules else 0
- if keyword_count >= 2:
- current_data_df = current_data_df.iloc[1:].copy()
-
- merged_df = pd.concat([header_df, current_data_df], ignore_index=True)
- merged_results.append((orig_idx, merged_df, rule_name))
- processed_indices.add(orig_idx)
- processed_indices.add(prev_orig_idx)
- print(f" ✓ 合并跨页表格: 页面 {prev_page} (表头) + 页面 {current_page} (内容) -> 规则: {rule_name}")
- break
-
- if orig_idx in processed_indices:
- break
-
- # 情况3: 表格跨页继续(上一页未显示完,下一页继续)
- # 检查相邻页面的所有表格(包括未匹配到规则的),如果列数相同且第一行不像表头,则合并
- # 限制:最多只能跨1页(只检查下一页,不检查更多页)
- if orig_idx not in processed_indices:
- # 只检查下一页(页面号相差1),不检查更多页
- if page_idx + 1 < len(sorted_pages):
- next_page = sorted_pages[page_idx + 1]
-
- # 严格检查:页面号必须相差1
- if abs(next_page - current_page) == 1:
- # 检查下一页的所有表格(包括未匹配的)
- if next_page in page_to_all_tables:
-
- for next_orig_idx, next_table in page_to_all_tables[next_page]:
- if next_orig_idx in processed_indices:
- continue
-
- # 检查结构是否相似(列数相同或相近)
- # 同时检查下一页表格是否匹配相同的规则(防止不同规则的表格被合并)
- next_table_rule_name = ""
- if header_rules:
- for rule in header_rules:
- is_match, name = check_table_header(next_table.df, rule)
- if is_match:
- next_table_rule_name = name
- break
-
- # 只有规则名称匹配且结构相似时,才考虑合并
- if (has_similar_structure(current_df, next_table.df) and
- (rule_name == next_table_rule_name or next_table_rule_name == "")):
- # 检查下一页第一行是否像表头
- next_first_row_text = ""
- if not next_table.df.empty:
- next_first_row_text = " ".join(next_table.df.iloc[0].astype(str).str.strip().tolist())
-
- # 检查是否包含表头关键词
- keyword_count = 0
- if header_rules and rule_name:
- for rule in header_rules:
- if rule.get('name') == rule_name:
- keywords = rule.get('keywords', [])
- keyword_count = sum(1 for kw in keywords if kw in next_first_row_text)
- break
-
- # 检查合并后的表格是否包含新的表头(防止跨多页合并)
- # 检查下一页的数据中是否包含其他规则的表头
- has_other_header = False
- if len(next_table.df) > 0:
- # 检查前几行是否包含其他规则的关键词
- for check_row_idx in range(min(3, len(next_table.df))):
- check_row_text = " ".join(next_table.df.iloc[check_row_idx].astype(str).str.strip().tolist())
- # 检查是否包含其他规则的关键词
- for other_rule in header_rules:
- if other_rule.get('name') != rule_name:
- other_keywords = other_rule.get('keywords', [])
- other_keyword_count = sum(1 for kw in other_keywords if kw in check_row_text)
- if other_keyword_count >= 2:
- has_other_header = True
- break
- if has_other_header:
- break
-
- # 如果下一页第一行不像表头(关键词少于2个),且不包含其他表头,且规则匹配,认为是表格的继续
- # 注意:如果下一页第一行是表头(keyword_count >= 2),或匹配了不同的规则,则不合并
- if (keyword_count < 2 and not has_other_header and
- (rule_name == next_table_rule_name or next_table_rule_name == "")):
- # 合并:当前表格 + 下一页的数据
- next_data_df = next_table.df.copy()
- merged_df = pd.concat([current_df, next_data_df], ignore_index=True)
- merged_results.append((orig_idx, merged_df, rule_name))
- processed_indices.add(orig_idx)
- processed_indices.add(next_orig_idx)
- next_rule_display = rule_name if next_orig_idx in matched_indices else "未匹配"
- print(f" ✓ 合并跨页表格: 页面 {current_page} + 页面 {next_page} (继续) -> 规则: {rule_name} (下一页表格: {next_rule_display})")
- break
-
- if orig_idx in processed_indices:
- break
-
- # 如果没有被合并,单独添加
- if orig_idx not in processed_indices:
- merged_results.append((orig_idx, current_df, rule_name))
- processed_indices.add(orig_idx)
-
- print(f"✓ 跨页表格合并完成,共 {len(merged_results)} 个表格(含合并后的)")
- return merged_results
- def load_tables_from_xlsx(xlsx_dir: Path) -> List[Tuple[int, pd.DataFrame, int]]:
- """
- 从xlsx文件中加载表格,按页面和表格顺序排序
-
- Args:
- xlsx_dir: xlsx文件所在目录
-
- Returns:
- List[Tuple[int, pd.DataFrame, int]]: [(索引, DataFrame, 页面号), ...]
- """
- xlsx_files = list(xlsx_dir.glob("*.xlsx"))
- if not xlsx_files:
- print(f"⚠ 在目录 {xlsx_dir} 中未找到xlsx文件")
- return []
-
- # 解析文件名并排序:先按页码,再按表格序号
- file_info = []
- for xlsx_file in xlsx_files:
- # 支持两种格式:
- # 1. 新格式:table_page8_1of3.xlsx -> page=8, table_num=1, total=3
- # 2. 旧格式:table_page8_3.xlsx -> page=8, table_num=3
- match_new = re.search(r'page(\d+)_(\d+)of(\d+)', xlsx_file.stem)
- match_old = re.search(r'page(\d+)_(\d+)', xlsx_file.stem)
-
- if match_new:
- page_num = int(match_new.group(1))
- table_num = int(match_new.group(2))
- file_info.append((xlsx_file, page_num, table_num))
- elif match_old:
- page_num = int(match_old.group(1))
- table_num = int(match_old.group(2))
- file_info.append((xlsx_file, page_num, table_num))
- else:
- # 如果无法解析,尝试只提取页码
- page_match = re.search(r'page(\d+)', xlsx_file.stem)
- if page_match:
- page_num = int(page_match.group(1))
- file_info.append((xlsx_file, page_num, 0))
- else:
- # 完全无法解析,使用默认值
- file_info.append((xlsx_file, 9999, 0))
-
- # 按页码和表格序号排序
- file_info.sort(key=lambda x: (x[1], x[2]))
-
- tables = []
- for idx, (xlsx_file, page_num, table_num) in enumerate(file_info):
- try:
- df = pd.read_excel(xlsx_file, header=None)
- tables.append((idx, df, page_num))
- except Exception as e:
- print(f"⚠ 读取文件 {xlsx_file.name} 失败: {e}")
-
- return tables
- def get_header_rules_for_file(pdf_path: str) -> List[dict]:
- """
- 根据PDF文件名获取对应的表头规则(精确匹配文件名)
-
- Args:
- pdf_path: PDF文件路径
-
- Returns:
- List[dict]: 表头规则列表
- """
- pdf_filename = Path(pdf_path).name
-
- # 精确匹配文件名(固定匹配,不进行模糊匹配)
- if pdf_filename in TABLE_HEADER_RULES:
- return TABLE_HEADER_RULES[pdf_filename]
-
- return []
- def preview_table_headers(tables, max_preview: int = 10):
- """预览表格表头,帮助用户确定过滤条件(支持多行表头)"""
- print("\n" + "=" * 60)
- print("表格表头预览(前几个表格,包含多行表头):")
- print("=" * 60)
-
- preview_count = min(max_preview, tables.n)
- for i in range(preview_count):
- table = tables[i]
- if table.df.empty:
- print(f"表格 {i+1} (页面 {table.page}): [空表格]")
- continue
-
- # 合并前3行作为表头预览(处理换行情况)
- header_rows_to_check = min(3, len(table.df))
- header_parts = []
- for row_idx in range(header_rows_to_check):
- row = table.df.iloc[row_idx].astype(str).str.strip()
- for cell in row[:5]: # 只显示前5列
- cell_text = str(cell).strip()
- if cell_text and cell_text.lower() not in ['nan', 'none', '']:
- header_parts.append(cell_text)
-
- header_text = " | ".join(header_parts[:10]) # 最多显示10个部分
- if len(header_parts) > 10:
- header_text += " ..."
- print(f"表格 {i+1} (页面 {table.page}): {header_text}")
-
- if tables.n > max_preview:
- print(f"... 还有 {tables.n - max_preview} 个表格未显示")
- print("=" * 60 + "\n")
- # 检查并转换文件(如果是 CEB)
- pdf_path = check_and_convert_file(pdf_path)
- print(f"处理文件: {pdf_path}\n")
- # 创建输出目录
- output_dir.mkdir(exist_ok=True)
- # 判断是从PDF提取还是从xlsx文件过滤
- if FILTER_FROM_EXISTING_XLSX and XLSX_INPUT_DIR:
- xlsx_input_path = Path(XLSX_INPUT_DIR)
-
- # 检查xlsx文件是否存在
- xlsx_files = list(xlsx_input_path.glob("*.xlsx")) if xlsx_input_path.exists() else []
-
- if not xlsx_files:
- print("=" * 60)
- print("未找到xlsx文件,先从PDF生成xlsx文件...")
- print("=" * 60)
-
- # 先创建目录
- xlsx_input_path.mkdir(parents=True, exist_ok=True)
-
- # 从PDF提取所有表格
- print("\n正在读取PDF文件并提取表格,请稍候...")
- start_time = time.time()
-
- # 使用 pdfplumber 提取表格
- tables_data = extract_tables_with_pdfplumber(pdf_path, pages='all')
-
- extract_time = time.time() - start_time
-
- print(f"\n✓ 表格提取完成!共提取到 {len(tables_data)} 个表格 (耗时: {extract_time:.2f}秒)")
-
- # 保存到 extracted_tables 目录(包含表格前文本)
- extracted_output_dir = Path('extracted_tables')
- extracted_output_dir.mkdir(parents=True, exist_ok=True)
- print(f"\n正在保存所有表格为xlsx文件到: {extracted_output_dir}")
-
- # 先统计每页的表格数量
- page_table_counts = {}
- for page_num, df, bbox in tables_data:
- if page_num not in page_table_counts:
- page_table_counts[page_num] = 0
- page_table_counts[page_num] += 1
-
- # 为每个页面维护一个计数器
- page_current_index = {}
-
- # 保存所有表格为xlsx文件,使用新的命名格式
- saved_count = 0
- for i, (page_num, df, bbox) in tqdm(enumerate(tables_data), total=len(tables_data),
- desc="保存xlsx", unit="个", ncols=80):
- # 更新该页面的当前索引
- if page_num not in page_current_index:
- page_current_index[page_num] = 0
- page_current_index[page_num] += 1
-
- current_num = page_current_index[page_num]
- total_num = page_table_counts[page_num]
-
- # 使用新的命名格式: table_page{page}_{current}of{total}.xlsx
- xlsx_file = extracted_output_dir / f"table_page{page_num}_{current_num}of{total_num}.xlsx"
-
- try:
- # 提取表格前的文本
- text_before = ""
- if EXTRACT_TEXT_BEFORE_TABLE and PYMUPDF_AVAILABLE:
- text_before = extract_text_before_table(pdf_path, page_num, bbox, TEXT_LINES_BEFORE_TABLE)
-
- # 如果有文本,将其添加到表格的第一行
- if text_before:
- # 创建一个新的 DataFrame,第一行是文本信息
- text_row = pd.DataFrame([[f"[表格前文本] {text_before}"]], columns=[0])
- # 将原表格的列数调整为与文本行一致
- table_df = df.copy()
- # 如果表格列数大于1,在文本行后面填充空值
- if len(table_df.columns) > 1:
- for col_idx in range(1, len(table_df.columns)):
- text_row[col_idx] = ""
-
- # 合并文本行和表格
- combined_df = pd.concat([text_row, table_df], ignore_index=True)
- combined_df.to_excel(str(xlsx_file), index=False, header=False)
- else:
- # 没有文本,直接保存表格
- df.to_excel(str(xlsx_file), index=False, header=False)
-
- saved_count += 1
- except Exception as e:
- tqdm.write(f"⚠ 表格 {i+1} (页面 {page_num}) 保存失败: {e}")
-
- print(f"\n✓ 已保存 {saved_count} 个xlsx文件到 {extracted_output_dir}")
-
- # 更新 xlsx_input_path 指向 extracted_tables
- xlsx_input_path = extracted_output_dir
-
- print(f"\n✓ 已保存 {saved_count} 个xlsx文件到 {extracted_output_dir}")
-
- # 更新 xlsx_input_path 指向 extracted_tables
- xlsx_input_path = extracted_output_dir
- print("=" * 60)
- print("现在开始合并跨页表格...")
- print("=" * 60)
-
- # ========== 步骤2: 合并跨页表格 ==========
- # 从 extracted_tables 读取,合并后保存到 merged_tables(不进行过滤)
-
- # 检查 merged_tables 是否已经存在
- merged_files_exist = list(merged_output_dir.glob("*.xlsx")) if merged_output_dir.exists() else []
-
- if not merged_files_exist:
- print(f"\n正在从 {xlsx_input_path} 读取表格...")
- start_time = time.time()
- extracted_tables = load_tables_from_xlsx(xlsx_input_path)
- load_time = time.time() - start_time
-
- if not extracted_tables:
- print("⚠ 未找到任何xlsx文件")
- exit(1)
-
- print(f"✓ 加载完成!共加载 {len(extracted_tables)} 个xlsx文件 (耗时: {load_time:.2f}秒)")
-
- # 执行合并(合并所有表格,不进行过滤)
- print("\n正在合并跨页表格(基于表格前文本)...")
- merged_tables_list = []
- i = 0
-
- while i < len(extracted_tables):
- idx, df, page = extracted_tables[i]
-
- # 检查是否有表格前文本
- has_text, text_content = has_text_before_table(df)
-
- if not has_text:
- # 没有前文本,跳过(孤立数据)
- print(f" ⚠ 跳过表格 {idx} (页面 {page}): 没有表格前文本")
- i += 1
- continue
- # 有前文本,这是一个新表格的开始
- # 移除文本行,获取纯表格数据
- merged_df = remove_text_row(df)
-
- # 判断是否是 table_3(用于调试)
- is_table_3 = (len(merged_tables_list) == 2) # table_3 是第3个表格(索引2)
-
- # 先合并被截断的行(在清理换行符之前,这样可以保留原始的 NaN 值)
- if is_table_3:
- print(f"\n[DEBUG] 处理 table_3 (索引 {len(merged_tables_list)})")
- merged_df = merge_broken_rows(merged_df, header_rows=1, debug=is_table_3)
-
- # 再清理换行符
- merged_df = clean_newlines(merged_df)
-
- merged_count = 0
- merged_pages = [page]
- merged_indices = [idx]
- j = i + 1
-
- # 检查后续表格是否需要合并
- while j < len(extracted_tables):
- next_idx, next_df, next_page = extracted_tables[j]
-
- # 检查下一个表格是否有前文本
- next_has_text, _ = has_text_before_table(next_df)
-
- if next_has_text:
- # 下一个表格有前文本,是新表格的开始,停止合并
- break
-
- # 下一个表格没有前文本,检查是否应该合并
- page_diff = next_page - merged_pages[-1]
- next_df_clean = remove_text_row(next_df)
-
- # 先合并被截断的行
- next_df_clean = merge_broken_rows(next_df_clean, header_rows=0)
-
- # 再清理换行符
- next_df_clean = clean_newlines(next_df_clean)
-
- if page_diff <= CROSS_PAGE_SEARCH_RANGE and has_similar_structure(merged_df, next_df_clean):
- # 合并
- merged_df = pd.concat([merged_df, next_df_clean], ignore_index=True)
- merged_count += 1
- merged_pages.append(next_page)
- merged_indices.append(next_idx)
- print(f" ✓ 合并: 页面 {page} + 页面 {next_page} [文本: {text_content[:40]}...]")
- j += 1
- else:
- # 页面距离太远或结构不同,停止合并
- break
-
- # 跨页合并完成后,再次处理被截断的行
- if merged_count > 0:
- if is_table_3:
- print(f"\n[DEBUG] table_3 跨页合并后,再次检查被截断的行")
- merged_df = merge_broken_rows(merged_df, header_rows=1, debug=is_table_3)
-
- # 添加合并后的表格(保存所有表格,不进行过滤)
- merged_tables_list.append((idx, merged_df, page, text_content))
- i = j if merged_count > 0 else i + 1
-
- print(f"\n✓ 合并完成!从 {len(extracted_tables)} 个原始表格合并为 {len(merged_tables_list)} 个表格")
-
- # 保存合并后的表格到 merged_tables 目录(保存所有表格)
- merged_output_dir.mkdir(parents=True, exist_ok=True)
- print(f"\n正在保存合并后的表格到: {merged_output_dir}...")
-
- saved_count = 0
- for idx, (orig_idx, merged_df, page, text_content) in tqdm(enumerate(merged_tables_list),
- total=len(merged_tables_list),
- desc="保存合并表格", unit="个", ncols=80):
- # 使用简单的命名: table_1.xlsx, table_2.xlsx, ...
- xlsx_file = merged_output_dir / f"table_{idx+1}.xlsx"
- try:
- # 保存纯表格数据(已经移除了文本行)
- merged_df.to_excel(str(xlsx_file), index=False, header=False)
- saved_count += 1
- except Exception as e:
- tqdm.write(f"⚠ 表格 {idx+1} 保存失败: {e}")
-
- print(f"\n✓ 已保存 {saved_count} 个合并后的表格到 {merged_output_dir}")
- else:
- print(f"\n✓ 发现已存在的合并表格,跳过合并步骤")
-
- print("=" * 60)
- print("现在开始从合并表格中过滤...")
- print("=" * 60)
-
- # ========== 步骤3: 从 merged_tables 加载用于过滤 ==========
- # 从 merged_tables 读取(只从这一个目录读取)
- print(f"\n正在从 {merged_output_dir} 读取合并后的表格...")
- start_time = time.time()
- xlsx_tables = load_tables_from_xlsx(merged_output_dir)
- load_time = time.time() - start_time
-
- if not xlsx_tables:
- print("⚠ 未找到任何xlsx文件")
- exit(1)
-
- print(f"\n✓ 加载完成!共加载 {len(xlsx_tables)} 个xlsx文件 (耗时: {load_time:.2f}秒)")
-
- # 验证 merged_tables 中没有文本行
- text_row_count = 0
- for idx, df, page in xlsx_tables:
- has_text, _ = has_text_before_table(df)
- if has_text:
- text_row_count += 1
- print(f" ⚠ 警告: 表格 {idx+1} 仍包含 [表格前文本] 行")
-
- if text_row_count > 0:
- print(f"\n⚠ 发现 {text_row_count} 个表格仍包含文本行,这不应该发生!")
- print(" merged_tables 应该只包含纯表格数据")
- else:
- print(f"✓ 验证通过:所有表格都不包含 [表格前文本] 行")
-
- # 转换为类似camelot tables的对象结构
- class XlsxTable:
- def __init__(self, df, page):
- self.df = df
- self.page = page
-
- # 创建模拟的tables对象
- class XlsxTables:
- def __init__(self, xlsx_tables):
- self.n = len(xlsx_tables)
- self._tables = [XlsxTable(df, page) for _, df, page in xlsx_tables]
-
- def __getitem__(self, idx):
- return self._tables[idx]
-
- def __iter__(self):
- return iter(self._tables)
-
- tables = XlsxTables(xlsx_tables)
-
- else:
- print("=" * 60)
- print("开始提取PDF表格...")
- print("=" * 60)
-
- # 提取表格
- print("\n正在读取PDF文件并提取表格,请稍候...")
- start_time = time.time()
-
- # 使用 pdfplumber 提取表格
- tables_data = extract_tables_with_pdfplumber(pdf_path, pages='all')
-
- extract_time = time.time() - start_time
-
- print(f"\n✓ 表格提取完成!共提取到 {len(tables_data)} 个表格 (耗时: {extract_time:.2f}秒)")
-
- # 转换为类似的对象结构以兼容后续代码
- class PdfPlumberTable:
- def __init__(self, df, page):
- self.df = df
- self.page = page
-
- class PdfPlumberTables:
- def __init__(self, tables_data):
- self.n = len(tables_data)
- self._tables = [PdfPlumberTable(df, page) for page, df, bbox in tables_data]
-
- def __getitem__(self, idx):
- return self._tables[idx]
-
- def __iter__(self):
- return iter(self._tables)
-
- tables = PdfPlumberTables(tables_data)
- # 显示表头预览
- if SHOW_HEADER_PREVIEW:
- preview_table_headers(tables)
- # 获取当前文件对应的表头规则
- if FILTER_FROM_EXISTING_XLSX:
- # 从xlsx过滤时,使用PDF文件名来匹配规则(如果xlsx是从该PDF生成的)
- header_rules = get_header_rules_for_file(pdf_path)
- else:
- header_rules = get_header_rules_for_file(pdf_path)
- # 过滤表格
- if ENABLE_HEADER_FILTER:
- if not header_rules:
- print(f"\n⚠ 警告: 未找到文件 '{Path(pdf_path).name}' 对应的表头规则!")
- print("请在 TABLE_HEADER_RULES 中添加该文件的规则配置")
- print("或者设置 ENABLE_HEADER_FILTER = False 来提取所有表格")
- user_input = input("\n是否继续提取所有表格?(y/n): ").strip().lower()
- if user_input != 'y':
- exit(0)
-
- # 根据数据源类型处理
- if FILTER_FROM_EXISTING_XLSX and xlsx_tables:
- matched_tables = [(idx, df, page) for idx, df, page in xlsx_tables]
- else:
- matched_tables = [(i, table.df, table.page) for i, table in enumerate(tables)]
- print(f"✓ 将处理所有 {len(matched_tables)} 个表格")
- merged_tables_final = [(idx, df, "") for idx, df, page in matched_tables]
- else:
- print(f"\n找到 {len(header_rules)} 个表头规则:")
- for rule in header_rules:
- print(f" - {rule['name']}: {', '.join(rule['keywords'][:3])}...")
-
- print("\n正在根据表头规则过滤表格...")
-
- # 根据数据源类型过滤
- if FILTER_FROM_EXISTING_XLSX and xlsx_tables:
- # 从 merged_tables 文件过滤 - 这些表格已经合并过了,不需要再合并
- all_tables_info = [] # [(索引, DataFrame, 页面号, 是否匹配, 规则名称), ...]
-
- for idx, df, page in xlsx_tables:
- # 检查是否匹配任何规则(merged_tables 中已经没有文本行了)
- matched = False
- matched_rule = ""
-
- for rule in header_rules:
- is_match, rule_name = check_table_header(df, rule)
- if is_match:
- matched = True
- matched_rule = rule_name
- break
-
- all_tables_info.append((idx, df, page, matched, matched_rule))
-
- print(f"✓ 检查了 {len(all_tables_info)} 个表格")
-
- # 只保留匹配的表格(不需要合并,因为已经在 merged_tables 中合并过了)
- merged_tables_final = [(idx, df, rule_name)
- for idx, df, page, matched, rule_name in all_tables_info if matched]
-
- if len(merged_tables_final) == 0:
- print("\n⚠ 警告: 没有找到匹配的表格!")
- print("请检查 TABLE_HEADER_RULES 配置")
- exit(1)
- else:
- # 从 PDF 提取
- matched_tables = []
- matched_rules_info = {} # 记录每个表格匹配的规则
-
- for i, table in enumerate(tables):
- for rule in header_rules:
- is_match, rule_name = check_table_header(table.df, rule)
- if is_match:
- matched_tables.append((i, table))
- matched_rules_info[i] = rule_name
- break # 匹配到一个规则就停止
-
- print(f"✓ 匹配到 {len(matched_tables)} 个符合条件的表格")
-
- if len(matched_tables) == 0:
- print("\n⚠ 警告: 没有找到匹配的表格!")
- print("请检查 TABLE_HEADER_RULES 配置")
- print("如果 SHOW_HEADER_PREVIEW=True,可以查看上面的表头预览来调整配置")
- exit(1)
-
- # 合并跨页表格(从 PDF)
- # TODO: 实现 PDF 的跨页合并(暂时不实现)
- merged_tables_final = [(orig_idx, table.df, matched_rules_info.get(orig_idx, ""))
- for orig_idx, table in matched_tables]
-
- # 显示匹配的表格信息(支持多行表头)
- print("\n匹配的表格:")
- for idx, (orig_idx, table_df, rule_name) in enumerate(merged_tables_final):
- if table_df.empty:
- header_preview = "[空表格]"
- else:
- # 合并前3行作为表头预览(处理换行情况)
- header_rows_to_check = min(3, len(table_df))
- header_parts = []
- for row_idx in range(header_rows_to_check):
- row = table_df.iloc[row_idx].astype(str).str.strip()
- for cell in row[:3]: # 只显示前3列
- cell_text = str(cell).strip()
- if cell_text and cell_text.lower() not in ['nan', 'none', '']:
- # 去除换行符
- cell_text = cell_text.replace('\n', '').replace('\r', '')
- header_parts.append(cell_text)
- header_preview = " | ".join(header_parts[:6]) # 最多显示6个部分
- if len(header_parts) > 6:
- header_preview += " ..."
-
- if SHOW_MATCHED_RULE_NAME:
- print(f" {idx+1}. 原表格 {orig_idx+1} [{rule_name}]: {header_preview} (行数: {len(table_df)})")
- else:
- print(f" {idx+1}. 原表格 {orig_idx+1}: {header_preview} (行数: {len(table_df)})")
-
- # 更新matched_tables为合并后的结果,保留页面信息
- # 需要从原始 xlsx_tables 中获取页面信息
- matched_tables = []
- for orig_idx, table_df, _ in merged_tables_final:
- # 查找原始表格的页面信息
- page = 0
- if isinstance(orig_idx, str):
- # 如果是分割后的表格(格式:idx_subidx)
- base_idx = int(orig_idx.split('_')[0])
- else:
- base_idx = orig_idx
-
- # 从 xlsx_tables 中查找页面号
- for idx, df, p in xlsx_tables:
- if idx == base_idx:
- page = p
- break
-
- matched_tables.append((orig_idx, table_df, page))
- else:
- # 不启用表头过滤
- if FILTER_FROM_EXISTING_XLSX and xlsx_tables:
- matched_tables = [(idx, df, page) for idx, df, page in xlsx_tables]
- else:
- matched_tables = [(i, table.df, table.page) for i, table in enumerate(tables)]
- print(f"✓ 未启用表头过滤,将处理所有 {len(matched_tables)} 个表格")
- # 确定输出目录:如果启用了表头过滤,使用筛选后的目录,否则使用原始目录
- if ENABLE_HEADER_FILTER:
- final_output_dir = filtered_output_dir
- print(f"\n开始保存筛选后的表格文件到: {final_output_dir}...\n")
- else:
- final_output_dir = output_dir
- print(f"\n开始保存表格文件到: {final_output_dir}...\n")
- # 创建输出目录
- final_output_dir.mkdir(parents=True, exist_ok=True)
- # 保存为Excel,使用tqdm显示进度
- success_count = 0
- error_count = 0
- for idx, (orig_idx, table_df, page) in tqdm(enumerate(matched_tables), total=len(matched_tables),
- desc="处理表格", unit="个", ncols=80):
- # 使用简单的命名格式: table_1.xlsx, table_2.xlsx, ...
- excel_path = final_output_dir / f"table_{idx+1}.xlsx"
-
- try:
- # 直接使用DataFrame的to_excel方法
- table_df.to_excel(str(excel_path), index=False, header=False)
- success_count += 1
- except Exception as e:
- tqdm.write(f"⚠ 表格 {idx+1} (原表格 {orig_idx}) Excel保存失败: {e}")
- error_count += 1
- total_time = time.time() - start_time
- print("\n" + "=" * 60)
- print("处理完成!")
- print("=" * 60)
- if ENABLE_HEADER_FILTER:
- if FILTER_FROM_EXISTING_XLSX and xlsx_tables:
- print(f"📊 原始表格总数: {len(xlsx_tables)}")
- else:
- print(f"📊 原始表格总数: {tables.n}")
- print(f"✅ 匹配的表格数: {len(matched_tables)}")
- print(f"✓ 成功保存: {success_count} 个表格")
- if error_count > 0:
- print(f"⚠ 保存失败: {error_count} 个表格")
- print(f"📁 输出目录: {final_output_dir.absolute()}")
- if ENABLE_HEADER_FILTER:
- print(f"📁 原始表格目录: {output_dir.absolute()}")
- print(f"⏱ 总耗时: {total_time:.2f}秒")
- print("=" * 60)
|