| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- """
- 资源监控采集器模块
- 在 OCR 任务期间,后台线程定期采集加速卡(NVIDIA GPU / 华为昇腾 NPU)和系统负载数据。
- 根据运行环境自动选择 nvidia-smi 或 npu-smi 进行采集。
- """
- import threading
- import time
- import logging
- import os
- from typing import Optional, Dict, Any, List
- logger = logging.getLogger(__name__)
- class ResourceMonitor:
- """资源监控采集器,在后台线程中定期采集 GPU/NPU 和系统负载数据"""
-
- def __init__(self, interval: float = 0.5):
- """
- 初始化资源监控采集器
-
- Args:
- interval: 采集间隔(秒),默认0.5秒
- """
- self.interval = interval
- self.monitoring = False
- self.monitor_thread: Optional[threading.Thread] = None
- self.samples: List[Dict[str, Any]] = []
- self.lock = threading.Lock()
-
- def start(self):
- """启动监控采集"""
- if self.monitoring:
- logger.warning("资源监控已在运行中")
- return
-
- self.monitoring = True
- self.samples.clear()
- self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
- self.monitor_thread.start()
- logger.info(f"资源监控采集器已启动,采集间隔: {self.interval}秒")
-
- def stop(self):
- """停止监控采集"""
- if not self.monitoring:
- logger.warning("资源监控未在运行")
- return
-
- self.monitoring = False
- if self.monitor_thread:
- self.monitor_thread.join(timeout=2.0)
- logger.info(f"资源监控采集器已停止,共采集 {len(self.samples)} 个样本")
-
- def _monitor_loop(self):
- """监控循环,定期采集数据"""
- while self.monitoring:
- try:
- sample = self._collect_sample()
- if sample:
- with self.lock:
- self.samples.append(sample)
- except Exception as e:
- logger.warning(f"采集资源数据时出错: {e}")
-
- time.sleep(self.interval)
-
- def _collect_sample(self) -> Optional[Dict[str, Any]]:
- """
- 采集一次资源数据样本
-
- Returns:
- 包含GPU和系统负载信息的字典,如果采集失败返回None
- """
- sample = {
- "timestamp": time.time(),
- "gpu_info": self._get_gpu_info(),
- "system_load": self._get_system_load()
- }
- return sample
-
- def _get_gpu_info(self) -> Optional[Dict[str, Any]]:
- """获取加速卡信息(根据环境自动使用 nvidia-smi 或 npu-smi)"""
- try:
- from .gpu_monitor import get_gpu_info
- return get_gpu_info()
- except Exception as e:
- logger.debug(f"获取加速卡信息失败: {e}")
- return None
-
- def _get_system_load(self) -> Optional[Dict[str, float]]:
- """获取系统负载"""
- try:
- # Linux系统使用os.getloadavg()
- if hasattr(os, 'getloadavg'):
- load_avg = os.getloadavg()
- return {
- "load_1min": load_avg[0],
- "load_5min": load_avg[1],
- "load_15min": load_avg[2]
- }
- except Exception as e:
- logger.debug(f"获取系统负载失败: {e}")
- return None
-
- def get_statistics(self) -> Optional[Dict[str, Any]]:
- """
- 对采集的数据进行统计分析
-
- Returns:
- 统计结果,包含:
- - gpu_index: GPU索引
- - gpu_name: GPU名称
- - gpu_memory_total: 总显存(字节)
- - gpu_memory_used: 期间最大显存使用量(字节),任务期间采集到的最大显存使用
- - gpu_memory_used_avg: 平均显存使用(字节)
- - gpu_memory_used_max: 最大显存使用(字节)
- - gpu_utilization_avg: 平均GPU利用率(%)
- - gpu_utilization_max: 最大GPU利用率(%)
- - system_load_avg_1min: 平均1分钟系统负载
- - system_load_max_1min: 最大1分钟系统负载
- - sample_count: 采集的样本数量
- - duration: 监控持续时间(秒)
- """
- with self.lock:
- if not self.samples:
- logger.warning("没有采集到任何数据样本")
- return None
-
- # 提取GPU信息
- gpu_samples = [s["gpu_info"] for s in self.samples if s.get("gpu_info")]
- if not gpu_samples:
- logger.warning("没有采集到GPU数据")
- return None
-
- # 提取系统负载信息
- load_samples = [s["system_load"] for s in self.samples if s.get("system_load")]
-
- # 计算GPU统计信息
- first_gpu = gpu_samples[0]
- last_gpu = gpu_samples[-1]
-
- # 计算平均值和最大值(用于统计)
- memory_values = [g.get("gpu_memory_used", 0) for g in gpu_samples]
- utilization_values = [g.get("gpu_utilization", 0) for g in gpu_samples]
-
- memory_avg = sum(memory_values) / len(memory_values) if memory_values else 0
- memory_max = max(memory_values) if memory_values else 0
- utilization_avg = sum(utilization_values) / len(utilization_values) if utilization_values else 0
- utilization_max = max(utilization_values) if utilization_values else 0
-
- # 使用期间最大显存值(不再计算增量)
- # 注意:这是采集期间的最大显存使用量,不是增量
- gpu_memory_used = int(memory_max)
-
- # 计算系统负载统计(1分钟、5分钟、15分钟)
- load_1min_values = [l.get("load_1min", 0) for l in load_samples if l]
- load_1min_avg = sum(load_1min_values) / len(load_1min_values) if load_1min_values else None
- load_1min_max = max(load_1min_values) if load_1min_values else None
-
- load_5min_values = [l.get("load_5min", 0) for l in load_samples if l]
- load_5min_avg = sum(load_5min_values) / len(load_5min_values) if load_5min_values else None
- load_5min_max = max(load_5min_values) if load_5min_values else None
-
- load_15min_values = [l.get("load_15min", 0) for l in load_samples if l]
- load_15min_avg = sum(load_15min_values) / len(load_15min_values) if load_15min_values else None
- load_15min_max = max(load_15min_values) if load_15min_values else None
-
- # 计算持续时间
- duration = self.samples[-1]["timestamp"] - self.samples[0]["timestamp"] if len(self.samples) > 1 else 0
-
- result = {
- "gpu_index": first_gpu.get("gpu_index"),
- "gpu_name": first_gpu.get("gpu_name"),
- "gpu_memory_total": first_gpu.get("gpu_memory_total"),
- "gpu_memory_used": gpu_memory_used, # 期间最大显存使用量(不是增量)
- "gpu_memory_used_avg": int(memory_avg),
- "gpu_memory_used_max": int(memory_max),
- "gpu_utilization": utilization_avg, # 平均利用率
- "gpu_utilization_avg": utilization_avg,
- "gpu_utilization_max": utilization_max,
- "system_load_avg_1min": load_1min_avg,
- "system_load_max_1min": load_1min_max,
- "system_load_avg_5min": load_5min_avg,
- "system_load_max_5min": load_5min_max,
- "system_load_avg_15min": load_15min_avg,
- "system_load_max_15min": load_15min_max,
- "sample_count": len(self.samples),
- "duration": duration
- }
-
- logger.info(f"资源统计计算完成 - 样本数: {len(self.samples)}, 持续时间: {duration:.2f}秒, "
- f"最大显存使用: {gpu_memory_used / 1024 / 1024:.2f}MB (平均: {memory_avg / 1024 / 1024:.2f}MB), "
- f"平均GPU利用率: {utilization_avg:.2f}%, 最大GPU利用率: {utilization_max:.2f}%")
-
- return result
|