"""
DOCX parsing service.

Parses a .docx file into structured JSON for front-end rendering and
element extraction.
"""

import base64
import re
from io import BytesIO
from typing import Dict, List, Any, Optional

import docx
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml.ns import qn
from loguru import logger


# English Metric Units per inch; used to convert wp:extent cx/cy to inches.
_EMU_PER_INCH = 914400

# (file suffix, MIME type) pairs used to label extracted images.
_IMAGE_MIME_BY_SUFFIX = (
    ('.png', 'image/png'),
    ('.jpg', 'image/jpeg'),
    ('.jpeg', 'image/jpeg'),
    ('.gif', 'image/gif'),
    ('.bmp', 'image/bmp'),
)

# MIME type used when the image target has none of the known suffixes.
_DEFAULT_IMAGE_MIME = 'image/png'

# python-docx alignment enum members mapped to the names emitted in the JSON.
_ALIGNMENT_NAMES = {
    WD_ALIGN_PARAGRAPH.LEFT: 'left',
    WD_ALIGN_PARAGRAPH.CENTER: 'center',
    WD_ALIGN_PARAGRAPH.RIGHT: 'right',
    WD_ALIGN_PARAGRAPH.JUSTIFY: 'justify',
}


def _length_mm(length, default: float) -> float:
    """Return *length* in millimetres rounded to 0.1, or *default* if unset."""
    return round(length.mm, 1) if length else default


def parse_docx_file(file_content: bytes) -> Dict[str, Any]:
    """
    Parse the binary content of a DOCX file.

    Args:
        file_content: Raw bytes of the DOCX file.

    Returns:
        {
            "page": {...},        # page size and margins in millimetres
            "blocks": [...],      # paragraph and table blocks in body order
            "totalBlocks": int
        }
    """
    doc = Document(BytesIO(file_content))

    # Collect embedded images first so paragraphs can reference them by rId.
    images_map = _extract_images(doc)
    logger.info(f"提取图片: {len(images_map)} 张")

    # Page setup comes from the first section. A document without any section
    # falls back to the same defaults used for unset values.
    sections = doc.sections
    section = sections[0] if len(sections) else None
    page_info = {
        'widthMm': _length_mm(getattr(section, 'page_width', None), 210),
        'heightMm': _length_mm(getattr(section, 'page_height', None), 297),
        'marginTopMm': _length_mm(getattr(section, 'top_margin', None), 25.4),
        'marginBottomMm': _length_mm(
            getattr(section, 'bottom_margin', None), 25.4
        ),
        'marginLeftMm': _length_mm(getattr(section, 'left_margin', None), 31.8),
        'marginRightMm': _length_mm(
            getattr(section, 'right_margin', None), 31.8
        ),
    }

    # Block ids are sequential over emitted blocks, so the next id is always
    # the current number of blocks.
    blocks = []
    for block_type, item in _iter_block_items(doc):
        block_id = len(blocks)
        if block_type == 'paragraph':
            blocks.append(_parse_paragraph(item, block_id, images_map))
        elif block_type == 'table':
            blocks.append(_parse_table(item, block_id))

    logger.info(f"解析完成: {len(blocks)} 个块")

    return {
        'page': page_info,
        'blocks': blocks,
        'totalBlocks': len(blocks),
    }


def _extract_images(doc) -> Dict[str, str]:
    """
    Extract every embedded image of the document as a data URI.

    Returns:
        Mapping of relationship id -> "data:<mime>;base64,<payload>".
        Externally linked images are not included.
    """
    images = {}
    for rel_id, rel in doc.part.rels.items():
        if "image" not in rel.reltype:
            continue
        # A linked (external) image has no target part; reading
        # `target_part` on it raises ValueError, so skip it instead of
        # failing the whole parse.
        if rel.is_external:
            continue

        target = rel.target_ref.lower()
        mime = next(
            (
                mime_type
                for suffix, mime_type in _IMAGE_MIME_BY_SUFFIX
                if target.endswith(suffix)
            ),
            _DEFAULT_IMAGE_MIME,
        )
        payload = base64.b64encode(rel.target_part.blob).decode('ascii')
        images[rel_id] = f"data:{mime};base64,{payload}"
    return images


def _iter_block_items(doc):
    """Yield ('paragraph', Paragraph) / ('table', Table) in body order."""
    paragraph_tag = qn('w:p')
    table_tag = qn('w:tbl')
    for child in doc.element.body:
        if child.tag == paragraph_tag:
            yield ('paragraph', docx.text.paragraph.Paragraph(child, doc))
        elif child.tag == table_tag:
            yield ('table', docx.table.Table(child, doc))


def _get_alignment(paragraph) -> Optional[str]:
    """
    Return the paragraph alignment name, or None when it cannot be determined.

    When python-docx reports no alignment, the raw `w:jc` value from the
    paragraph properties is returned if present.
    """
    align = paragraph.alignment
    if align is not None:
        return _ALIGNMENT_NAMES.get(align, None)

    pPr = paragraph._element.find(qn('w:pPr'))
    if pPr is None:
        return None
    jc = pPr.find(qn('w:jc'))
    if jc is None:
        return None
    return jc.get(qn('w:val'))


def _get_run_format(run) -> Dict:
    """Collect the run-level formatting that is explicitly set on *run*."""
    fmt = {}
    font = run.font

    if font.name:
        fmt['fontFamily'] = font.name
    if font.size:
        fmt['fontSize'] = font.size.pt
    if font.bold:
        fmt['bold'] = True
    if font.italic:
        fmt['italic'] = True

    # `underline` is either True (plain single underline) or a specific
    # underline-style value, which is emitted as its string form.
    underline = font.underline
    if underline is True:
        fmt['underline'] = 'single'
    elif underline:
        fmt['underline'] = str(underline)

    if font.strike:
        fmt['strikeThrough'] = True
    if font.color and font.color.rgb:
        fmt['color'] = str(font.color.rgb)

    if font.superscript:
        fmt['verticalAlign'] = 'superscript'
    elif font.subscript:
        fmt['verticalAlign'] = 'subscript'

    return fmt


def _detect_paragraph_type(paragraph) -> str:
    """
    Classify a paragraph as heading<N>, toc<N>, list_item or paragraph.

    The style name is checked first; if it is inconclusive, a heading level is
    inferred from the first run's bold flag and font size.
    """
    # A style can exist without a name; treat that the same as "no style".
    style_name = (paragraph.style.name if paragraph.style else '') or ''

    if style_name.startswith(('Heading', 'heading')):
        level = re.search(r'\d+', style_name)
        return f'heading{level.group()}' if level else 'heading1'

    if style_name.startswith(('toc ', 'TOC')):
        level = re.search(r'\d+', style_name)
        lvl = level.group() if level else '1'
        return f'toc{lvl}'

    if style_name.startswith('List'):
        return 'list_item'

    # Infer headings from formatting: a bold first run with an explicit size.
    text = paragraph.text.strip()
    if text and paragraph.runs:
        first_font = paragraph.runs[0].font
        if first_font.bold and first_font.size:
            size_pt = first_font.size.pt
            if size_pt >= 18:
                return 'heading1'
            if size_pt >= 16:
                return 'heading2'
            # At 14pt+ only numbered lines ("1 ", "1.2 ", ...) count as
            # headings; the number of dots gives the level.
            if size_pt >= 14 and re.match(r'^\d+(\.\d+)*\s', text):
                dots = text.split()[0].count('.')
                if dots == 0:
                    return 'heading1'
                if dots == 1:
                    return 'heading2'
                return 'heading3'

    return 'paragraph'


def _get_paragraph_images(paragraph, images_map) -> List[Dict]:
    """
    Find the images drawn inside the paragraph's runs.

    Only images whose relationship id is present in *images_map* are returned.
    Sizes are reported in inches, or None when the drawing has no extent.
    """
    inline_images = []
    for run in paragraph.runs:
        for drawing in run._element.findall(qn('w:drawing')):
            for blip in drawing.findall('.//' + qn('a:blip')):
                embed = blip.get(qn('r:embed'))
                if not embed or embed not in images_map:
                    continue

                width = height = None
                extent = drawing.find('.//' + qn('wp:extent'))
                if extent is not None:
                    cx = extent.get('cx')
                    cy = extent.get('cy')
                    if cx:
                        width = int(cx) / _EMU_PER_INCH
                    if cy:
                        height = int(cy) / _EMU_PER_INCH

                inline_images.append({
                    'rId': embed,
                    'src': images_map[embed],
                    'widthInch': round(width, 2) if width else None,
                    'heightInch': round(height, 2) if height else None,
                })
    return inline_images


def _parse_paragraph(paragraph, block_id: int, images_map: Dict) -> Dict:
    """
    Convert a paragraph into a block dict.

    The block always has 'id' and 'type'; 'runs', 'style' and 'images' are
    added when present. A paragraph with neither text runs nor images gets a
    single empty run.
    """
    para_type = _detect_paragraph_type(paragraph)

    # Paragraph-level style.
    style_info = {}
    pf = paragraph.paragraph_format
    alignment = _get_alignment(paragraph)
    if alignment:
        style_info['alignment'] = alignment
    if pf.left_indent:
        style_info['indentLeft'] = int(pf.left_indent)
    if pf.first_line_indent:
        first_line = int(pf.first_line_indent)
        # Only positive first-line indents are emitted.
        if first_line > 0:
            style_info['indentFirstLine'] = first_line

    inline_imgs = _get_paragraph_images(paragraph, images_map)

    # Text runs; runs without text are dropped.
    runs = []
    for r in paragraph.runs:
        run_text = r.text
        if not run_text:
            continue
        run_data = {'text': run_text}
        run_data.update(_get_run_format(r))
        runs.append(run_data)

    block = {
        'id': f'b{block_id}',
        'type': para_type,
    }
    if runs:
        block['runs'] = runs
    if style_info:
        block['style'] = style_info
    if inline_imgs:
        block['images'] = inline_imgs
    if not runs and not inline_imgs:
        block['runs'] = [{'text': ''}]

    return block


def _parse_table(table, block_id: int) -> Dict:
    """
    Convert a table into a block dict.

    Each row is a list of {'text', 'colspan'} cells. A horizontally merged
    cell is emitted once with its `w:gridSpan` value as 'colspan'.
    """
    tc_pr_tag = qn('w:tcPr')
    grid_span_tag = qn('w:gridSpan')
    val_attr = qn('w:val')

    rows_data = []
    for row in table.rows:
        cells_data = []
        prev_tc = None
        for cell in row.cells:
            tc = cell._tc
            # python-docx lists a merged cell once per grid column it spans;
            # emit it only once so the colspans of a row are not over-counted.
            if tc is prev_tc:
                continue
            prev_tc = tc

            colspan = 1
            tc_pr = tc.find(tc_pr_tag)
            if tc_pr is not None:
                gs = tc_pr.find(grid_span_tag)
                if gs is not None:
                    colspan = int(gs.get(val_attr, 1))

            cells_data.append({
                'text': cell.text.strip(),
                'colspan': colspan,
            })
        rows_data.append(cells_data)

    return {
        'id': f'b{block_id}',
        'type': 'table',
        'table': {
            'rows': len(table.rows),
            'cols': len(table.columns),
            'data': rows_data,
        }
    }


def blocks_to_text(blocks: List[Dict]) -> str:
    """
    Render blocks as plain text.

    Table rows become ' | '-joined lines followed by one blank line per table;
    every other block becomes one line made of its concatenated run texts.
    """
    lines = []
    for block in blocks:
        if block['type'] == 'table':
            table = block.get('table', {})
            lines.extend(
                ' | '.join(cell.get('text', '') for cell in row)
                for row in table.get('data', [])
            )
            lines.append('')
        else:
            lines.append(
                ''.join(r.get('text', '') for r in block.get('runs', []))
            )
    return '\n'.join(lines)


def extract_tables_from_blocks(blocks: List[Dict]) -> List[Dict]:
    """Return every table block with its block id and index within *blocks*."""
    return [
        {
            'block_id': block['id'],
            'block_index': i,
            'table': block.get('table', {}),
        }
        for i, block in enumerate(blocks)
        if block['type'] == 'table'
    ]