"""
DOCX parsing service (docx_parser.py).

Parses a .docx file into structured JSON for front-end rendering and
element extraction.
"""
  5. import base64
  6. import re
  7. from io import BytesIO
  8. from typing import Dict, List, Any, Optional
  9. import docx
  10. from docx import Document
  11. from docx.enum.text import WD_ALIGN_PARAGRAPH
  12. from docx.oxml.ns import qn
  13. from loguru import logger
  14. def parse_docx_file(file_content: bytes) -> Dict[str, Any]:
  15. """
  16. 解析DOCX文件内容
  17. Args:
  18. file_content: DOCX文件的二进制内容
  19. Returns:
  20. {
  21. "page": {...},
  22. "blocks": [...],
  23. "totalBlocks": int
  24. }
  25. """
  26. doc = Document(BytesIO(file_content))
  27. # 提取图片
  28. images_map = _extract_images(doc)
  29. logger.info(f"提取图片: {len(images_map)} 张")
  30. # 页面设置
  31. section = doc.sections[0]
  32. page_info = {
  33. 'widthMm': round(section.page_width.mm, 1) if section.page_width else 210,
  34. 'heightMm': round(section.page_height.mm, 1) if section.page_height else 297,
  35. 'marginTopMm': round(section.top_margin.mm, 1) if section.top_margin else 25.4,
  36. 'marginBottomMm': round(section.bottom_margin.mm, 1) if section.bottom_margin else 25.4,
  37. 'marginLeftMm': round(section.left_margin.mm, 1) if section.left_margin else 31.8,
  38. 'marginRightMm': round(section.right_margin.mm, 1) if section.right_margin else 31.8,
  39. }
  40. # 解析文档块
  41. blocks = []
  42. block_id = 0
  43. for block_type, item in _iter_block_items(doc):
  44. if block_type == 'paragraph':
  45. block = _parse_paragraph(item, block_id, images_map)
  46. blocks.append(block)
  47. block_id += 1
  48. elif block_type == 'table':
  49. block = _parse_table(item, block_id)
  50. blocks.append(block)
  51. block_id += 1
  52. logger.info(f"解析完成: {len(blocks)} 个块")
  53. return {
  54. 'page': page_info,
  55. 'blocks': blocks,
  56. 'totalBlocks': len(blocks),
  57. }
  58. def _extract_images(doc) -> Dict[str, str]:
  59. """提取文档中所有图片"""
  60. images = {}
  61. for rel_id, rel in doc.part.rels.items():
  62. if "image" in rel.reltype:
  63. blob = rel.target_part.blob
  64. target = rel.target_ref.lower()
  65. if target.endswith('.png'):
  66. mime = 'image/png'
  67. elif target.endswith('.jpg') or target.endswith('.jpeg'):
  68. mime = 'image/jpeg'
  69. elif target.endswith('.gif'):
  70. mime = 'image/gif'
  71. elif target.endswith('.bmp'):
  72. mime = 'image/bmp'
  73. else:
  74. mime = 'image/png'
  75. b64 = base64.b64encode(blob).decode('ascii')
  76. images[rel_id] = f"data:{mime};base64,{b64}"
  77. return images
  78. def _iter_block_items(doc):
  79. """按文档body中的顺序迭代段落和表格"""
  80. body = doc.element.body
  81. for child in body:
  82. if child.tag == qn('w:p'):
  83. yield ('paragraph', docx.text.paragraph.Paragraph(child, doc))
  84. elif child.tag == qn('w:tbl'):
  85. yield ('table', docx.table.Table(child, doc))
  86. def _get_alignment(paragraph) -> Optional[str]:
  87. """获取段落对齐方式"""
  88. align = paragraph.alignment
  89. if align is None:
  90. pPr = paragraph._element.find(qn('w:pPr'))
  91. if pPr is not None:
  92. jc = pPr.find(qn('w:jc'))
  93. if jc is not None:
  94. return jc.get(qn('w:val'))
  95. return None
  96. align_map = {
  97. WD_ALIGN_PARAGRAPH.LEFT: 'left',
  98. WD_ALIGN_PARAGRAPH.CENTER: 'center',
  99. WD_ALIGN_PARAGRAPH.RIGHT: 'right',
  100. WD_ALIGN_PARAGRAPH.JUSTIFY: 'justify',
  101. }
  102. return align_map.get(align, None)
  103. def _get_run_format(run) -> Dict:
  104. """提取Run级别格式"""
  105. fmt = {}
  106. font = run.font
  107. if font.name:
  108. fmt['fontFamily'] = font.name
  109. if font.size:
  110. fmt['fontSize'] = font.size.pt
  111. if font.bold:
  112. fmt['bold'] = True
  113. if font.italic:
  114. fmt['italic'] = True
  115. if font.underline and font.underline is not True:
  116. fmt['underline'] = str(font.underline)
  117. elif font.underline is True:
  118. fmt['underline'] = 'single'
  119. if font.strike:
  120. fmt['strikeThrough'] = True
  121. if font.color and font.color.rgb:
  122. fmt['color'] = str(font.color.rgb)
  123. if font.superscript:
  124. fmt['verticalAlign'] = 'superscript'
  125. elif font.subscript:
  126. fmt['verticalAlign'] = 'subscript'
  127. return fmt
  128. def _detect_paragraph_type(paragraph) -> str:
  129. """检测段落类型"""
  130. style_name = paragraph.style.name if paragraph.style else ''
  131. if style_name.startswith('Heading') or style_name.startswith('heading'):
  132. level = re.search(r'\d+', style_name)
  133. if level:
  134. return f'heading{level.group()}'
  135. return 'heading1'
  136. if style_name.startswith('toc ') or style_name.startswith('TOC'):
  137. level = re.search(r'\d+', style_name)
  138. lvl = level.group() if level else '1'
  139. return f'toc{lvl}'
  140. if style_name.startswith('List'):
  141. return 'list_item'
  142. # 通过格式推断标题
  143. text = paragraph.text.strip()
  144. if text and paragraph.runs:
  145. first_run = paragraph.runs[0]
  146. if first_run.font.bold and first_run.font.size:
  147. size_pt = first_run.font.size.pt
  148. if size_pt >= 18:
  149. return 'heading1'
  150. elif size_pt >= 16:
  151. return 'heading2'
  152. elif size_pt >= 14:
  153. if re.match(r'^\d+(\.\d+)*\s', text):
  154. dots = text.split()[0].count('.')
  155. if dots == 0:
  156. return 'heading1'
  157. elif dots == 1:
  158. return 'heading2'
  159. else:
  160. return 'heading3'
  161. return 'paragraph'
  162. def _get_paragraph_images(paragraph, images_map) -> List[Dict]:
  163. """检查段落中的内联图片"""
  164. inline_images = []
  165. for run in paragraph.runs:
  166. run_xml = run._element
  167. drawings = run_xml.findall(qn('w:drawing'))
  168. for drawing in drawings:
  169. blips = drawing.findall('.//' + qn('a:blip'))
  170. for blip in blips:
  171. embed = blip.get(qn('r:embed'))
  172. if embed and embed in images_map:
  173. extent = drawing.find('.//' + qn('wp:extent'))
  174. width = height = None
  175. if extent is not None:
  176. cx = extent.get('cx')
  177. cy = extent.get('cy')
  178. if cx:
  179. width = int(cx) / 914400
  180. if cy:
  181. height = int(cy) / 914400
  182. inline_images.append({
  183. 'rId': embed,
  184. 'src': images_map[embed],
  185. 'widthInch': round(width, 2) if width else None,
  186. 'heightInch': round(height, 2) if height else None,
  187. })
  188. return inline_images
  189. def _parse_paragraph(paragraph, block_id: int, images_map: Dict) -> Dict:
  190. """解析段落"""
  191. para_type = _detect_paragraph_type(paragraph)
  192. # 段落样式
  193. style_info = {}
  194. pf = paragraph.paragraph_format
  195. alignment = _get_alignment(paragraph)
  196. if alignment:
  197. style_info['alignment'] = alignment
  198. if pf.left_indent:
  199. style_info['indentLeft'] = int(pf.left_indent)
  200. if pf.first_line_indent:
  201. val = int(pf.first_line_indent)
  202. if val > 0:
  203. style_info['indentFirstLine'] = val
  204. # 内联图片
  205. inline_imgs = _get_paragraph_images(paragraph, images_map)
  206. # 提取runs
  207. runs = []
  208. for r in paragraph.runs:
  209. run_text = r.text
  210. if not run_text:
  211. continue
  212. run_data = {'text': run_text}
  213. fmt = _get_run_format(r)
  214. if fmt:
  215. run_data.update(fmt)
  216. runs.append(run_data)
  217. block = {
  218. 'id': f'b{block_id}',
  219. 'type': para_type,
  220. }
  221. if runs:
  222. block['runs'] = runs
  223. if style_info:
  224. block['style'] = style_info
  225. if inline_imgs:
  226. block['images'] = inline_imgs
  227. if not runs and not inline_imgs:
  228. block['runs'] = [{'text': ''}]
  229. return block
  230. def _parse_table(table, block_id: int) -> Dict:
  231. """解析表格
  232. 注意:python-docx 对于水平合并单元格,row.cells 会返回重复的 cell 对象引用。
  233. 例如:A 单元格合并了 3 列,row.cells 会返回 [A, A, A, B, C, ...]
  234. 需要通过跟踪 cell._tc 元素来去重,避免重复添加同一个单元格。
  235. """
  236. rows_data = []
  237. for row in table.rows:
  238. cells_data = []
  239. seen_tc = set() # 跟踪已处理的单元格元素,避免重复
  240. for cell in row.cells:
  241. tc = cell._tc
  242. # 如果这个单元格元素已经处理过,跳过(合并单元格的重复引用)
  243. if id(tc) in seen_tc:
  244. continue
  245. seen_tc.add(id(tc))
  246. cell_text = cell.text.strip()
  247. grid_span = tc.find(qn('w:tcPr'))
  248. colspan = 1
  249. if grid_span is not None:
  250. gs = grid_span.find(qn('w:gridSpan'))
  251. if gs is not None:
  252. colspan = int(gs.get(qn('w:val'), 1))
  253. cell_data = {
  254. 'text': cell_text,
  255. 'colspan': colspan,
  256. }
  257. cells_data.append(cell_data)
  258. rows_data.append(cells_data)
  259. return {
  260. 'id': f'b{block_id}',
  261. 'type': 'table',
  262. 'table': {
  263. 'rows': len(table.rows),
  264. 'cols': len(table.columns),
  265. 'data': rows_data,
  266. }
  267. }
  268. def blocks_to_text(blocks: List[Dict]) -> str:
  269. """将blocks转为纯文本"""
  270. lines = []
  271. for block in blocks:
  272. if block['type'] == 'table':
  273. table = block.get('table', {})
  274. for row in table.get('data', []):
  275. cells = [cell.get('text', '') for cell in row]
  276. lines.append(' | '.join(cells))
  277. lines.append('')
  278. else:
  279. runs = block.get('runs', [])
  280. text = ''.join(r.get('text', '') for r in runs)
  281. lines.append(text)
  282. return '\n'.join(lines)
  283. def extract_tables_from_blocks(blocks: List[Dict]) -> List[Dict]:
  284. """从blocks中提取所有表格"""
  285. tables = []
  286. for i, block in enumerate(blocks):
  287. if block['type'] == 'table':
  288. tables.append({
  289. 'block_id': block['id'],
  290. 'block_index': i,
  291. 'table': block.get('table', {})
  292. })
  293. return tables
  294. def extract_toc(blocks: List[Dict]) -> List[Dict]:
  295. """
  296. 从blocks中提取目录结构(Table of Contents)
  297. Returns:
  298. [
  299. {
  300. "level": 1,
  301. "title": "第一章 概述",
  302. "block_index": 5,
  303. "block_id": "b5"
  304. },
  305. ...
  306. ]
  307. """
  308. toc = []
  309. for i, block in enumerate(blocks):
  310. block_type = block.get('type', '')
  311. # 检测标题类型
  312. if block_type.startswith('heading'):
  313. level = int(block_type.replace('heading', '') or '1')
  314. runs = block.get('runs', [])
  315. title = ''.join(r.get('text', '') for r in runs).strip()
  316. if title:
  317. toc.append({
  318. 'level': level,
  319. 'title': title,
  320. 'block_index': i,
  321. 'block_id': block.get('id', f'b{i}')
  322. })
  323. # 检测TOC样式
  324. elif block_type.startswith('toc'):
  325. level = int(block_type.replace('toc', '') or '1')
  326. runs = block.get('runs', [])
  327. title = ''.join(r.get('text', '') for r in runs).strip()
  328. # TOC条目通常包含页码,去掉尾部数字
  329. title = re.sub(r'\s*\d+\s*$', '', title).strip()
  330. if title:
  331. toc.append({
  332. 'level': level,
  333. 'title': title,
  334. 'block_index': i,
  335. 'block_id': block.get('id', f'b{i}'),
  336. 'is_toc_entry': True # 标记为目录条目
  337. })
  338. return toc
  339. def split_by_chapters(blocks: List[Dict], toc: List[Dict] = None) -> List[Dict]:
  340. """
  341. 根据目录/标题将文档切分为章节
  342. Args:
  343. blocks: 文档块列表
  344. toc: 目录结构(可选,如果不提供则自动提取)
  345. Returns:
  346. [
  347. {
  348. "chapter_id": "ch0",
  349. "title": "前言",
  350. "level": 0,
  351. "start_index": 0,
  352. "end_index": 10,
  353. "blocks": [...],
  354. "text": "章节纯文本内容"
  355. },
  356. ...
  357. ]
  358. """
  359. if toc is None:
  360. toc = extract_toc(blocks)
  361. # 过滤掉TOC条目,只保留真正的标题
  362. headings = [t for t in toc if not t.get('is_toc_entry')]
  363. if not headings:
  364. # 没有标题,整个文档作为一个章节
  365. return [{
  366. 'chapter_id': 'ch0',
  367. 'title': '全文',
  368. 'level': 0,
  369. 'start_index': 0,
  370. 'end_index': len(blocks),
  371. 'blocks': blocks,
  372. 'text': blocks_to_text(blocks)
  373. }]
  374. chapters = []
  375. # 处理第一个标题之前的内容(如封面、摘要等)
  376. first_heading_index = headings[0]['block_index']
  377. if first_heading_index > 0:
  378. pre_blocks = blocks[:first_heading_index]
  379. chapters.append({
  380. 'chapter_id': 'ch0',
  381. 'title': '前言',
  382. 'level': 0,
  383. 'start_index': 0,
  384. 'end_index': first_heading_index,
  385. 'blocks': pre_blocks,
  386. 'text': blocks_to_text(pre_blocks)
  387. })
  388. # 按标题切分章节
  389. for i, heading in enumerate(headings):
  390. start_index = heading['block_index']
  391. # 确定章节结束位置
  392. if i + 1 < len(headings):
  393. end_index = headings[i + 1]['block_index']
  394. else:
  395. end_index = len(blocks)
  396. chapter_blocks = blocks[start_index:end_index]
  397. chapters.append({
  398. 'chapter_id': f'ch{i + 1}',
  399. 'title': heading['title'],
  400. 'level': heading['level'],
  401. 'start_index': start_index,
  402. 'end_index': end_index,
  403. 'blocks': chapter_blocks,
  404. 'text': blocks_to_text(chapter_blocks)
  405. })
  406. return chapters
  407. def parse_docx_with_chapters(file_content: bytes) -> Dict[str, Any]:
  408. """
  409. 解析DOCX文件,包含章节切分
  410. Returns:
  411. {
  412. "page": {...},
  413. "blocks": [...],
  414. "totalBlocks": int,
  415. "toc": [...], # 目录结构
  416. "chapters": [...] # 章节列表
  417. }
  418. """
  419. # 基础解析
  420. result = parse_docx_file(file_content)
  421. # 提取目录
  422. toc = extract_toc(result['blocks'])
  423. result['toc'] = toc
  424. logger.info(f"提取目录: {len(toc)} 个条目")
  425. # 切分章节
  426. chapters = split_by_chapters(result['blocks'], toc)
  427. # 不在结果中包含完整blocks,节省内存
  428. for ch in chapters:
  429. ch['block_count'] = len(ch['blocks'])
  430. ch['text_length'] = len(ch['text'])
  431. del ch['blocks'] # 移除blocks,只保留text
  432. result['chapters'] = chapters
  433. logger.info(f"切分章节: {len(chapters)} 个章节")
  434. return result