| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517 |
- """
- DOCX解析服务
- 将docx文件解析为结构化JSON,供前端渲染和要素提取使用。
- """
- import base64
- import re
- from io import BytesIO
- from typing import Dict, List, Any, Optional
- import docx
- from docx import Document
- from docx.enum.text import WD_ALIGN_PARAGRAPH
- from docx.oxml.ns import qn
- from loguru import logger
def parse_docx_file(file_content: bytes) -> Dict[str, Any]:
    """Parse the raw bytes of a DOCX file into a JSON-friendly structure.

    Args:
        file_content: Binary content of the DOCX file.

    Returns:
        A dict with three keys:
            "page": size and margins of the first section, in millimetres;
            "blocks": parsed paragraph and table blocks in body order;
            "totalBlocks": number of entries in "blocks".
    """
    document = Document(BytesIO(file_content))

    # Images are resolved once up front so paragraphs can look them up by rId.
    images_map = _extract_images(document)
    logger.info(f"提取图片: {len(images_map)} 张")

    def _mm(length, fallback):
        # Use the fallback whenever the section reports a falsy value.
        return round(length.mm, 1) if length else fallback

    # Only the first section contributes to the page description.
    first_section = document.sections[0]
    page_info = {
        'widthMm': _mm(first_section.page_width, 210),
        'heightMm': _mm(first_section.page_height, 297),
        'marginTopMm': _mm(first_section.top_margin, 25.4),
        'marginBottomMm': _mm(first_section.bottom_margin, 25.4),
        'marginLeftMm': _mm(first_section.left_margin, 31.8),
        'marginRightMm': _mm(first_section.right_margin, 31.8),
    }

    blocks = []
    for kind, item in _iter_block_items(document):
        # Ids are sequential, so the next id always equals the current count.
        next_id = len(blocks)
        if kind == 'paragraph':
            blocks.append(_parse_paragraph(item, next_id, images_map))
        elif kind == 'table':
            blocks.append(_parse_table(item, next_id))

    logger.info(f"解析完成: {len(blocks)} 个块")

    return {
        'page': page_info,
        'blocks': blocks,
        'totalBlocks': len(blocks),
    }
- def _extract_images(doc) -> Dict[str, str]:
- """提取文档中所有图片"""
- images = {}
- for rel_id, rel in doc.part.rels.items():
- if "image" in rel.reltype:
- blob = rel.target_part.blob
- target = rel.target_ref.lower()
- if target.endswith('.png'):
- mime = 'image/png'
- elif target.endswith('.jpg') or target.endswith('.jpeg'):
- mime = 'image/jpeg'
- elif target.endswith('.gif'):
- mime = 'image/gif'
- elif target.endswith('.bmp'):
- mime = 'image/bmp'
- else:
- mime = 'image/png'
- b64 = base64.b64encode(blob).decode('ascii')
- images[rel_id] = f"data:{mime};base64,{b64}"
- return images
def _iter_block_items(doc):
    """Yield the body's paragraphs and tables in document order.

    Each item is a ``(kind, obj)`` pair where ``kind`` is 'paragraph' or
    'table' and ``obj`` is the matching python-docx wrapper. Any other
    direct child of the body is skipped.
    """
    paragraph_tag = qn('w:p')
    table_tag = qn('w:tbl')
    for node in doc.element.body:
        tag = node.tag
        if tag == paragraph_tag:
            yield ('paragraph', docx.text.paragraph.Paragraph(node, doc))
        elif tag == table_tag:
            yield ('table', docx.table.Table(node, doc))
def _get_alignment(paragraph) -> Optional[str]:
    """Return the paragraph's horizontal alignment as a string, if any.

    A non-None ``paragraph.alignment`` is translated to 'left', 'center',
    'right' or 'justify'; any other enum member yields None. When
    ``paragraph.alignment`` is None, the raw ``w:val`` of the paragraph's
    ``w:jc`` element is returned if that element exists, otherwise None.
    """
    declared = paragraph.alignment
    if declared is not None:
        names = {
            WD_ALIGN_PARAGRAPH.LEFT: 'left',
            WD_ALIGN_PARAGRAPH.CENTER: 'center',
            WD_ALIGN_PARAGRAPH.RIGHT: 'right',
            WD_ALIGN_PARAGRAPH.JUSTIFY: 'justify',
        }
        return names.get(declared)

    # Fall back to the raw XML attribute.
    properties = paragraph._element.find(qn('w:pPr'))
    if properties is None:
        return None
    justification = properties.find(qn('w:jc'))
    if justification is None:
        return None
    return justification.get(qn('w:val'))
- def _get_run_format(run) -> Dict:
- """提取Run级别格式"""
- fmt = {}
- font = run.font
-
- if font.name:
- fmt['fontFamily'] = font.name
- if font.size:
- fmt['fontSize'] = font.size.pt
- if font.bold:
- fmt['bold'] = True
- if font.italic:
- fmt['italic'] = True
- if font.underline and font.underline is not True:
- fmt['underline'] = str(font.underline)
- elif font.underline is True:
- fmt['underline'] = 'single'
- if font.strike:
- fmt['strikeThrough'] = True
- if font.color and font.color.rgb:
- fmt['color'] = str(font.color.rgb)
-
- if font.superscript:
- fmt['verticalAlign'] = 'superscript'
- elif font.subscript:
- fmt['verticalAlign'] = 'subscript'
-
- return fmt
- def _detect_paragraph_type(paragraph) -> str:
- """检测段落类型"""
- style_name = paragraph.style.name if paragraph.style else ''
-
- if style_name.startswith('Heading') or style_name.startswith('heading'):
- level = re.search(r'\d+', style_name)
- if level:
- return f'heading{level.group()}'
- return 'heading1'
-
- if style_name.startswith('toc ') or style_name.startswith('TOC'):
- level = re.search(r'\d+', style_name)
- lvl = level.group() if level else '1'
- return f'toc{lvl}'
-
- if style_name.startswith('List'):
- return 'list_item'
-
- # 通过格式推断标题
- text = paragraph.text.strip()
- if text and paragraph.runs:
- first_run = paragraph.runs[0]
- if first_run.font.bold and first_run.font.size:
- size_pt = first_run.font.size.pt
- if size_pt >= 18:
- return 'heading1'
- elif size_pt >= 16:
- return 'heading2'
- elif size_pt >= 14:
- if re.match(r'^\d+(\.\d+)*\s', text):
- dots = text.split()[0].count('.')
- if dots == 0:
- return 'heading1'
- elif dots == 1:
- return 'heading2'
- else:
- return 'heading3'
-
- return 'paragraph'
def _get_paragraph_images(paragraph, images_map) -> List[Dict]:
    """Collect the inline pictures referenced by the paragraph's runs.

    For every ``w:drawing`` directly under a run, each ``a:blip`` whose
    ``r:embed`` id is a key of ``images_map`` produces one entry with:
        rId: the relationship id;
        src: the value stored in ``images_map`` for that id;
        widthInch / heightInch: the drawing's ``wp:extent`` cx/cy divided
            by 914400 and rounded to two decimals, or None when missing
            or zero.
    """
    drawing_tag = qn('w:drawing')
    blip_path = './/' + qn('a:blip')
    extent_path = './/' + qn('wp:extent')
    embed_attr = qn('r:embed')

    def _inches(raw):
        # An absent or empty attribute has no size.
        return int(raw) / 914400 if raw else None

    found = []
    for run in paragraph.runs:
        for drawing in run._element.findall(drawing_tag):
            for blip in drawing.findall(blip_path):
                rel_id = blip.get(embed_attr)
                if not rel_id or rel_id not in images_map:
                    continue

                width = height = None
                extent = drawing.find(extent_path)
                if extent is not None:
                    width = _inches(extent.get('cx'))
                    height = _inches(extent.get('cy'))

                found.append({
                    'rId': rel_id,
                    'src': images_map[rel_id],
                    'widthInch': round(width, 2) if width else None,
                    'heightInch': round(height, 2) if height else None,
                })
    return found
def _parse_paragraph(paragraph, block_id: int, images_map: Dict) -> Dict:
    """Convert a paragraph into a block dict.

    Args:
        paragraph: The paragraph object to convert.
        block_id: Sequence number used to build the block id ('b<N>').
        images_map: Mapping from relationship id to image data URI.

    Returns:
        A dict with 'id' and 'type', plus 'runs', 'style' and 'images'
        when they are non-empty. A paragraph with neither text runs nor
        images gets a single empty-text run.
    """
    block = {
        'id': f'b{block_id}',
        'type': _detect_paragraph_type(paragraph),
    }

    # Paragraph-level style: alignment and indents.
    style_info = {}
    alignment = _get_alignment(paragraph)
    if alignment:
        style_info['alignment'] = alignment
    para_format = paragraph.paragraph_format
    if para_format.left_indent:
        style_info['indentLeft'] = int(para_format.left_indent)
    if para_format.first_line_indent:
        first_line = int(para_format.first_line_indent)
        # Only a positive first-line indent is reported.
        if first_line > 0:
            style_info['indentFirstLine'] = first_line

    inline_imgs = _get_paragraph_images(paragraph, images_map)

    # Runs without text are dropped; the rest carry their formatting inline.
    runs = []
    for item in paragraph.runs:
        content = item.text
        if content:
            entry = {'text': content}
            entry.update(_get_run_format(item))
            runs.append(entry)

    if runs:
        block['runs'] = runs
    if style_info:
        block['style'] = style_info
    if inline_imgs:
        block['images'] = inline_imgs
    if not (runs or inline_imgs):
        block['runs'] = [{'text': ''}]

    return block
def _parse_table(table, block_id: int) -> Dict:
    """Convert a table into a block dict.

    Note: for a horizontally merged cell, python-docx's ``row.cells``
    returns the same cell once per spanned column (a cell spanning three
    columns shows up as [A, A, A, B, C, ...]). The underlying ``_tc``
    element is tracked per row so each merged cell is emitted only once.

    Args:
        table: The table object to convert.
        block_id: Sequence number used to build the block id ('b<N>').

    Returns:
        A dict with 'id', 'type' == 'table' and a 'table' entry holding
        the row count, the column count and the per-row cell data
        (stripped text and colspan).
    """
    properties_tag = qn('w:tcPr')
    span_tag = qn('w:gridSpan')
    value_attr = qn('w:val')

    grid = []
    for row in table.rows:
        visited = set()  # identities of the tc elements already emitted
        row_cells = []
        for cell in row.cells:
            element = cell._tc
            identity = id(element)
            if identity in visited:
                # Repeated reference to a merged cell.
                continue
            visited.add(identity)

            text = cell.text.strip()

            # colspan comes from w:tcPr/w:gridSpan and defaults to 1.
            span = 1
            properties = element.find(properties_tag)
            span_node = properties.find(span_tag) if properties is not None else None
            if span_node is not None:
                span = int(span_node.get(value_attr, 1))

            row_cells.append({
                'text': text,
                'colspan': span,
            })
        grid.append(row_cells)

    return {
        'id': f'b{block_id}',
        'type': 'table',
        'table': {
            'rows': len(table.rows),
            'cols': len(table.columns),
            'data': grid,
        }
    }
def blocks_to_text(blocks: List[Dict]) -> str:
    """Flatten blocks into plain text, one line per paragraph or table row.

    Table rows are rendered with their cell texts joined by ' | ', and
    each table is followed by an empty line. Any other block contributes
    the concatenation of its run texts.
    """
    output = []
    for item in blocks:
        if item['type'] != 'table':
            output.append(''.join(run.get('text', '') for run in item.get('runs', [])))
            continue

        rows = item.get('table', {}).get('data', [])
        output.extend(
            ' | '.join(cell.get('text', '') for cell in row)
            for row in rows
        )
        output.append('')
    return '\n'.join(output)
def extract_tables_from_blocks(blocks: List[Dict]) -> List[Dict]:
    """Return every table block along with its id and position.

    Each result entry has 'block_id', 'block_index' (the position within
    ``blocks``) and 'table' (the block's table payload, or an empty dict
    when it is missing).
    """
    return [
        {
            'block_id': item['id'],
            'block_index': position,
            'table': item.get('table', {})
        }
        for position, item in enumerate(blocks)
        if item['type'] == 'table'
    ]
def extract_toc(blocks: List[Dict]) -> List[Dict]:
    """Build an outline from heading blocks and TOC-styled blocks.

    Returns:
        A list of entries in block order, each shaped like
            {"level": 1, "title": "...", "block_index": 5, "block_id": "b5"}
        Entries that come from 'toc*' blocks additionally carry
        "is_toc_entry": True. Blocks whose title ends up empty are skipped.
    """
    entries = []

    for position, item in enumerate(blocks):
        kind = item.get('type', '')

        if kind.startswith('heading'):
            prefix, from_toc = 'heading', False
        elif kind.startswith('toc'):
            prefix, from_toc = 'toc', True
        else:
            continue

        # The level is whatever follows the prefix, defaulting to 1.
        level = int(kind.replace(prefix, '') or '1')
        title = ''.join(run.get('text', '') for run in item.get('runs', [])).strip()
        if from_toc:
            # TOC lines usually end with a page number; drop trailing digits.
            title = re.sub(r'\s*\d+\s*$', '', title).strip()

        if not title:
            continue

        entry = {
            'level': level,
            'title': title,
            'block_index': position,
            'block_id': item.get('id', f'b{position}'),
        }
        if from_toc:
            entry['is_toc_entry'] = True  # marks an entry taken from the TOC
        entries.append(entry)

    return entries
def split_by_chapters(blocks: List[Dict], toc: List[Dict] = None) -> List[Dict]:
    """Split the document into chapters at its headings.

    Args:
        blocks: List of document blocks.
        toc: Outline entries (optional; extracted from ``blocks`` when
            omitted).

    Returns:
        A list of chapter dicts shaped like
            {
                "chapter_id": "ch0",
                "title": "...",
                "level": 0,
                "start_index": 0,
                "end_index": 10,
                "blocks": [...],
                "text": "plain text of the chapter"
            }
        Content before the first heading becomes chapter 'ch0'. Headings
        are numbered from 'ch1'. Without any heading the whole document
        is returned as a single 'ch0' chapter.
    """
    outline = extract_toc(blocks) if toc is None else toc

    # TOC-page entries are not real headings and never start a chapter.
    headings = [entry for entry in outline if not entry.get('is_toc_entry')]

    if not headings:
        # No headings at all: the entire document is one chapter.
        return [{
            'chapter_id': 'ch0',
            'title': '全文',
            'level': 0,
            'start_index': 0,
            'end_index': len(blocks),
            'blocks': blocks,
            'text': blocks_to_text(blocks)
        }]

    def _chapter(chapter_id, title, level, start, stop):
        segment = blocks[start:stop]
        return {
            'chapter_id': chapter_id,
            'title': title,
            'level': level,
            'start_index': start,
            'end_index': stop,
            'blocks': segment,
            'text': blocks_to_text(segment)
        }

    # Every chapter runs from its heading up to the next heading (or the end).
    starts = [heading['block_index'] for heading in headings]
    stops = starts[1:] + [len(blocks)]

    chapters = []
    if starts[0] > 0:
        # Material ahead of the first heading (cover page, abstract, ...).
        chapters.append(_chapter('ch0', '前言', 0, 0, starts[0]))

    for number, (heading, start, stop) in enumerate(zip(headings, starts, stops), start=1):
        chapters.append(
            _chapter(f'ch{number}', heading['title'], heading['level'], start, stop)
        )

    return chapters
def parse_docx_with_chapters(file_content: bytes) -> Dict[str, Any]:
    """Parse a DOCX file and add its outline and chapter split.

    Args:
        file_content: Binary content of the DOCX file.

    Returns:
        The result of ``parse_docx_file`` ("page", "blocks",
        "totalBlocks") extended with:
            "toc": the outline entries;
            "chapters": the chapter list, where each chapter carries
                "block_count" and "text_length" in place of its "blocks".
    """
    parsed = parse_docx_file(file_content)
    all_blocks = parsed['blocks']

    outline = extract_toc(all_blocks)
    parsed['toc'] = outline
    logger.info(f"提取目录: {len(outline)} 个条目")

    chapters = split_by_chapters(all_blocks, outline)
    for chapter in chapters:
        # Chapters keep only their text and summary counts; the blocks are
        # already available under the top-level "blocks" key.
        chapter_blocks = chapter.pop('blocks')
        chapter['block_count'] = len(chapter_blocks)
        chapter['text_length'] = len(chapter['text'])

    parsed['chapters'] = chapters
    logger.info(f"切分章节: {len(chapters)} 个章节")

    return parsed
|