| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519 |
- #!/usr/bin/env python3
- """
- 解析 原文.docx 并将结构化内容插入数据库
- - 提取段落(含格式:字体、大小、粗体、斜体、颜色、对齐等)
- - 提取表格(含合并单元格)
- - 提取图片(转 base64)
- - 按文档顺序组装 blocks JSON
- - 插入到 node_properties.prop_json 作为附件的文档内容
- """
- import json
- import base64
- import re
- import sys
- import os
- from io import BytesIO
- from collections import OrderedDict
- import docx
- from docx import Document
- from docx.shared import Pt, Emu, Inches, Cm, Twips
- from docx.enum.text import WD_ALIGN_PARAGRAPH
- from docx.oxml.ns import qn
- from lxml import etree
# Path of the Word document to parse: "原文.docx" located in the same
# directory as this script.
_SCRIPT_DIR = os.path.dirname(__file__)
DOCX_PATH = os.path.join(_SCRIPT_DIR, "原文.docx")
- # ============================================================
- # 1. 图片提取
- # ============================================================
def extract_images(doc):
    """Collect the embedded images of a document as base64 data URIs.

    Walks the relationships of the main document part and encodes the binary
    payload of every image relationship.

    Args:
        doc: An opened python-docx ``Document``.

    Returns:
        dict mapping relationship id (rId) to a
        ``data:<mime>;base64,<payload>`` string. Externally linked images are
        left out because they have no embedded part to read bytes from.
    """
    # Suffix -> MIME type; anything unknown keeps the historical fallback
    # of 'image/png'.
    suffix_to_mime = (
        ('.png', 'image/png'),
        ('.jpg', 'image/jpeg'),
        ('.jpeg', 'image/jpeg'),
        ('.gif', 'image/gif'),
        ('.bmp', 'image/bmp'),
        ('.emf', 'image/x-emf'),
        ('.wmf', 'image/x-wmf'),
        ('.tif', 'image/tiff'),
        ('.tiff', 'image/tiff'),
        ('.svg', 'image/svg+xml'),
        ('.webp', 'image/webp'),
    )

    images = {}
    for rel_id, rel in doc.part.rels.items():
        if "image" not in rel.reltype:
            continue
        # A linked (external) image has no target part; asking for it raises,
        # which used to abort the whole extraction.
        if getattr(rel, 'is_external', False):
            continue

        target = rel.target_ref.lower()
        mime = next(
            (mime_type for suffix, mime_type in suffix_to_mime
             if target.endswith(suffix)),
            'image/png',
        )
        encoded = base64.b64encode(rel.target_part.blob).decode('ascii')
        images[rel_id] = f"data:{mime};base64,{encoded}"
    return images
- # ============================================================
- # 2. 段落/Run 格式提取
- # ============================================================
def get_alignment(paragraph):
    """Return the paragraph's horizontal alignment as a string, or None.

    When python-docx reports an alignment it is translated to one of
    'left', 'center', 'right' or 'justify' (any other enum member gives
    None). When python-docx reports nothing, the raw ``w:val`` of a
    ``w:jc`` element under ``w:pPr`` is returned if one is present.
    """
    resolved = paragraph.alignment
    if resolved is not None:
        names = {
            WD_ALIGN_PARAGRAPH.LEFT: 'left',
            WD_ALIGN_PARAGRAPH.CENTER: 'center',
            WD_ALIGN_PARAGRAPH.RIGHT: 'right',
            WD_ALIGN_PARAGRAPH.JUSTIFY: 'justify',
        }
        return names.get(resolved)

    # Fall back to reading the justification element directly from the XML.
    ppr = paragraph._element.find(qn('w:pPr'))
    if ppr is None:
        return None
    jc_node = ppr.find(qn('w:jc'))
    if jc_node is None:
        return None
    return jc_node.get(qn('w:val'))
def get_paragraph_style_info(paragraph):
    """Gather paragraph-level layout settings into a dict.

    Only settings that are present and non-zero are included. Possible keys,
    in insertion order: 'alignment', 'indentLeft', 'indentRight',
    'indentFirstLine' or 'indentHanging', 'spacingBefore', 'spacingAfter',
    'lineSpacing'. Indents and spacing are stored as ``int(...)`` of the
    python-docx values; line spacing is stored as ``float(...)``.
    """
    info = {}
    fmt = paragraph.paragraph_format

    # Alignment
    align = get_alignment(paragraph)
    if align:
        info['alignment'] = align

    # Left / right indentation
    for key, length in (('indentLeft', fmt.left_indent),
                        ('indentRight', fmt.right_indent)):
        if length:
            info[key] = int(length)

    # A negative first-line indent is reported as a positive hanging indent.
    first_line = fmt.first_line_indent
    if first_line:
        amount = int(first_line)
        if amount > 0:
            info['indentFirstLine'] = amount
        else:
            info['indentHanging'] = -amount

    # Spacing before / after the paragraph
    for key, length in (('spacingBefore', fmt.space_before),
                        ('spacingAfter', fmt.space_after)):
        if length:
            info[key] = int(length)

    # Line spacing
    spacing = fmt.line_spacing
    if spacing:
        info['lineSpacing'] = float(spacing)

    return info
def get_run_format(run):
    """Describe the character formatting of a run as a dict.

    Only attributes that are set (truthy) on ``run.font`` are included.
    Possible keys, in insertion order: 'fontFamily', 'fontSize' (points),
    'bold', 'italic', 'underline', 'strikeThrough', 'color',
    'highlightColor', 'verticalAlign'.
    """
    font = run.font
    result = {}

    if font.name:
        result['fontFamily'] = font.name
    if font.size:
        result['fontSize'] = font.size.pt
    if font.bold:
        result['bold'] = True
    if font.italic:
        result['italic'] = True

    # A plain ``True`` means a single underline; any other truthy value is
    # stored as its string form.
    underline = font.underline
    if underline is True:
        result['underline'] = 'single'
    elif underline:
        result['underline'] = str(underline)

    if font.strike:
        result['strikeThrough'] = True

    color = font.color
    if color and color.rgb:
        result['color'] = str(color.rgb)

    try:
        highlight = font.highlight_color
        if highlight:
            result['highlightColor'] = str(highlight)
    except (ValueError, KeyError):
        # Highlight values the library cannot map (e.g. 'none') are skipped.
        pass

    # Superscript wins over subscript when both are reported.
    if font.superscript:
        result['verticalAlign'] = 'superscript'
    elif font.subscript:
        result['verticalAlign'] = 'subscript'

    return result
def detect_paragraph_type(paragraph):
    """Classify a paragraph as a heading, TOC entry, list item or body text.

    The style name is checked first:
      * 'Heading…' / 'heading…' -> 'heading<N>' (N = first number in the
        name, 1 when the name has none)
      * 'toc …' / 'TOC…'        -> 'toc<N>' (same rule for N)
      * 'List…'                 -> 'list_item'

    Otherwise the first run is inspected. If it is bold and has an explicit
    size, then >= 18pt is 'heading1' and >= 16pt is 'heading2'. At >= 14pt,
    text that starts with a section number such as "1 " or "1.2 " becomes
    heading1/2/3 according to the number of dots in that number.

    Returns:
        One of 'heading<N>', 'toc<N>', 'list_item' or 'paragraph'.
    """
    style = paragraph.style
    # A style can exist without a name; treat that like "no style" instead
    # of failing on None.startswith(...).
    style_name = (style.name if style else '') or ''

    if style_name.startswith(('Heading', 'heading')):
        level = re.search(r'\d+', style_name)
        return f'heading{level.group()}' if level else 'heading1'

    if style_name.startswith(('toc ', 'TOC')):
        level = re.search(r'\d+', style_name)
        lvl = level.group() if level else '1'
        return f'toc{lvl}'

    if style_name.startswith('List'):
        return 'list_item'

    # No telling style name: infer a heading from the first run's formatting.
    text = paragraph.text.strip()
    if not text or not paragraph.runs:
        return 'paragraph'

    first_font = paragraph.runs[0].font
    if not (first_font.bold and first_font.size):
        return 'paragraph'

    size_pt = first_font.size.pt
    if size_pt >= 18:
        return 'heading1'
    if size_pt >= 16:
        return 'heading2'
    # Section titles such as "1 企业概述" or "1.2 ..." at 14pt and above.
    if size_pt >= 14 and re.match(r'^\d+(\.\d+)*\s', text):
        dots = text.split()[0].count('.')
        if dots == 0:
            return 'heading1'
        if dots == 1:
            return 'heading2'
        return 'heading3'

    return 'paragraph'
- # ============================================================
- # 3. 检查段落中的内联图片
- # ============================================================
def get_paragraph_images(paragraph, images_map):
    """List the pictures drawn inside a paragraph's runs.

    Args:
        paragraph: The paragraph whose runs are scanned for ``w:drawing``
            elements.
        images_map: Mapping of relationship id -> image source; blips whose
            ``r:embed`` id is not in this mapping are ignored.

    Returns:
        A list of dicts with keys 'rId', 'src', 'widthInch' and 'heightInch'.
        The sizes come from the drawing's ``wp:extent`` (EMU divided by
        914400, rounded to two decimals) and are None when unavailable.
    """
    emu_per_inch = 914400
    drawing_tag = qn('w:drawing')
    blip_path = './/' + qn('a:blip')
    extent_path = './/' + qn('wp:extent')
    embed_attr = qn('r:embed')

    found = []
    for run in paragraph.runs:
        for drawing in run._element.findall(drawing_tag):
            for blip in drawing.findall(blip_path):
                rel_id = blip.get(embed_attr)
                if not rel_id or rel_id not in images_map:
                    continue

                # Picture size, converted from EMU to inches.
                width_in = height_in = None
                extent = drawing.find(extent_path)
                if extent is not None:
                    cx, cy = extent.get('cx'), extent.get('cy')
                    if cx:
                        width_in = int(cx) / emu_per_inch
                    if cy:
                        height_in = int(cy) / emu_per_inch

                found.append({
                    'rId': rel_id,
                    'src': images_map[rel_id],
                    'widthInch': round(width_in, 2) if width_in else None,
                    'heightInch': round(height_in, 2) if height_in else None,
                })
    return found
- # ============================================================
- # 4. 表格提取
- # ============================================================
def _parse_cell_paragraphs(cell):
    """Return the cell's paragraphs that have runs, as a list of dicts.

    Each entry has 'runs' (list of {'text', optional 'format'}) and, when
    known, 'alignment'.
    """
    paras = []
    for p in cell.paragraphs:
        runs = []
        for r in p.runs:
            run_data = {'text': r.text}
            fmt = get_run_format(r)
            if fmt:
                run_data['format'] = fmt
            runs.append(run_data)
        if not runs:
            continue
        para_data = {'runs': runs}
        align = get_alignment(p)
        if align:
            para_data['alignment'] = align
        paras.append(para_data)
    return paras


def parse_table(table):
    """Convert a table into plain, JSON-serialisable data.

    Returns:
        dict with
          * 'rows': number of table rows,
          * 'cols': number of grid columns,
          * 'data': one list per row holding cell dicts with 'text',
            'colspan' (from ``w:gridSpan``, default 1) and, when the cell
            has runs, 'paragraphs'.

    A horizontally merged cell is emitted once with its 'colspan'.
    python-docx's ``row.cells`` yields one cell object per grid column, all
    backed by the same ``<w:tc>`` element, so the repeats are skipped here;
    previously such a cell appeared ``colspan`` times in the row.

    NOTE(review): for vertically merged cells python-docx reports the cell
    from the row above at the continuation positions, so that content still
    repeats in each spanned row; no 'rowspan' is produced.
    """
    rows_data = []
    for row in table.rows:
        cells_data = []
        previous_tc = None
        for cell in row.cells:
            tc = cell._tc
            if tc is previous_tc:
                # Same underlying <w:tc> as the previous grid column: this is
                # the continuation of a horizontal merge, already emitted.
                continue
            previous_tc = tc

            # Horizontal span declared on the cell properties, if any.
            colspan = 1
            tc_pr = tc.find(qn('w:tcPr'))
            if tc_pr is not None:
                grid_span = tc_pr.find(qn('w:gridSpan'))
                if grid_span is not None:
                    colspan = int(grid_span.get(qn('w:val'), 1))

            cell_data = {
                'text': cell.text.strip(),
                'colspan': colspan,
            }
            paras = _parse_cell_paragraphs(cell)
            if paras:
                cell_data['paragraphs'] = paras
            cells_data.append(cell_data)
        rows_data.append(cells_data)

    return {
        'rows': len(table.rows),
        'cols': len(table.columns),
        'data': rows_data,
    }
- # ============================================================
- # 5. 按文档 XML 顺序遍历(段落+表格交错)
- # ============================================================
def iter_block_items(doc):
    """Yield the body's paragraphs and tables in document order.

    Each item is a ``(kind, obj)`` tuple where ``kind`` is 'paragraph' or
    'table' and ``obj`` is the matching python-docx wrapper. Every other
    direct child of the body (for example the section properties) is
    skipped.
    """
    paragraph_tag = qn('w:p')
    table_tag = qn('w:tbl')
    for node in doc.element.body:
        tag = node.tag
        if tag == paragraph_tag:
            yield ('paragraph', docx.text.paragraph.Paragraph(node, doc))
        elif tag == table_tag:
            yield ('table', docx.table.Table(node, doc))
        # anything else, e.g. w:sectPr, produces nothing
- # ============================================================
- # 6. 主解析函数
- # ============================================================
def parse_document(docx_path):
    """Parse a Word document into a JSON-serialisable structure.

    Args:
        docx_path: Path of the .docx file to open.

    Returns:
        dict with
          * 'page': size and margins of the first section in millimetres
            (A4-style defaults when a value is missing),
          * 'blocks': paragraphs and tables in document order, each with an
            id of the form 'b<index>',
          * 'totalBlocks': number of blocks.
    """
    doc = Document(docx_path)

    # All embedded pictures, keyed by relationship id.
    images_map = extract_images(doc)
    print(f" 提取图片: {len(images_map)} 张")

    def _mm(length, fallback):
        # Millimetres rounded to one decimal, or the fallback when unset.
        return round(length.mm, 1) if length else fallback

    # Page setup is taken from the first section only.
    first_section = doc.sections[0]
    page_info = {
        'widthMm': _mm(first_section.page_width, 210),
        'heightMm': _mm(first_section.page_height, 297),
        'marginTopMm': _mm(first_section.top_margin, 25.4),
        'marginBottomMm': _mm(first_section.bottom_margin, 25.4),
        'marginLeftMm': _mm(first_section.left_margin, 31.8),
        'marginRightMm': _mm(first_section.right_margin, 31.8),
    }

    blocks = []
    for kind, item in iter_block_items(doc):
        # The id is the block's position in the output list.
        block = {'id': f'b{len(blocks)}'}

        if kind == 'table':
            block['type'] = 'table'
            block['table'] = parse_table(item)
            blocks.append(block)
            continue
        if kind != 'paragraph':
            continue

        block['type'] = detect_paragraph_type(item)
        style_info = get_paragraph_style_info(item)
        inline_imgs = get_paragraph_images(item, images_map)

        # Runs without text are dropped; formatting keys sit next to 'text'.
        runs = [
            {'text': run.text, **get_run_format(run)}
            for run in item.runs
            if run.text
        ]

        if runs:
            block['runs'] = runs
        if style_info:
            block['style'] = style_info
        if inline_imgs:
            block['images'] = inline_imgs
        if not runs and not inline_imgs:
            # Empty paragraphs are kept (they provide vertical spacing).
            block['runs'] = [{'text': ''}]

        blocks.append(block)

    print(f" 解析完成: {len(blocks)} 个块")

    return {
        'page': page_info,
        'blocks': blocks,
        'totalBlocks': len(blocks),
    }
- # ============================================================
- # 7. 插入数据库
- # ============================================================
def insert_to_db(doc_json, conn_params=None):
    """Store the parsed document as the 'doc_content' property of an attachment node.

    If an ATTACHMENT node with key 'ATT-2024-003' already exists, its
    'doc_content' property is upserted. Otherwise the node is created with
    its basic properties and the document content. In both cases a
    HAS_ATTACHMENT edge from node 10 to the attachment is added when
    missing. Everything runs in one transaction, rolled back on any error.

    Args:
        doc_json: JSON-serialisable document structure to store.
        conn_params: Optional dict of ``psycopg2.connect`` keyword arguments
            that override the built-in local defaults (host, port, dbname,
            user, password). Omit it to connect exactly as before.

    Raises:
        Exception: any database or serialisation error is re-raised after
            the rollback and after the error message is printed.
    """
    import psycopg2

    # Local development defaults; callers should pass real settings via
    # conn_params rather than editing credentials in the source.
    params = {
        'host': '127.0.0.1',
        'port': 5432,
        'dbname': 'lingyue_zhibao',
        'user': 'postgres',
        'password': 'postgres',
    }
    if conn_params:
        params.update(conn_params)

    att_node_key = 'ATT-2024-003'
    att_name = '原文-复审报告'

    conn = psycopg2.connect(**params)
    try:
        # Nested try blocks so the connection is closed even if creating
        # the cursor fails.
        cur = conn.cursor()
        try:
            # Serialise once; both branches store the same payload.
            payload = json.dumps(doc_json, ensure_ascii=False)

            # Does the attachment node already exist?
            cur.execute(
                "SELECT id FROM nodes WHERE node_key = %s AND node_type = 'ATTACHMENT'",
                (att_node_key,),
            )
            row = cur.fetchone()

            if row:
                att_id = row[0]
                print(f" 附件节点已存在: id={att_id}")
                # Replace the stored document content.
                cur.execute("""
                    INSERT INTO node_properties (node_id, prop_key, prop_json)
                    VALUES (%s, 'doc_content', %s::jsonb)
                    ON CONFLICT (node_id, prop_key)
                    DO UPDATE SET prop_json = EXCLUDED.prop_json, updated_at = now()
                """, (att_id, payload))
            else:
                # Next id within [400, 500), but never below 402.
                # NOTE(review): when 499 is taken this yields 500, outside
                # the queried range, and MAX(id)+1 is not safe against
                # concurrent writers. Confirm the intended id policy.
                cur.execute("SELECT COALESCE(MAX(id), 0) + 1 FROM nodes WHERE id >= 400 AND id < 500")
                att_id = cur.fetchone()[0]
                if att_id < 402:
                    att_id = 402

                cur.execute("""
                    INSERT INTO nodes (id, node_type, node_key, name, status, created_by, created_at, updated_at)
                    VALUES (%s, 'ATTACHMENT', %s, %s, 'active', 1, now(), now())
                """, (att_id, att_node_key, att_name))
                print(f" 创建附件节点: id={att_id}")

                # Basic file properties.
                # NOTE(review): file_size is a fixed literal, not measured
                # from the parsed file.
                cur.execute("""
                    INSERT INTO node_properties (node_id, prop_key, prop_value) VALUES
                    (%s, 'display_name', '原文.docx'),
                    (%s, 'file_type', 'docx'),
                    (%s, 'file_size', '3538608')
                """, (att_id, att_id, att_id))

                # Document content.
                cur.execute("""
                    INSERT INTO node_properties (node_id, prop_key, prop_json)
                    VALUES (%s, 'doc_content', %s::jsonb)
                """, (att_id, payload))

            # Link node 10 to the attachment unless the edge already exists.
            cur.execute("""
                INSERT INTO edges (from_node_id, to_node_id, edge_type)
                SELECT 10, %s, 'HAS_ATTACHMENT'
                WHERE NOT EXISTS (
                    SELECT 1 FROM edges WHERE from_node_id = 10 AND to_node_id = %s AND edge_type = 'HAS_ATTACHMENT'
                )
            """, (att_id, att_id))

            conn.commit()
            print(" 数据库插入成功")

        except Exception as e:
            conn.rollback()
            print(f" 数据库错误: {e}")
            raise
        finally:
            cur.close()
    finally:
        conn.close()
- # ============================================================
- # Main
- # ============================================================
if __name__ == '__main__':
    # Script entry point: parse the document, report sizes, store it in the
    # database, then write a trimmed copy of the JSON next to this script.
    banner = "=" * 60
    print(banner)
    print("解析 原文.docx ...")
    print(banner)

    doc_json = parse_document(DOCX_PATH)

    # Independent copy for the debug dump, so the data sent to the database
    # keeps its full base64 image sources.
    debug_json = json.loads(json.dumps(doc_json))

    # Every image entry of the copy that carries a 'src'.
    debug_images = [
        img
        for block in debug_json['blocks']
        for img in block.get('images', ())
        if 'src' in img
    ]
    total_img_size = sum(len(img['src']) for img in debug_images)
    img_count = len(debug_images)

    print(f" 图片总大小(base64): {total_img_size / 1024 / 1024:.1f} MB ({img_count} 张)")
    print(f" JSON 总大小: {len(json.dumps(doc_json, ensure_ascii=False)) / 1024 / 1024:.1f} MB")

    print("\n插入数据库...")
    insert_to_db(doc_json)

    # Shorten the image sources in the debug copy so the dump stays readable.
    for img in debug_images:
        img['src'] = img['src'][:50] + '...[truncated]'

    debug_path = os.path.join(os.path.dirname(__file__), "parsed_doc_debug.json")
    with open(debug_path, 'w', encoding='utf-8') as f:
        json.dump(debug_json, f, ensure_ascii=False, indent=2)
    print(f"\n调试 JSON 已保存: {debug_path}")
    print("完成!")
|