docx_parser.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. """
  2. DOCX解析服务
  3. 将docx文件解析为结构化JSON,供前端渲染和要素提取使用。
  4. """
  5. import base64
  6. import re
  7. from io import BytesIO
  8. from typing import Dict, List, Any, Optional
  9. import docx
  10. from docx import Document
  11. from docx.enum.text import WD_ALIGN_PARAGRAPH
  12. from docx.oxml.ns import qn
  13. from loguru import logger
  14. def parse_docx_file(file_content: bytes) -> Dict[str, Any]:
  15. """
  16. 解析DOCX文件内容
  17. Args:
  18. file_content: DOCX文件的二进制内容
  19. Returns:
  20. {
  21. "page": {...},
  22. "blocks": [...],
  23. "totalBlocks": int
  24. }
  25. """
  26. doc = Document(BytesIO(file_content))
  27. # 提取图片
  28. images_map = _extract_images(doc)
  29. logger.info(f"提取图片: {len(images_map)} 张")
  30. # 页面设置
  31. section = doc.sections[0]
  32. page_info = {
  33. 'widthMm': round(section.page_width.mm, 1) if section.page_width else 210,
  34. 'heightMm': round(section.page_height.mm, 1) if section.page_height else 297,
  35. 'marginTopMm': round(section.top_margin.mm, 1) if section.top_margin else 25.4,
  36. 'marginBottomMm': round(section.bottom_margin.mm, 1) if section.bottom_margin else 25.4,
  37. 'marginLeftMm': round(section.left_margin.mm, 1) if section.left_margin else 31.8,
  38. 'marginRightMm': round(section.right_margin.mm, 1) if section.right_margin else 31.8,
  39. }
  40. # 解析文档块
  41. blocks = []
  42. block_id = 0
  43. for block_type, item in _iter_block_items(doc):
  44. if block_type == 'paragraph':
  45. block = _parse_paragraph(item, block_id, images_map)
  46. blocks.append(block)
  47. block_id += 1
  48. elif block_type == 'table':
  49. block = _parse_table(item, block_id)
  50. blocks.append(block)
  51. block_id += 1
  52. logger.info(f"解析完成: {len(blocks)} 个块")
  53. return {
  54. 'page': page_info,
  55. 'blocks': blocks,
  56. 'totalBlocks': len(blocks),
  57. }
  58. def _extract_images(doc) -> Dict[str, str]:
  59. """提取文档中所有图片"""
  60. images = {}
  61. for rel_id, rel in doc.part.rels.items():
  62. if "image" in rel.reltype:
  63. blob = rel.target_part.blob
  64. target = rel.target_ref.lower()
  65. if target.endswith('.png'):
  66. mime = 'image/png'
  67. elif target.endswith('.jpg') or target.endswith('.jpeg'):
  68. mime = 'image/jpeg'
  69. elif target.endswith('.gif'):
  70. mime = 'image/gif'
  71. elif target.endswith('.bmp'):
  72. mime = 'image/bmp'
  73. else:
  74. mime = 'image/png'
  75. b64 = base64.b64encode(blob).decode('ascii')
  76. images[rel_id] = f"data:{mime};base64,{b64}"
  77. return images
  78. def _iter_block_items(doc):
  79. """按文档body中的顺序迭代段落和表格"""
  80. body = doc.element.body
  81. for child in body:
  82. if child.tag == qn('w:p'):
  83. yield ('paragraph', docx.text.paragraph.Paragraph(child, doc))
  84. elif child.tag == qn('w:tbl'):
  85. yield ('table', docx.table.Table(child, doc))
  86. def _get_alignment(paragraph) -> Optional[str]:
  87. """获取段落对齐方式"""
  88. align = paragraph.alignment
  89. if align is None:
  90. pPr = paragraph._element.find(qn('w:pPr'))
  91. if pPr is not None:
  92. jc = pPr.find(qn('w:jc'))
  93. if jc is not None:
  94. return jc.get(qn('w:val'))
  95. return None
  96. align_map = {
  97. WD_ALIGN_PARAGRAPH.LEFT: 'left',
  98. WD_ALIGN_PARAGRAPH.CENTER: 'center',
  99. WD_ALIGN_PARAGRAPH.RIGHT: 'right',
  100. WD_ALIGN_PARAGRAPH.JUSTIFY: 'justify',
  101. }
  102. return align_map.get(align, None)
  103. def _get_run_format(run) -> Dict:
  104. """提取Run级别格式"""
  105. fmt = {}
  106. font = run.font
  107. if font.name:
  108. fmt['fontFamily'] = font.name
  109. if font.size:
  110. fmt['fontSize'] = font.size.pt
  111. if font.bold:
  112. fmt['bold'] = True
  113. if font.italic:
  114. fmt['italic'] = True
  115. if font.underline and font.underline is not True:
  116. fmt['underline'] = str(font.underline)
  117. elif font.underline is True:
  118. fmt['underline'] = 'single'
  119. if font.strike:
  120. fmt['strikeThrough'] = True
  121. if font.color and font.color.rgb:
  122. fmt['color'] = str(font.color.rgb)
  123. if font.superscript:
  124. fmt['verticalAlign'] = 'superscript'
  125. elif font.subscript:
  126. fmt['verticalAlign'] = 'subscript'
  127. return fmt
  128. def _detect_paragraph_type(paragraph) -> str:
  129. """检测段落类型"""
  130. style_name = paragraph.style.name if paragraph.style else ''
  131. if style_name.startswith('Heading') or style_name.startswith('heading'):
  132. level = re.search(r'\d+', style_name)
  133. if level:
  134. return f'heading{level.group()}'
  135. return 'heading1'
  136. if style_name.startswith('toc ') or style_name.startswith('TOC'):
  137. level = re.search(r'\d+', style_name)
  138. lvl = level.group() if level else '1'
  139. return f'toc{lvl}'
  140. if style_name.startswith('List'):
  141. return 'list_item'
  142. # 通过格式推断标题
  143. text = paragraph.text.strip()
  144. if text and paragraph.runs:
  145. first_run = paragraph.runs[0]
  146. if first_run.font.bold and first_run.font.size:
  147. size_pt = first_run.font.size.pt
  148. if size_pt >= 18:
  149. return 'heading1'
  150. elif size_pt >= 16:
  151. return 'heading2'
  152. elif size_pt >= 14:
  153. if re.match(r'^\d+(\.\d+)*\s', text):
  154. dots = text.split()[0].count('.')
  155. if dots == 0:
  156. return 'heading1'
  157. elif dots == 1:
  158. return 'heading2'
  159. else:
  160. return 'heading3'
  161. return 'paragraph'
  162. def _get_paragraph_images(paragraph, images_map) -> List[Dict]:
  163. """检查段落中的内联图片"""
  164. inline_images = []
  165. for run in paragraph.runs:
  166. run_xml = run._element
  167. drawings = run_xml.findall(qn('w:drawing'))
  168. for drawing in drawings:
  169. blips = drawing.findall('.//' + qn('a:blip'))
  170. for blip in blips:
  171. embed = blip.get(qn('r:embed'))
  172. if embed and embed in images_map:
  173. extent = drawing.find('.//' + qn('wp:extent'))
  174. width = height = None
  175. if extent is not None:
  176. cx = extent.get('cx')
  177. cy = extent.get('cy')
  178. if cx:
  179. width = int(cx) / 914400
  180. if cy:
  181. height = int(cy) / 914400
  182. inline_images.append({
  183. 'rId': embed,
  184. 'src': images_map[embed],
  185. 'widthInch': round(width, 2) if width else None,
  186. 'heightInch': round(height, 2) if height else None,
  187. })
  188. return inline_images
  189. def _parse_paragraph(paragraph, block_id: int, images_map: Dict) -> Dict:
  190. """解析段落"""
  191. para_type = _detect_paragraph_type(paragraph)
  192. # 段落样式
  193. style_info = {}
  194. pf = paragraph.paragraph_format
  195. alignment = _get_alignment(paragraph)
  196. if alignment:
  197. style_info['alignment'] = alignment
  198. if pf.left_indent:
  199. style_info['indentLeft'] = int(pf.left_indent)
  200. if pf.first_line_indent:
  201. val = int(pf.first_line_indent)
  202. if val > 0:
  203. style_info['indentFirstLine'] = val
  204. # 内联图片
  205. inline_imgs = _get_paragraph_images(paragraph, images_map)
  206. # 提取runs
  207. runs = []
  208. for r in paragraph.runs:
  209. run_text = r.text
  210. if not run_text:
  211. continue
  212. run_data = {'text': run_text}
  213. fmt = _get_run_format(r)
  214. if fmt:
  215. run_data.update(fmt)
  216. runs.append(run_data)
  217. block = {
  218. 'id': f'b{block_id}',
  219. 'type': para_type,
  220. }
  221. if runs:
  222. block['runs'] = runs
  223. if style_info:
  224. block['style'] = style_info
  225. if inline_imgs:
  226. block['images'] = inline_imgs
  227. if not runs and not inline_imgs:
  228. block['runs'] = [{'text': ''}]
  229. return block
  230. def _parse_table(table, block_id: int) -> Dict:
  231. """解析表格"""
  232. rows_data = []
  233. for row in table.rows:
  234. cells_data = []
  235. for cell in row.cells:
  236. cell_text = cell.text.strip()
  237. tc = cell._tc
  238. grid_span = tc.find(qn('w:tcPr'))
  239. colspan = 1
  240. if grid_span is not None:
  241. gs = grid_span.find(qn('w:gridSpan'))
  242. if gs is not None:
  243. colspan = int(gs.get(qn('w:val'), 1))
  244. cell_data = {
  245. 'text': cell_text,
  246. 'colspan': colspan,
  247. }
  248. cells_data.append(cell_data)
  249. rows_data.append(cells_data)
  250. return {
  251. 'id': f'b{block_id}',
  252. 'type': 'table',
  253. 'table': {
  254. 'rows': len(table.rows),
  255. 'cols': len(table.columns),
  256. 'data': rows_data,
  257. }
  258. }
  259. def blocks_to_text(blocks: List[Dict]) -> str:
  260. """将blocks转为纯文本"""
  261. lines = []
  262. for block in blocks:
  263. if block['type'] == 'table':
  264. table = block.get('table', {})
  265. for row in table.get('data', []):
  266. cells = [cell.get('text', '') for cell in row]
  267. lines.append(' | '.join(cells))
  268. lines.append('')
  269. else:
  270. runs = block.get('runs', [])
  271. text = ''.join(r.get('text', '') for r in runs)
  272. lines.append(text)
  273. return '\n'.join(lines)
  274. def extract_tables_from_blocks(blocks: List[Dict]) -> List[Dict]:
  275. """从blocks中提取所有表格"""
  276. tables = []
  277. for i, block in enumerate(blocks):
  278. if block['type'] == 'table':
  279. tables.append({
  280. 'block_id': block['id'],
  281. 'block_index': i,
  282. 'table': block.get('table', {})
  283. })
  284. return tables