""" DOCX解析服务 将docx文件解析为结构化JSON,供前端渲染和要素提取使用。 """ import base64 import re from io import BytesIO from typing import Dict, List, Any, Optional import docx from docx import Document from docx.enum.text import WD_ALIGN_PARAGRAPH from docx.oxml.ns import qn from loguru import logger def parse_docx_file(file_content: bytes) -> Dict[str, Any]: """ 解析DOCX文件内容 Args: file_content: DOCX文件的二进制内容 Returns: { "page": {...}, "blocks": [...], "totalBlocks": int } """ doc = Document(BytesIO(file_content)) # 提取图片 images_map = _extract_images(doc) logger.info(f"提取图片: {len(images_map)} 张") # 页面设置 section = doc.sections[0] page_info = { 'widthMm': round(section.page_width.mm, 1) if section.page_width else 210, 'heightMm': round(section.page_height.mm, 1) if section.page_height else 297, 'marginTopMm': round(section.top_margin.mm, 1) if section.top_margin else 25.4, 'marginBottomMm': round(section.bottom_margin.mm, 1) if section.bottom_margin else 25.4, 'marginLeftMm': round(section.left_margin.mm, 1) if section.left_margin else 31.8, 'marginRightMm': round(section.right_margin.mm, 1) if section.right_margin else 31.8, } # 解析文档块 blocks = [] block_id = 0 for block_type, item in _iter_block_items(doc): if block_type == 'paragraph': block = _parse_paragraph(item, block_id, images_map) blocks.append(block) block_id += 1 elif block_type == 'table': block = _parse_table(item, block_id) blocks.append(block) block_id += 1 logger.info(f"解析完成: {len(blocks)} 个块") return { 'page': page_info, 'blocks': blocks, 'totalBlocks': len(blocks), } def _extract_images(doc) -> Dict[str, str]: """提取文档中所有图片""" images = {} for rel_id, rel in doc.part.rels.items(): if "image" in rel.reltype: blob = rel.target_part.blob target = rel.target_ref.lower() if target.endswith('.png'): mime = 'image/png' elif target.endswith('.jpg') or target.endswith('.jpeg'): mime = 'image/jpeg' elif target.endswith('.gif'): mime = 'image/gif' elif target.endswith('.bmp'): mime = 'image/bmp' else: mime = 'image/png' b64 = base64.b64encode(blob).decode('ascii') images[rel_id] = f"data:{mime};base64,{b64}" return images def _iter_block_items(doc): """按文档body中的顺序迭代段落和表格""" body = doc.element.body for child in body: if child.tag == qn('w:p'): yield ('paragraph', docx.text.paragraph.Paragraph(child, doc)) elif child.tag == qn('w:tbl'): yield ('table', docx.table.Table(child, doc)) def _get_alignment(paragraph) -> Optional[str]: """获取段落对齐方式""" align = paragraph.alignment if align is None: pPr = paragraph._element.find(qn('w:pPr')) if pPr is not None: jc = pPr.find(qn('w:jc')) if jc is not None: return jc.get(qn('w:val')) return None align_map = { WD_ALIGN_PARAGRAPH.LEFT: 'left', WD_ALIGN_PARAGRAPH.CENTER: 'center', WD_ALIGN_PARAGRAPH.RIGHT: 'right', WD_ALIGN_PARAGRAPH.JUSTIFY: 'justify', } return align_map.get(align, None) def _get_run_format(run) -> Dict: """提取Run级别格式""" fmt = {} font = run.font if font.name: fmt['fontFamily'] = font.name if font.size: fmt['fontSize'] = font.size.pt if font.bold: fmt['bold'] = True if font.italic: fmt['italic'] = True if font.underline and font.underline is not True: fmt['underline'] = str(font.underline) elif font.underline is True: fmt['underline'] = 'single' if font.strike: fmt['strikeThrough'] = True if font.color and font.color.rgb: fmt['color'] = str(font.color.rgb) if font.superscript: fmt['verticalAlign'] = 'superscript' elif font.subscript: fmt['verticalAlign'] = 'subscript' return fmt def _detect_paragraph_type(paragraph) -> str: """检测段落类型""" style_name = paragraph.style.name if paragraph.style else '' if style_name.startswith('Heading') or style_name.startswith('heading'): level = re.search(r'\d+', style_name) if level: return f'heading{level.group()}' return 'heading1' if style_name.startswith('toc ') or style_name.startswith('TOC'): level = re.search(r'\d+', style_name) lvl = level.group() if level else '1' return f'toc{lvl}' if style_name.startswith('List'): return 'list_item' # 通过格式推断标题 text = paragraph.text.strip() if text and paragraph.runs: first_run = paragraph.runs[0] if first_run.font.bold and first_run.font.size: size_pt = first_run.font.size.pt if size_pt >= 18: return 'heading1' elif size_pt >= 16: return 'heading2' elif size_pt >= 14: if re.match(r'^\d+(\.\d+)*\s', text): dots = text.split()[0].count('.') if dots == 0: return 'heading1' elif dots == 1: return 'heading2' else: return 'heading3' return 'paragraph' def _get_paragraph_images(paragraph, images_map) -> List[Dict]: """检查段落中的内联图片""" inline_images = [] for run in paragraph.runs: run_xml = run._element drawings = run_xml.findall(qn('w:drawing')) for drawing in drawings: blips = drawing.findall('.//' + qn('a:blip')) for blip in blips: embed = blip.get(qn('r:embed')) if embed and embed in images_map: extent = drawing.find('.//' + qn('wp:extent')) width = height = None if extent is not None: cx = extent.get('cx') cy = extent.get('cy') if cx: width = int(cx) / 914400 if cy: height = int(cy) / 914400 inline_images.append({ 'rId': embed, 'src': images_map[embed], 'widthInch': round(width, 2) if width else None, 'heightInch': round(height, 2) if height else None, }) return inline_images def _parse_paragraph(paragraph, block_id: int, images_map: Dict) -> Dict: """解析段落""" para_type = _detect_paragraph_type(paragraph) # 段落样式 style_info = {} pf = paragraph.paragraph_format alignment = _get_alignment(paragraph) if alignment: style_info['alignment'] = alignment if pf.left_indent: style_info['indentLeft'] = int(pf.left_indent) if pf.first_line_indent: val = int(pf.first_line_indent) if val > 0: style_info['indentFirstLine'] = val # 内联图片 inline_imgs = _get_paragraph_images(paragraph, images_map) # 提取runs runs = [] for r in paragraph.runs: run_text = r.text if not run_text: continue run_data = {'text': run_text} fmt = _get_run_format(r) if fmt: run_data.update(fmt) runs.append(run_data) block = { 'id': f'b{block_id}', 'type': para_type, } if runs: block['runs'] = runs if style_info: block['style'] = style_info if inline_imgs: block['images'] = inline_imgs if not runs and not inline_imgs: block['runs'] = [{'text': ''}] return block def _parse_table(table, block_id: int) -> Dict: """解析表格 注意:python-docx 对于水平合并单元格,row.cells 会返回重复的 cell 对象引用。 例如:A 单元格合并了 3 列,row.cells 会返回 [A, A, A, B, C, ...] 需要通过跟踪 cell._tc 元素来去重,避免重复添加同一个单元格。 """ rows_data = [] for row in table.rows: cells_data = [] seen_tc = set() # 跟踪已处理的单元格元素,避免重复 for cell in row.cells: tc = cell._tc # 如果这个单元格元素已经处理过,跳过(合并单元格的重复引用) if id(tc) in seen_tc: continue seen_tc.add(id(tc)) cell_text = cell.text.strip() grid_span = tc.find(qn('w:tcPr')) colspan = 1 if grid_span is not None: gs = grid_span.find(qn('w:gridSpan')) if gs is not None: colspan = int(gs.get(qn('w:val'), 1)) cell_data = { 'text': cell_text, 'colspan': colspan, } cells_data.append(cell_data) rows_data.append(cells_data) return { 'id': f'b{block_id}', 'type': 'table', 'table': { 'rows': len(table.rows), 'cols': len(table.columns), 'data': rows_data, } } def blocks_to_text(blocks: List[Dict]) -> str: """将blocks转为纯文本""" lines = [] for block in blocks: if block['type'] == 'table': table = block.get('table', {}) for row in table.get('data', []): cells = [cell.get('text', '') for cell in row] lines.append(' | '.join(cells)) lines.append('') else: runs = block.get('runs', []) text = ''.join(r.get('text', '') for r in runs) lines.append(text) return '\n'.join(lines) def extract_tables_from_blocks(blocks: List[Dict]) -> List[Dict]: """从blocks中提取所有表格""" tables = [] for i, block in enumerate(blocks): if block['type'] == 'table': tables.append({ 'block_id': block['id'], 'block_index': i, 'table': block.get('table', {}) }) return tables def extract_toc(blocks: List[Dict]) -> List[Dict]: """ 从blocks中提取目录结构(Table of Contents) Returns: [ { "level": 1, "title": "第一章 概述", "block_index": 5, "block_id": "b5" }, ... ] """ toc = [] for i, block in enumerate(blocks): block_type = block.get('type', '') # 检测标题类型 if block_type.startswith('heading'): level = int(block_type.replace('heading', '') or '1') runs = block.get('runs', []) title = ''.join(r.get('text', '') for r in runs).strip() if title: toc.append({ 'level': level, 'title': title, 'block_index': i, 'block_id': block.get('id', f'b{i}') }) # 检测TOC样式 elif block_type.startswith('toc'): level = int(block_type.replace('toc', '') or '1') runs = block.get('runs', []) title = ''.join(r.get('text', '') for r in runs).strip() # TOC条目通常包含页码,去掉尾部数字 title = re.sub(r'\s*\d+\s*$', '', title).strip() if title: toc.append({ 'level': level, 'title': title, 'block_index': i, 'block_id': block.get('id', f'b{i}'), 'is_toc_entry': True # 标记为目录条目 }) return toc def split_by_chapters(blocks: List[Dict], toc: List[Dict] = None) -> List[Dict]: """ 根据目录/标题将文档切分为章节 Args: blocks: 文档块列表 toc: 目录结构(可选,如果不提供则自动提取) Returns: [ { "chapter_id": "ch0", "title": "前言", "level": 0, "start_index": 0, "end_index": 10, "blocks": [...], "text": "章节纯文本内容" }, ... ] """ if toc is None: toc = extract_toc(blocks) # 过滤掉TOC条目,只保留真正的标题 headings = [t for t in toc if not t.get('is_toc_entry')] if not headings: # 没有标题,整个文档作为一个章节 return [{ 'chapter_id': 'ch0', 'title': '全文', 'level': 0, 'start_index': 0, 'end_index': len(blocks), 'blocks': blocks, 'text': blocks_to_text(blocks) }] chapters = [] # 处理第一个标题之前的内容(如封面、摘要等) first_heading_index = headings[0]['block_index'] if first_heading_index > 0: pre_blocks = blocks[:first_heading_index] chapters.append({ 'chapter_id': 'ch0', 'title': '前言', 'level': 0, 'start_index': 0, 'end_index': first_heading_index, 'blocks': pre_blocks, 'text': blocks_to_text(pre_blocks) }) # 按标题切分章节 for i, heading in enumerate(headings): start_index = heading['block_index'] # 确定章节结束位置 if i + 1 < len(headings): end_index = headings[i + 1]['block_index'] else: end_index = len(blocks) chapter_blocks = blocks[start_index:end_index] chapters.append({ 'chapter_id': f'ch{i + 1}', 'title': heading['title'], 'level': heading['level'], 'start_index': start_index, 'end_index': end_index, 'blocks': chapter_blocks, 'text': blocks_to_text(chapter_blocks) }) return chapters def parse_docx_with_chapters(file_content: bytes) -> Dict[str, Any]: """ 解析DOCX文件,包含章节切分 Returns: { "page": {...}, "blocks": [...], "totalBlocks": int, "toc": [...], # 目录结构 "chapters": [...] # 章节列表 } """ # 基础解析 result = parse_docx_file(file_content) # 提取目录 toc = extract_toc(result['blocks']) result['toc'] = toc logger.info(f"提取目录: {len(toc)} 个条目") # 切分章节 chapters = split_by_chapters(result['blocks'], toc) # 不在结果中包含完整blocks,节省内存 for ch in chapters: ch['block_count'] = len(ch['blocks']) ch['text_length'] = len(ch['text']) del ch['blocks'] # 移除blocks,只保留text result['chapters'] = chapters logger.info(f"切分章节: {len(chapters)} 个章节") return result