image_preprocessor.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. """
  2. 图像预处理工具 - 包含去水印等功能
  3. 支持的预处理操作:
  4. - 去水印(颜色过滤法)
  5. - 灰度转换
  6. - 二值化
  7. - 去噪
  8. """
  9. import numpy as np
  10. from pathlib import Path
  11. from typing import Optional, Tuple
  12. from loguru import logger
  13. try:
  14. from PIL import Image
  15. PIL_AVAILABLE = True
  16. except ImportError:
  17. PIL_AVAILABLE = False
  18. logger.warning("[图像预处理] PIL 未安装,部分功能不可用")
  19. try:
  20. import cv2
  21. CV2_AVAILABLE = True
  22. except ImportError:
  23. CV2_AVAILABLE = False
  24. logger.warning("[图像预处理] OpenCV 未安装,部分功能不可用")
  25. def remove_watermark(
  26. image_path: str,
  27. output_path: Optional[str] = None,
  28. light_threshold: int = 200,
  29. saturation_threshold: int = 30,
  30. method: str = "auto"
  31. ) -> str:
  32. """
  33. 去除图片水印
  34. 原理:大多数水印是浅色或半透明的,通过以下方式去除:
  35. 1. 将浅色像素(亮度高、饱和度低)替换为白色
  36. 2. 保留深色文字内容
  37. Args:
  38. image_path: 输入图片路径
  39. output_path: 输出图片路径,默认在原文件名后加 _nowm
  40. light_threshold: 亮度阈值(0-255),高于此值的浅色像素可能是水印
  41. saturation_threshold: 饱和度阈值(0-255),低于此值的低饱和度像素可能是水印
  42. method: 去水印方法
  43. - "auto": 自动选择最佳方法
  44. - "light": 基于亮度的简单方法(快速)
  45. - "hsv": 基于HSV颜色空间的方法(更精确)
  46. - "adaptive": 自适应阈值方法
  47. Returns:
  48. 处理后的图片路径
  49. """
  50. if not CV2_AVAILABLE:
  51. logger.warning("[去水印] OpenCV 未安装,跳过去水印处理")
  52. return image_path
  53. logger.info(f"[去水印] 开始处理: {image_path}")
  54. logger.info(f"[去水印] 方法: {method}, 亮度阈值: {light_threshold}, 饱和度阈值: {saturation_threshold}")
  55. # 读取图片
  56. img = cv2.imread(image_path)
  57. if img is None:
  58. logger.error(f"[去水印] 无法读取图片: {image_path}")
  59. return image_path
  60. original_shape = img.shape
  61. logger.info(f"[去水印] 图片尺寸: {original_shape[1]}x{original_shape[0]}")
  62. # 根据方法选择处理逻辑
  63. if method == "auto":
  64. # 自动检测:先尝试 HSV 方法,如果效果不好则用 adaptive
  65. method = "hsv"
  66. if method == "light":
  67. # 简单亮度方法:将浅色像素替换为白色
  68. result = _remove_watermark_light(img, light_threshold)
  69. elif method == "hsv":
  70. # HSV 方法:基于亮度和饱和度
  71. result = _remove_watermark_hsv(img, light_threshold, saturation_threshold)
  72. elif method == "adaptive":
  73. # 自适应方法:使用自适应阈值
  74. result = _remove_watermark_adaptive(img)
  75. else:
  76. logger.warning(f"[去水印] 未知方法: {method},使用 hsv")
  77. result = _remove_watermark_hsv(img, light_threshold, saturation_threshold)
  78. # 确定输出路径
  79. if output_path is None:
  80. path = Path(image_path)
  81. output_path = str(path.parent / f"{path.stem}_nowm{path.suffix}")
  82. # 保存结果
  83. cv2.imwrite(output_path, result)
  84. logger.info(f"[去水印] 处理完成,保存到: {output_path}")
  85. return output_path
  86. def _remove_watermark_light(img: np.ndarray, threshold: int = 200) -> np.ndarray:
  87. """
  88. 简单亮度方法:将浅色像素替换为白色
  89. 适用于:浅色/灰色水印
  90. """
  91. # 转为灰度图
  92. gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
  93. # 创建掩码:亮度高于阈值的区域
  94. mask = gray > threshold
  95. # 将掩码区域设为白色
  96. result = img.copy()
  97. result[mask] = [255, 255, 255]
  98. return result
  99. def _remove_watermark_hsv(
  100. img: np.ndarray,
  101. light_threshold: int = 200,
  102. saturation_threshold: int = 30
  103. ) -> np.ndarray:
  104. """
  105. HSV 方法:基于亮度和饱和度去除水印
  106. 原理:水印通常是高亮度、低饱和度的
  107. 适用于:彩色水印、半透明水印
  108. """
  109. # 转换到 HSV 颜色空间
  110. hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
  111. # 分离通道
  112. h, s, v = cv2.split(hsv)
  113. # 创建水印掩码:高亮度 AND 低饱和度
  114. watermark_mask = (v > light_threshold) & (s < saturation_threshold)
  115. # 将水印区域设为白色
  116. result = img.copy()
  117. result[watermark_mask] = [255, 255, 255]
  118. # 可选:对边缘进行平滑处理
  119. # kernel = np.ones((3, 3), np.uint8)
  120. # watermark_mask_dilated = cv2.dilate(watermark_mask.astype(np.uint8), kernel, iterations=1)
  121. # result[watermark_mask_dilated == 1] = [255, 255, 255]
  122. return result
  123. def _remove_watermark_adaptive(img: np.ndarray) -> np.ndarray:
  124. """
  125. 自适应阈值方法
  126. 适用于:复杂背景、不均匀光照
  127. """
  128. # 转为灰度图
  129. gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
  130. # 使用自适应阈值
  131. # 这会根据局部区域计算阈值,保留文字,去除背景和水印
  132. binary = cv2.adaptiveThreshold(
  133. gray, 255,
  134. cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
  135. cv2.THRESH_BINARY,
  136. blockSize=15,
  137. C=10
  138. )
  139. # 转回 BGR(3通道)
  140. result = cv2.cvtColor(binary, cv2.COLOR_GRAY2BGR)
  141. return result
  142. def enhance_for_ocr(
  143. image_path: str,
  144. output_path: Optional[str] = None,
  145. remove_wm: bool = True,
  146. denoise: bool = True,
  147. sharpen: bool = False
  148. ) -> str:
  149. """
  150. OCR 预处理增强
  151. 组合多种预处理操作,优化 OCR 识别效果
  152. Args:
  153. image_path: 输入图片路径
  154. output_path: 输出图片路径
  155. remove_wm: 是否去除水印
  156. denoise: 是否去噪
  157. sharpen: 是否锐化
  158. Returns:
  159. 处理后的图片路径
  160. """
  161. if not CV2_AVAILABLE:
  162. logger.warning("[OCR预处理] OpenCV 未安装,跳过预处理")
  163. return image_path
  164. logger.info(f"[OCR预处理] 开始处理: {image_path}")
  165. # 读取图片
  166. img = cv2.imread(image_path)
  167. if img is None:
  168. logger.error(f"[OCR预处理] 无法读取图片: {image_path}")
  169. return image_path
  170. result = img.copy()
  171. # 1. 去水印
  172. if remove_wm:
  173. result = _remove_watermark_hsv(result)
  174. logger.info("[OCR预处理] 已去除水印")
  175. # 2. 去噪
  176. if denoise:
  177. result = cv2.fastNlMeansDenoisingColored(result, None, 10, 10, 7, 21)
  178. logger.info("[OCR预处理] 已去噪")
  179. # 3. 锐化
  180. if sharpen:
  181. kernel = np.array([[-1, -1, -1],
  182. [-1, 9, -1],
  183. [-1, -1, -1]])
  184. result = cv2.filter2D(result, -1, kernel)
  185. logger.info("[OCR预处理] 已锐化")
  186. # 确定输出路径
  187. if output_path is None:
  188. path = Path(image_path)
  189. output_path = str(path.parent / f"{path.stem}_enhanced{path.suffix}")
  190. # 保存结果
  191. cv2.imwrite(output_path, result)
  192. logger.info(f"[OCR预处理] 处理完成,保存到: {output_path}")
  193. return output_path
  194. def check_opencv_available() -> bool:
  195. """检查 OpenCV 是否可用"""
  196. return CV2_AVAILABLE
  197. def crop_header_footer(
  198. image_path: str,
  199. output_path: Optional[str] = None,
  200. header_ratio: float = 0.05,
  201. footer_ratio: float = 0.05,
  202. auto_detect: bool = False
  203. ) -> str:
  204. """
  205. 裁剪图片的页眉和页脚区域
  206. 通过按比例裁剪图片顶部和底部来去除页眉页脚
  207. Args:
  208. image_path: 输入图片路径
  209. output_path: 输出图片路径,默认在原文件名后加 _cropped
  210. header_ratio: 页眉裁剪比例(0-1),默认0.05表示裁剪顶部5%
  211. footer_ratio: 页脚裁剪比例(0-1),默认0.05表示裁剪底部5%
  212. auto_detect: 是否自动检测页眉页脚边界(忽略 header_ratio 和 footer_ratio)
  213. Returns:
  214. 处理后的图片路径
  215. """
  216. if not CV2_AVAILABLE:
  217. logger.warning("[裁剪页眉页脚] OpenCV 未安装,跳过处理")
  218. return image_path
  219. logger.info(f"[裁剪页眉页脚] 开始处理: {image_path}")
  220. # 读取图片
  221. img = cv2.imread(image_path)
  222. if img is None:
  223. logger.error(f"[裁剪页眉页脚] 无法读取图片: {image_path}")
  224. return image_path
  225. height, width = img.shape[:2]
  226. logger.info(f"[裁剪页眉页脚] 原始尺寸: {width}x{height}")
  227. if auto_detect:
  228. # 自动检测页眉页脚边界
  229. logger.info("[裁剪页眉页脚] 使用自动检测模式")
  230. header_pixels, footer_pixels = _detect_header_footer_boundaries(img)
  231. logger.info(f"[裁剪页眉页脚] 自动检测结果: 页眉={header_pixels}px, 页脚={footer_pixels}px")
  232. else:
  233. # 使用固定比例
  234. logger.info(f"[裁剪页眉页脚] 使用固定比例: 页眉={header_ratio}, 页脚={footer_ratio}")
  235. header_pixels = int(height * header_ratio)
  236. footer_pixels = int(height * footer_ratio)
  237. # 裁剪图片(保留中间部分)
  238. top = header_pixels
  239. bottom = height - footer_pixels
  240. if top >= bottom:
  241. logger.warning("[裁剪页眉页脚] 裁剪区域无效,跳过处理")
  242. return image_path
  243. result = img[top:bottom, :]
  244. new_height = result.shape[0]
  245. logger.info(f"[裁剪页眉页脚] 裁剪后尺寸: {width}x{new_height}")
  246. logger.info(f"[裁剪页眉页脚] 裁剪了顶部 {header_pixels}px,底部 {footer_pixels}px")
  247. # 确定输出路径
  248. if output_path is None:
  249. path = Path(image_path)
  250. output_path = str(path.parent / f"{path.stem}_cropped{path.suffix}")
  251. # 保存结果
  252. cv2.imwrite(output_path, result)
  253. logger.info(f"[裁剪页眉页脚] 处理完成,保存到: {output_path}")
  254. return output_path
  255. def _detect_header_footer_boundaries(img: np.ndarray) -> Tuple[int, int]:
  256. """
  257. 自动检测页眉页脚边界
  258. 使用多种方法综合判断:
  259. 1. 水平线检测 - 检测分隔线
  260. 2. 文本密度分析 - 页眉页脚通常文字较少
  261. 3. 空白区域检测 - 检测大面积空白
  262. Args:
  263. img: 输入图片(BGR格式)
  264. Returns:
  265. (header_pixels, footer_pixels): 页眉和页脚的像素高度
  266. """
  267. height, width = img.shape[:2]
  268. # 转为灰度图
  269. gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
  270. # 定义搜索范围(页眉页脚通常在顶部/底部 15% 以内)
  271. search_range = int(height * 0.15)
  272. min_margin = int(height * 0.02) # 最小边距 2%
  273. # 方法1: 检测水平线
  274. header_line = _find_horizontal_line(gray, 0, search_range, from_top=True)
  275. footer_line = _find_horizontal_line(gray, height - search_range, height, from_top=False)
  276. # 方法2: 分析文本密度变化
  277. header_density = _find_content_boundary(gray, 0, search_range, from_top=True)
  278. footer_density = _find_content_boundary(gray, height - search_range, height, from_top=False)
  279. # 综合判断:取最可靠的结果
  280. # 优先使用水平线检测结果,其次使用密度分析结果
  281. if header_line > min_margin:
  282. header_pixels = header_line
  283. logger.debug(f"[自动检测] 页眉: 使用水平线检测结果 {header_pixels}px")
  284. elif header_density > min_margin:
  285. header_pixels = header_density
  286. logger.debug(f"[自动检测] 页眉: 使用密度分析结果 {header_pixels}px")
  287. else:
  288. header_pixels = min_margin
  289. logger.debug(f"[自动检测] 页眉: 使用最小边距 {header_pixels}px")
  290. if footer_line > min_margin:
  291. footer_pixels = footer_line
  292. logger.debug(f"[自动检测] 页脚: 使用水平线检测结果 {footer_pixels}px")
  293. elif footer_density > min_margin:
  294. footer_pixels = footer_density
  295. logger.debug(f"[自动检测] 页脚: 使用密度分析结果 {footer_pixels}px")
  296. else:
  297. footer_pixels = min_margin
  298. logger.debug(f"[自动检测] 页脚: 使用最小边距 {footer_pixels}px")
  299. return header_pixels, footer_pixels
  300. def _find_horizontal_line(
  301. gray: np.ndarray,
  302. start_y: int,
  303. end_y: int,
  304. from_top: bool = True
  305. ) -> int:
  306. """
  307. 在指定区域内查找水平分隔线
  308. Args:
  309. gray: 灰度图
  310. start_y: 搜索起始y坐标
  311. end_y: 搜索结束y坐标
  312. from_top: True表示从上往下找,False表示从下往上找
  313. Returns:
  314. 分隔线位置(像素),如果没找到返回0
  315. """
  316. height, width = gray.shape
  317. # 使用 Canny 边缘检测
  318. edges = cv2.Canny(gray[start_y:end_y, :], 50, 150)
  319. # 使用霍夫变换检测直线
  320. lines = cv2.HoughLinesP(
  321. edges,
  322. rho=1,
  323. theta=np.pi/180,
  324. threshold=int(width * 0.5), # 线长度至少为图片宽度的50%
  325. minLineLength=int(width * 0.4),
  326. maxLineGap=20
  327. )
  328. if lines is None:
  329. return 0
  330. # 筛选水平线(角度接近0或180度)
  331. horizontal_lines = []
  332. for line in lines:
  333. x1, y1, x2, y2 = line[0]
  334. # 计算角度
  335. angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi)
  336. # 水平线角度应该接近 0 或 180
  337. if angle < 5 or angle > 175:
  338. avg_y = (y1 + y2) // 2 + start_y
  339. horizontal_lines.append(avg_y)
  340. if not horizontal_lines:
  341. return 0
  342. # 根据方向返回最合适的线
  343. if from_top:
  344. # 从上往下,返回最下面的水平线(作为页眉下边界)
  345. return max(horizontal_lines)
  346. else:
  347. # 从下往上,返回距离底部的距离
  348. return height - min(horizontal_lines)
  349. def _find_content_boundary(
  350. gray: np.ndarray,
  351. start_y: int,
  352. end_y: int,
  353. from_top: bool = True
  354. ) -> int:
  355. """
  356. 通过分析文本/内容密度找到内容边界
  357. 原理:页眉页脚区域通常是空白或只有少量文字,
  358. 正文区域文字密度较高。通过检测密度突变点来确定边界。
  359. Args:
  360. gray: 灰度图
  361. start_y: 搜索起始y坐标
  362. end_y: 搜索结束y坐标
  363. from_top: True表示从上往下找,False表示从下往上找
  364. Returns:
  365. 内容边界位置(像素),如果没找到返回0
  366. """
  367. height, width = gray.shape
  368. # 二值化
  369. _, binary = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV)
  370. # 计算每一行的像素密度(黑色像素占比)
  371. row_densities = []
  372. for y in range(start_y, end_y):
  373. row = binary[y, :]
  374. density = np.sum(row > 0) / width
  375. row_densities.append((y, density))
  376. if not row_densities:
  377. return 0
  378. # 使用滑动窗口平滑密度曲线
  379. window_size = 10
  380. smoothed = []
  381. for i in range(len(row_densities)):
  382. start = max(0, i - window_size // 2)
  383. end = min(len(row_densities), i + window_size // 2)
  384. avg_density = sum(d[1] for d in row_densities[start:end]) / (end - start)
  385. smoothed.append((row_densities[i][0], avg_density))
  386. # 找到密度突变点
  387. # 定义阈值:当密度从低于 0.01 变化到高于 0.02 时,认为进入正文区域
  388. low_threshold = 0.005
  389. high_threshold = 0.02
  390. if from_top:
  391. # 从上往下,找到第一个连续高密度区域的起始位置
  392. in_content = False
  393. content_start = 0
  394. consecutive_high = 0
  395. for y, density in smoothed:
  396. if density > high_threshold:
  397. consecutive_high += 1
  398. if consecutive_high >= 5 and not in_content:
  399. # 连续5行高密度,认为进入正文
  400. in_content = True
  401. content_start = y - 5 # 往上回退一点
  402. break
  403. else:
  404. consecutive_high = 0
  405. return max(0, content_start - start_y)
  406. else:
  407. # 从下往上,找到最后一个连续高密度区域的结束位置
  408. in_content = False
  409. content_end = height
  410. consecutive_high = 0
  411. for y, density in reversed(smoothed):
  412. if density > high_threshold:
  413. consecutive_high += 1
  414. if consecutive_high >= 5 and not in_content:
  415. in_content = True
  416. content_end = y + 5
  417. break
  418. else:
  419. consecutive_high = 0
  420. return max(0, height - content_end)