resource_monitor.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
"""
Resource monitoring collector module.

While an OCR task is running, a background thread periodically samples GPU
and system-load data.
"""
  5. import threading
  6. import time
  7. import subprocess
  8. import logging
  9. import os
  10. from typing import Optional, Dict, Any, List
  11. logger = logging.getLogger(__name__)
  12. class ResourceMonitor:
  13. """资源监控采集器,在后台线程中定期采集GPU和系统负载数据"""
  14. def __init__(self, interval: float = 0.5):
  15. """
  16. 初始化资源监控采集器
  17. Args:
  18. interval: 采集间隔(秒),默认0.5秒
  19. """
  20. self.interval = interval
  21. self.monitoring = False
  22. self.monitor_thread: Optional[threading.Thread] = None
  23. self.samples: List[Dict[str, Any]] = []
  24. self.lock = threading.Lock()
  25. def start(self):
  26. """启动监控采集"""
  27. if self.monitoring:
  28. logger.warning("资源监控已在运行中")
  29. return
  30. self.monitoring = True
  31. self.samples.clear()
  32. self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
  33. self.monitor_thread.start()
  34. logger.info(f"资源监控采集器已启动,采集间隔: {self.interval}秒")
  35. def stop(self):
  36. """停止监控采集"""
  37. if not self.monitoring:
  38. logger.warning("资源监控未在运行")
  39. return
  40. self.monitoring = False
  41. if self.monitor_thread:
  42. self.monitor_thread.join(timeout=2.0)
  43. logger.info(f"资源监控采集器已停止,共采集 {len(self.samples)} 个样本")
  44. def _monitor_loop(self):
  45. """监控循环,定期采集数据"""
  46. while self.monitoring:
  47. try:
  48. sample = self._collect_sample()
  49. if sample:
  50. with self.lock:
  51. self.samples.append(sample)
  52. except Exception as e:
  53. logger.warning(f"采集资源数据时出错: {e}")
  54. time.sleep(self.interval)
  55. def _collect_sample(self) -> Optional[Dict[str, Any]]:
  56. """
  57. 采集一次资源数据样本
  58. Returns:
  59. 包含GPU和系统负载信息的字典,如果采集失败返回None
  60. """
  61. sample = {
  62. "timestamp": time.time(),
  63. "gpu_info": self._get_gpu_info(),
  64. "system_load": self._get_system_load()
  65. }
  66. return sample
  67. def _get_gpu_info(self) -> Optional[Dict[str, Any]]:
  68. """获取GPU信息"""
  69. try:
  70. cmd = [
  71. "nvidia-smi",
  72. "--query-gpu=index,name,memory.total,memory.used,utilization.gpu",
  73. "--format=csv,noheader,nounits"
  74. ]
  75. result = subprocess.run(
  76. cmd,
  77. capture_output=True,
  78. text=True,
  79. timeout=2,
  80. check=False
  81. )
  82. if result.returncode != 0:
  83. return None
  84. lines = result.stdout.strip().split('\n')
  85. if not lines or not lines[0]:
  86. return None
  87. parts = [p.strip() for p in lines[0].split(',')]
  88. if len(parts) < 5:
  89. return None
  90. gpu_index = int(parts[0])
  91. gpu_name = parts[1]
  92. memory_total_mb = int(parts[2])
  93. memory_used_mb = int(parts[3])
  94. utilization = float(parts[4])
  95. return {
  96. "gpu_index": gpu_index,
  97. "gpu_name": gpu_name,
  98. "gpu_memory_total": memory_total_mb * 1024 * 1024, # 转换为字节
  99. "gpu_memory_used": memory_used_mb * 1024 * 1024, # 转换为字节
  100. "gpu_utilization": utilization
  101. }
  102. except Exception as e:
  103. logger.debug(f"获取GPU信息失败: {e}")
  104. return None
  105. def _get_system_load(self) -> Optional[Dict[str, float]]:
  106. """获取系统负载"""
  107. try:
  108. # Linux系统使用os.getloadavg()
  109. if hasattr(os, 'getloadavg'):
  110. load_avg = os.getloadavg()
  111. return {
  112. "load_1min": load_avg[0],
  113. "load_5min": load_avg[1],
  114. "load_15min": load_avg[2]
  115. }
  116. except Exception as e:
  117. logger.debug(f"获取系统负载失败: {e}")
  118. return None
  119. def get_statistics(self) -> Optional[Dict[str, Any]]:
  120. """
  121. 对采集的数据进行统计分析
  122. Returns:
  123. 统计结果,包含:
  124. - gpu_index: GPU索引
  125. - gpu_name: GPU名称
  126. - gpu_memory_total: 总显存(字节)
  127. - gpu_memory_used: 期间最大显存使用量(字节),任务期间采集到的最大显存使用
  128. - gpu_memory_used_avg: 平均显存使用(字节)
  129. - gpu_memory_used_max: 最大显存使用(字节)
  130. - gpu_utilization_avg: 平均GPU利用率(%)
  131. - gpu_utilization_max: 最大GPU利用率(%)
  132. - system_load_avg_1min: 平均1分钟系统负载
  133. - system_load_max_1min: 最大1分钟系统负载
  134. - sample_count: 采集的样本数量
  135. - duration: 监控持续时间(秒)
  136. """
  137. with self.lock:
  138. if not self.samples:
  139. logger.warning("没有采集到任何数据样本")
  140. return None
  141. # 提取GPU信息
  142. gpu_samples = [s["gpu_info"] for s in self.samples if s.get("gpu_info")]
  143. if not gpu_samples:
  144. logger.warning("没有采集到GPU数据")
  145. return None
  146. # 提取系统负载信息
  147. load_samples = [s["system_load"] for s in self.samples if s.get("system_load")]
  148. # 计算GPU统计信息
  149. first_gpu = gpu_samples[0]
  150. last_gpu = gpu_samples[-1]
  151. # 计算平均值和最大值(用于统计)
  152. memory_values = [g.get("gpu_memory_used", 0) for g in gpu_samples]
  153. utilization_values = [g.get("gpu_utilization", 0) for g in gpu_samples]
  154. memory_avg = sum(memory_values) / len(memory_values) if memory_values else 0
  155. memory_max = max(memory_values) if memory_values else 0
  156. utilization_avg = sum(utilization_values) / len(utilization_values) if utilization_values else 0
  157. utilization_max = max(utilization_values) if utilization_values else 0
  158. # 使用期间最大显存值(不再计算增量)
  159. # 注意:这是采集期间的最大显存使用量,不是增量
  160. gpu_memory_used = int(memory_max)
  161. # 计算系统负载统计(1分钟、5分钟、15分钟)
  162. load_1min_values = [l.get("load_1min", 0) for l in load_samples if l]
  163. load_1min_avg = sum(load_1min_values) / len(load_1min_values) if load_1min_values else None
  164. load_1min_max = max(load_1min_values) if load_1min_values else None
  165. load_5min_values = [l.get("load_5min", 0) for l in load_samples if l]
  166. load_5min_avg = sum(load_5min_values) / len(load_5min_values) if load_5min_values else None
  167. load_5min_max = max(load_5min_values) if load_5min_values else None
  168. load_15min_values = [l.get("load_15min", 0) for l in load_samples if l]
  169. load_15min_avg = sum(load_15min_values) / len(load_15min_values) if load_15min_values else None
  170. load_15min_max = max(load_15min_values) if load_15min_values else None
  171. # 计算持续时间
  172. duration = self.samples[-1]["timestamp"] - self.samples[0]["timestamp"] if len(self.samples) > 1 else 0
  173. result = {
  174. "gpu_index": first_gpu.get("gpu_index"),
  175. "gpu_name": first_gpu.get("gpu_name"),
  176. "gpu_memory_total": first_gpu.get("gpu_memory_total"),
  177. "gpu_memory_used": gpu_memory_used, # 期间最大显存使用量(不是增量)
  178. "gpu_memory_used_avg": int(memory_avg),
  179. "gpu_memory_used_max": int(memory_max),
  180. "gpu_utilization": utilization_avg, # 平均利用率
  181. "gpu_utilization_avg": utilization_avg,
  182. "gpu_utilization_max": utilization_max,
  183. "system_load_avg_1min": load_1min_avg,
  184. "system_load_max_1min": load_1min_max,
  185. "system_load_avg_5min": load_5min_avg,
  186. "system_load_max_5min": load_5min_max,
  187. "system_load_avg_15min": load_15min_avg,
  188. "system_load_max_15min": load_15min_max,
  189. "sample_count": len(self.samples),
  190. "duration": duration
  191. }
  192. logger.info(f"资源统计计算完成 - 样本数: {len(self.samples)}, 持续时间: {duration:.2f}秒, "
  193. f"最大显存使用: {gpu_memory_used / 1024 / 1024:.2f}MB (平均: {memory_avg / 1024 / 1024:.2f}MB), "
  194. f"平均GPU利用率: {utilization_avg:.2f}%, 最大GPU利用率: {utilization_max:.2f}%")
  195. return result