#!/usr/bin/env python3
"""
Parse 原文.docx and insert its structured content into the database.

- Extract paragraphs (with formatting: font, size, bold, italic, colour,
  alignment, ...)
- Extract tables (including merged cells)
- Extract images (converted to base64)
- Assemble a ``blocks`` JSON structure in document order
- Store it in node_properties.prop_json as the attachment's document content
"""

import json
import base64
import re
import sys
import os
from io import BytesIO
from collections import OrderedDict

import docx
from docx import Document
from docx.shared import Pt, Emu, Inches, Cm, Twips
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml.ns import qn
from lxml import etree

DOCX_PATH = os.path.join(os.path.dirname(__file__), "原文.docx")


# ============================================================
# 1. Image extraction
# ============================================================

# Lower-cased file extension of the relationship target -> MIME type.
_IMAGE_MIME_BY_EXT = {
    '.png': 'image/png',
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.gif': 'image/gif',
    '.bmp': 'image/bmp',
    '.emf': 'image/x-emf',
    '.wmf': 'image/x-wmf',
}
# Used when the extension is not one of the above.
_DEFAULT_IMAGE_MIME = 'image/png'


def extract_images(doc):
    """Collect every embedded image of the document as a data URI.

    Args:
        doc: an opened python-docx ``Document``.

    Returns:
        dict mapping relationship id (rId) to a
        ``data:<mime>;base64,<payload>`` string.
    """
    images = {}
    for rel_id, rel in doc.part.rels.items():
        if "image" not in rel.reltype:
            continue
        # Externally linked images have no target part inside the package,
        # so there are no bytes to embed; reading target_part would fail.
        if rel.is_external:
            continue
        extension = os.path.splitext(rel.target_ref.lower())[1]
        mime = _IMAGE_MIME_BY_EXT.get(extension, _DEFAULT_IMAGE_MIME)
        b64 = base64.b64encode(rel.target_part.blob).decode('ascii')
        images[rel_id] = f"data:{mime};base64,{b64}"
    return images


# ============================================================
# 2.
# Paragraph / run format extraction
# ============================================================


def get_alignment(paragraph):
    """Return the paragraph's horizontal alignment, or None.

    When python-docx reports an alignment it is mapped to one of
    'left', 'center', 'right', 'justify' (any other enum member yields
    None). When it reports None, the raw ``w:val`` of a ``<w:jc>`` element
    under ``<w:pPr>`` is returned if such an element is present.
    """
    align = paragraph.alignment
    if align is None:
        # Fall back to the raw <w:jc> element in the paragraph properties.
        p_pr = paragraph._element.find(qn('w:pPr'))
        if p_pr is not None:
            jc = p_pr.find(qn('w:jc'))
            if jc is not None:
                return jc.get(qn('w:val'))
        return None

    align_map = {
        WD_ALIGN_PARAGRAPH.LEFT: 'left',
        WD_ALIGN_PARAGRAPH.CENTER: 'center',
        WD_ALIGN_PARAGRAPH.RIGHT: 'right',
        WD_ALIGN_PARAGRAPH.JUSTIFY: 'justify',
    }
    return align_map.get(align)


def get_paragraph_style_info(paragraph):
    """Extract paragraph-level formatting into a plain dict.

    Only properties that are set (truthy) on the paragraph are included.
    Indents and spacing are emitted as the integer value of the python-docx
    length object. ``lineSpacing`` is emitted as a float, and
    ``lineSpacingUnit`` says how to read it: 'multiple' when python-docx
    returned a float, 'emu' when it returned an integer length.
    """
    style_info = {}
    pf = paragraph.paragraph_format

    # Alignment
    alignment = get_alignment(paragraph)
    if alignment:
        style_info['alignment'] = alignment

    # Indentation
    if pf.left_indent:
        style_info['indentLeft'] = int(pf.left_indent)
    if pf.right_indent:
        style_info['indentRight'] = int(pf.right_indent)
    if pf.first_line_indent:
        first_line = int(pf.first_line_indent)
        # A negative first-line indent is reported as a hanging indent.
        if first_line > 0:
            style_info['indentFirstLine'] = first_line
        else:
            style_info['indentHanging'] = -first_line

    # Spacing
    if pf.space_before:
        style_info['spacingBefore'] = int(pf.space_before)
    if pf.space_after:
        style_info['spacingAfter'] = int(pf.space_after)
    line_spacing = pf.line_spacing
    if line_spacing:
        style_info['lineSpacing'] = float(line_spacing)
        # Without the unit, 1.5 (lines) and 228600.0 (a length) would be
        # indistinguishable to the consumer.
        style_info['lineSpacingUnit'] = (
            'emu' if isinstance(line_spacing, int) else 'multiple'
        )

    return style_info


def get_run_format(run):
    """Extract run-level (character) formatting into a plain dict.

    Only properties explicitly set on the run are included. Possible keys:
    fontFamily, fontSize (points), bold, italic, underline, strikeThrough,
    color, highlightColor, verticalAlign.
    """
    fmt = {}
    font = run.font

    if font.name:
        fmt['fontFamily'] = font.name
    if font.size:
        fmt['fontSize'] = font.size.pt
    if font.bold:
        fmt['bold'] = True
    if font.italic:
        fmt['italic'] = True

    underline = font.underline
    if underline is True:
        fmt['underline'] = 'single'
    elif underline:
        # Any other truthy value is emitted via str().
        fmt['underline'] = str(underline)

    if font.strike:
        fmt['strikeThrough'] = True
    if font.color and font.color.rgb:
        fmt['color'] = str(font.color.rgb)

    try:
        if font.highlight_color:
            fmt['highlightColor'] = str(font.highlight_color)
    except (ValueError, KeyError):
        pass  # skip unsupported highlight values like 'none'

    # Superscript / subscript
    if font.superscript:
        fmt['verticalAlign'] = 'superscript'
    elif font.subscript:
        fmt['verticalAlign'] = 'subscript'

    return fmt


def detect_paragraph_type(paragraph):
    """Classify a paragraph as heading / TOC entry / list item / body text.

    The style name is checked first. If it is not conclusive, a heading
    level is inferred from the first run: bold text of >= 18 pt is
    'heading1', >= 16 pt is 'heading2', and >= 14 pt is a heading only when
    the text starts with a section number such as "1 " or "1.2 " (the level
    follows the number of dots).

    Returns:
        One of 'headingN', 'tocN', 'list_item' or 'paragraph'.
    """
    style = paragraph.style
    # A style object may exist without a name; treat that like "no style".
    style_name = (style.name or '') if style else ''

    if style_name.startswith(('Heading', 'heading')):
        level = re.search(r'\d+', style_name)
        if level:
            return f'heading{level.group()}'
        return 'heading1'

    if style_name.startswith(('toc ', 'TOC')):
        level = re.search(r'\d+', style_name)
        lvl = level.group() if level else '1'
        return f'toc{lvl}'

    if style_name.startswith('List'):
        return 'list_item'

    # No conclusive style: infer a heading from the first run's formatting.
    text = paragraph.text.strip()
    if text and paragraph.runs:
        first_font = paragraph.runs[0].font
        if first_font.bold and first_font.size:
            size_pt = first_font.size.pt
            if size_pt >= 18:
                return 'heading1'
            if size_pt >= 16:
                return 'heading2'
            if size_pt >= 14 and re.match(r'^\d+(\.\d+)*\s', text):
                # Numbered section title; depth = dots in the number.
                dots = text.split()[0].count('.')
                if dots == 0:
                    return 'heading1'
                if dots == 1:
                    return 'heading2'
                return 'heading3'

    return 'paragraph'


# ============================================================
# 3.
# Inline images in paragraphs
# ============================================================

# <wp:extent> stores sizes in EMU; 914400 EMU = 1 inch.
_EMU_PER_INCH = 914400


def _drawing_size_inches(drawing):
    """Return (width, height) of a drawing in inches; None where unknown."""
    width = height = None
    extent = drawing.find('.//' + qn('wp:extent'))
    if extent is not None:
        cx = extent.get('cx')
        cy = extent.get('cy')
        if cx:
            width = int(cx) / _EMU_PER_INCH
        if cy:
            height = int(cy) / _EMU_PER_INCH
    return width, height


def get_paragraph_images(paragraph, images_map):
    """List the inline images referenced by a paragraph's runs.

    Only ``<w:drawing>`` elements that are direct children of a run are
    inspected, and only blips whose ``r:embed`` id is a key of *images_map*
    are reported.

    Args:
        paragraph: python-docx paragraph.
        images_map: dict rId -> data URI, as built by ``extract_images``.

    Returns:
        list of dicts with keys 'rId', 'src', 'widthInch', 'heightInch'
        (sizes rounded to 2 decimals, None when missing or zero).
    """
    inline_images = []
    for run in paragraph.runs:
        for drawing in run._element.findall(qn('w:drawing')):
            # The extent belongs to the drawing, not to an individual blip.
            width, height = _drawing_size_inches(drawing)
            for blip in drawing.findall('.//' + qn('a:blip')):
                embed = blip.get(qn('r:embed'))
                if not embed or embed not in images_map:
                    continue
                inline_images.append({
                    'rId': embed,
                    'src': images_map[embed],
                    'widthInch': round(width, 2) if width else None,
                    'heightInch': round(height, 2) if height else None,
                })
    return inline_images


# ============================================================
# 4. Table extraction
# ============================================================

def _cell_paragraphs(cell):
    """Return the formatted paragraphs of a cell; run-less ones are skipped."""
    paras = []
    for p in cell.paragraphs:
        runs = []
        for r in p.runs:
            run_data = {'text': r.text}
            fmt = get_run_format(r)
            if fmt:
                run_data['format'] = fmt
            runs.append(run_data)
        if not runs:
            continue
        para_data = {'runs': runs}
        align = get_alignment(p)
        if align:
            para_data['alignment'] = align
        paras.append(para_data)
    return paras


def parse_table(table):
    """Convert a python-docx table into a JSON-serialisable structure.

    ``row.cells`` yields the same underlying ``<w:tc>`` element once per
    grid column a merged cell spans, and again in every row a vertically
    merged cell continues into. Each distinct cell is therefore emitted
    only once, in the row where it first appears, carrying its ``colspan``
    and ``rowspan``; later sightings only extend ``rowspan``.

    Returns:
        dict with 'rows' and 'cols' (grid dimensions) and 'data', a list
        of rows, each a list of cell dicts with keys 'text', 'colspan',
        'rowspan' and, when the cell has runs, 'paragraphs'.
    """
    rows_data = []
    # id(tc) -> (tc, cell_data). The tc is stored alongside so the object
    # stays alive and its id() cannot be reused by another element.
    seen = {}

    for row in table.rows:
        cells_data = []
        keys_in_row = set()
        for cell in row.cells:
            tc = cell._tc
            key = id(tc)
            if key in keys_in_row:
                # Horizontal-merge repeat within the current row.
                continue
            keys_in_row.add(key)

            if key in seen:
                # Cell first emitted in an earlier row: vertical merge.
                seen[key][1]['rowspan'] += 1
                continue

            cell_data = {
                'text': cell.text.strip(),
                'colspan': int(tc.grid_span),
                'rowspan': 1,
            }
            paras = _cell_paragraphs(cell)
            if paras:
                cell_data['paragraphs'] = paras
            seen[key] = (tc, cell_data)
            cells_data.append(cell_data)
        rows_data.append(cells_data)

    return {
        'rows': len(table.rows),
        'cols': len(table.columns),
        'data': rows_data,
    }


# ============================================================
# 5.
# Walk the document XML in order (paragraphs and tables interleaved)
# ============================================================

def iter_block_items(doc):
    """Iterate the direct children of the document body in document order.

    Yields:
        ``(kind, item)`` tuples where kind is 'paragraph' or 'table' and
        item is the matching python-docx wrapper object. Any other child
        (for example the trailing ``w:sectPr``) is skipped.
    """
    for child in doc.element.body:
        if child.tag == qn('w:p'):
            yield ('paragraph', docx.text.paragraph.Paragraph(child, doc))
        elif child.tag == qn('w:tbl'):
            yield ('table', docx.table.Table(child, doc))
        # Everything else, e.g. section properties, is ignored.


# ============================================================
# 6. Main parsing function
# ============================================================

def _page_info(section):
    """Return page size and margins of *section* in mm, with fallbacks."""
    def mm(length, default):
        return round(length.mm, 1) if length else default

    return {
        'widthMm': mm(section.page_width, 210),
        'heightMm': mm(section.page_height, 297),
        'marginTopMm': mm(section.top_margin, 25.4),
        'marginBottomMm': mm(section.bottom_margin, 25.4),
        'marginLeftMm': mm(section.left_margin, 31.8),
        'marginRightMm': mm(section.right_margin, 31.8),
    }


def _paragraph_block(paragraph, block_id, images_map):
    """Build the block dict for one body paragraph."""
    # Runs without text are dropped; format keys are merged flat into the
    # run dict next to 'text'.
    runs = []
    for r in paragraph.runs:
        run_text = r.text
        if not run_text:
            continue
        run_data = {'text': run_text}
        run_data.update(get_run_format(r))
        runs.append(run_data)

    style_info = get_paragraph_style_info(paragraph)
    inline_imgs = get_paragraph_images(paragraph, images_map)

    block = {
        'id': f'b{block_id}',
        'type': detect_paragraph_type(paragraph),
    }
    if runs:
        block['runs'] = runs
    if style_info:
        block['style'] = style_info
    if inline_imgs:
        block['images'] = inline_imgs
    # Keep empty paragraphs too (they provide vertical spacing).
    if not runs and not inline_imgs:
        block['runs'] = [{'text': ''}]
    return block


def parse_document(docx_path):
    """Parse a Word document into a JSON-serialisable structure.

    Args:
        docx_path: path of the .docx file to read.

    Returns:
        dict with keys 'page' (size and margins of the first section, mm),
        'blocks' (paragraph and table blocks in document order, ids
        'b0', 'b1', ...) and 'totalBlocks'.
    """
    doc = Document(docx_path)

    # Collect all embedded images up front; paragraphs refer to them by rId.
    images_map = extract_images(doc)
    print(f" 提取图片: {len(images_map)} 张")

    page_info = _page_info(doc.sections[0])

    blocks = []
    for block_id, (block_type, item) in enumerate(iter_block_items(doc)):
        if block_type == 'paragraph':
            blocks.append(_paragraph_block(item, block_id, images_map))
        else:  # 'table' - iter_block_items yields only these two kinds
            blocks.append({
                'id': f'b{block_id}',
                'type': 'table',
                'table': parse_table(item),
            })

    print(f" 解析完成: {len(blocks)} 个块")

    return {
        'page': page_info,
        'blocks': blocks,
        'totalBlocks': len(blocks),
    }


# ============================================================
# 7. Database insertion
# ============================================================

def insert_to_db(doc_json):
    """Store the parsed document as the 'doc_content' of an attachment node.

    If an ATTACHMENT node with key 'ATT-2024-003' exists, its 'doc_content'
    property is upserted. Otherwise the node, its basic properties and the
    content are inserted. In both cases a HAS_ATTACHMENT edge from node 10
    is created when missing. Everything runs in one transaction, which is
    rolled back and the exception re-raised on any error.

    NOTE(review): connection parameters are hard-coded, and the new node id
    is taken as MAX(id)+1 within [400, 500) with no upper-bound check and
    no protection against concurrent writers - confirm this is acceptable.
    """
    import psycopg2

    # Serialise once; the same payload is used by either branch below.
    payload = json.dumps(doc_json, ensure_ascii=False)

    conn = psycopg2.connect(
        host='127.0.0.1', port=5432,
        dbname='lingyue_zhibao',
        user='postgres', password='postgres'
    )
    cur = conn.cursor()

    try:
        # 1. Attachment node (原文.docx)
        att_node_key = 'ATT-2024-003'
        att_name = '原文-复审报告'

        # Does the node exist already?
        cur.execute(
            "SELECT id FROM nodes WHERE node_key = %s AND node_type = 'ATTACHMENT'",
            (att_node_key,),
        )
        row = cur.fetchone()

        if row:
            att_id = row[0]
            print(f" 附件节点已存在: id={att_id}")
            # Replace the stored doc_content.
            cur.execute("""
                INSERT INTO node_properties (node_id, prop_key, prop_json)
                VALUES (%s, 'doc_content', %s::jsonb)
                ON CONFLICT (node_id, prop_key)
                DO UPDATE SET prop_json = EXCLUDED.prop_json, updated_at = now()
            """, (att_id, payload))
        else:
            # Next id in the 400-499 range, but never below 402.
            cur.execute(
                "SELECT COALESCE(MAX(id), 0) + 1 FROM nodes WHERE id >= 400 AND id < 500"
            )
            att_id = max(cur.fetchone()[0], 402)

            cur.execute("""
                INSERT INTO nodes (id, node_type, node_key, name, status, created_by, created_at, updated_at)
                VALUES (%s, 'ATTACHMENT', %s, %s, 'active', 1, now(), now())
            """, (att_id, att_node_key, att_name))
            print(f" 创建附件节点: id={att_id}")

            # Basic properties
            cur.execute("""
                INSERT INTO node_properties (node_id, prop_key, prop_value) VALUES
                (%s, 'display_name', '原文.docx'),
                (%s, 'file_type', 'docx'),
                (%s, 'file_size', '3538608')
            """, (att_id, att_id, att_id))

            # Document content
            cur.execute("""
                INSERT INTO node_properties (node_id, prop_key, prop_json)
                VALUES (%s, 'doc_content', %s::jsonb)
            """, (att_id, payload))

        # Edge PROJECT -> ATTACHMENT; the NOT EXISTS guard makes it
        # safe to run whether or not the node existed before.
        cur.execute("""
            INSERT INTO edges (from_node_id, to_node_id, edge_type)
            SELECT 10, %s, 'HAS_ATTACHMENT'
            WHERE NOT EXISTS (
                SELECT 1 FROM edges
                WHERE from_node_id = 10 AND to_node_id = %s AND edge_type = 'HAS_ATTACHMENT'
            )
        """, (att_id, att_id))

        conn.commit()
        print(" 数据库插入成功")

    except Exception as e:
        conn.rollback()
        print(f" 数据库错误: {e}")
        raise
    finally:
        cur.close()
        conn.close()


# ============================================================
# Main
# ============================================================

def _iter_image_dicts(blocks):
    """Yield every image dict that has a 'src' from the given blocks."""
    for block in blocks:
        for img in block.get('images', ()):
            if 'src' in img:
                yield img


def main():
    """Parse DOCX_PATH, store it in the database and write a debug JSON."""
    print("=" * 60)
    print("解析 原文.docx ...")
    print("=" * 60)

    doc_json = parse_document(DOCX_PATH)

    # Independent deep copy for the debug file; its image payloads are
    # truncated further down without touching doc_json.
    debug_json = json.loads(json.dumps(doc_json))

    # Size statistics for the embedded images.
    total_img_size = 0
    img_count = 0
    for img in _iter_image_dicts(debug_json['blocks']):
        total_img_size += len(img['src'])
        img_count += 1

    print(f" 图片总大小(base64): {total_img_size / 1024 / 1024:.1f} MB ({img_count} 张)")
    print(f" JSON 总大小: {len(json.dumps(doc_json, ensure_ascii=False)) / 1024 / 1024:.1f} MB")

    print("\n插入数据库...")
    insert_to_db(doc_json)

    # Write the slimmed-down JSON (base64 payloads truncated) for debugging.
    for img in _iter_image_dicts(debug_json['blocks']):
        img['src'] = img['src'][:50] + '...[truncated]'

    debug_path = os.path.join(os.path.dirname(__file__), "parsed_doc_debug.json")
    with open(debug_path, 'w', encoding='utf-8') as f:
        json.dump(debug_json, f, ensure_ascii=False, indent=2)
    print(f"\n调试 JSON 已保存: {debug_path}")
    print("完成!")


if __name__ == '__main__':
    main()