# gpu_parallel.py
# -*- coding: utf-8 -*-
"""
Day 20: GPU parallel inference optimisation module.

Runs blind-path (tactile paving) / crosswalk segmentation and obstacle
detection concurrently.  Two CUDA streams are created lazily and are used
only as the switch that enables the parallel path; the actual concurrency
comes from a shared two-worker thread pool.
"""
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any, List, Optional, Tuple

import numpy as np
import torch

logger = logging.getLogger(__name__)

# Module-wide singletons, created lazily on first use.
_cuda_streams = None
_parallel_executor = None


def _init_cuda_streams() -> Optional[List[torch.cuda.Stream]]:
    """Create the two module-level CUDA streams on first use and return them.

    Returns:
        The cached list of streams; an empty list if stream creation failed;
        ``None`` while CUDA is not available (availability is re-checked on
        every call in that case).
    """
    global _cuda_streams
    if _cuda_streams is None and torch.cuda.is_available():
        try:
            _cuda_streams = [torch.cuda.Stream() for _ in range(2)]
            logger.info("[GPU_PARALLEL] 已创建 2 个 CUDA Stream")
        except Exception as e:
            logger.warning(f"[GPU_PARALLEL] 创建 CUDA Stream 失败: {e}")
            # Cache the failure as an empty list so creation is not retried.
            _cuda_streams = []
    return _cuda_streams


def _init_parallel_executor() -> ThreadPoolExecutor:
    """Create the shared two-worker thread pool on first use and return it."""
    global _parallel_executor
    if _parallel_executor is None:
        _parallel_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="gpu_post")
        logger.info("[GPU_PARALLEL] 已创建 GPU 后处理线程池")
    return _parallel_executor


class ParallelDetector:
    """
    Parallel detector: runs path segmentation and obstacle detection together.

    Usage:
        detector = ParallelDetector(yolo_model, obstacle_detector)
        blind_mask, cross_mask, obstacles = detector.detect_all(image, path_mask)
    """

    # Class ids requested from the segmentation model.
    CROSSWALK_CLASS_ID = 0
    BLIND_PATH_CLASS_ID = 1

    # Upper bound, in seconds, that the parallel path waits for both tasks.
    # Class-level so it can be overridden per instance; ``None`` waits forever.
    parallel_timeout = 2.0

    def __init__(self, yolo_model, obstacle_detector):
        """Store both models and read the inference settings from the environment.

        Args:
            yolo_model: Object exposing ``predict(image, ...)``; may be ``None``
                to disable path segmentation.
            obstacle_detector: Object exposing ``detect(image, path_mask=...)``;
                may be ``None`` to disable obstacle detection.
        """
        self.yolo_model = yolo_model
        self.obstacle_detector = obstacle_detector

        # Inference parameters come from environment variables.
        self.imgsz = int(os.getenv("AIGLASS_YOLO_IMGSZ", "480"))
        self.use_half = os.getenv("AIGLASS_YOLO_HALF", "1") == "1"
        self.blind_conf_threshold = 0.20
        self.cross_conf_threshold = 0.30

        # Warm up the lazily created module-level resources.
        _init_cuda_streams()
        _init_parallel_executor()

        logger.info(f"[GPU_PARALLEL] ParallelDetector 初始化完成: imgsz={self.imgsz}, half={self.use_half}")

    def detect_all(
        self,
        image: np.ndarray,
        path_mask: Optional[np.ndarray] = None
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[Any]]:
        """
        Run every detection, in parallel when CUDA streams are available.

        Args:
            image: BGR image.
            path_mask: Blind-path mask forwarded to the obstacle detector
                (used there for obstacle filtering).

        Returns:
            (blind_path_mask, crosswalk_mask, obstacles)
        """
        t0 = time.perf_counter()
        streams = _init_cuda_streams()

        if streams and len(streams) >= 2:
            outcome = self._detect_with_streams(image, path_mask, streams)
        else:
            # Fall back to running the two detections one after the other.
            outcome = self._detect_serial(image, path_mask)

        logger.debug("[GPU_PARALLEL] detect_all 耗时 %.1f ms", (time.perf_counter() - t0) * 1000.0)
        return outcome

    def _segment_paths(
        self,
        image: np.ndarray
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        """Run the segmentation model and return (blind_mask, cross_mask).

        Never raises for ordinary errors: failures are logged and reported as
        ``(None, None)``, which is also the result when no model is configured
        or the model produced no masks.
        """
        if self.yolo_model is None:
            return None, None
        try:
            results = self.yolo_model.predict(
                image,
                verbose=False,
                # Use the lower threshold here; per-class filtering happens
                # later in _parse_seg_results.
                conf=min(self.blind_conf_threshold, self.cross_conf_threshold),
                classes=[self.CROSSWALK_CLASS_ID, self.BLIND_PATH_CLASS_ID],
                imgsz=self.imgsz,
                half=self.use_half
            )
            if results and results[0] and results[0].masks is not None:
                return self._parse_seg_results(results[0], image.shape)
        except Exception as e:
            logger.error(f"[GPU_PARALLEL] 盲道检测失败: {e}")
        return None, None

    def _find_obstacles(
        self,
        image: np.ndarray,
        path_mask: Optional[np.ndarray]
    ) -> List[Any]:
        """Run the obstacle detector; log and return ``[]`` on failure or if unset."""
        if self.obstacle_detector is None:
            return []
        try:
            return self.obstacle_detector.detect(image, path_mask=path_mask)
        except Exception as e:
            logger.error(f"[GPU_PARALLEL] 障碍物检测失败: {e}")
            return []

    def _detect_with_streams(
        self,
        image: np.ndarray,
        path_mask: Optional[np.ndarray],
        streams: List[torch.cuda.Stream]
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[Any]]:
        """Day 21: run both detections concurrently on the shared thread pool.

        The ``streams`` argument is kept for interface compatibility but is
        not used; the thread pool replaced the CUDA-stream approach.

        Waits at most ``self.parallel_timeout`` seconds.  A task that has not
        finished by then is cancelled if it has not started yet, a warning is
        logged, and its result keeps the default (``None`` masks / empty
        obstacle list) instead of raising to the caller.
        """
        blind_mask = None
        cross_mask = None
        obstacles = []

        executor = _init_parallel_executor()

        # Submit both tasks at once so they can overlap.
        futures = {
            executor.submit(self._segment_paths, image): 'blind',
            executor.submit(self._find_obstacles, image, path_mask): 'obstacle'
        }

        # wait() reports stragglers instead of raising on timeout.
        done, pending = wait(futures, timeout=self.parallel_timeout)

        for future in done:
            task_type = futures[future]
            try:
                result = future.result()
                if task_type == 'blind':
                    blind_mask, cross_mask = result
                else:
                    obstacles = result
            except Exception as e:
                logger.error(f"[GPU_PARALLEL] {task_type}任务异常: {e}")

        if pending:
            for future in pending:
                # Only prevents tasks that have not started; a running task
                # keeps its worker busy until it returns.
                future.cancel()
            logger.warning(
                "[GPU_PARALLEL] 并行检测超时 (%.2fs),未完成任务: %s",
                self.parallel_timeout,
                ", ".join(sorted(futures[f] for f in pending)),
            )

        return blind_mask, cross_mask, obstacles

    def _detect_serial(
        self,
        image: np.ndarray,
        path_mask: Optional[np.ndarray]
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[Any]]:
        """Serial detection (fallback mode): segmentation first, then obstacles."""
        blind_mask, cross_mask = self._segment_paths(image)
        obstacles = self._find_obstacles(image, path_mask)
        return blind_mask, cross_mask, obstacles

    def _parse_seg_results(
        self,
        result,
        image_shape: Tuple[int, int, int]
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        """Merge the per-instance masks of one result into per-class masks.

        Args:
            result: Object with ``masks.data`` (iterable of per-instance mask
                tensors) and ``boxes.conf`` / ``boxes.cls`` aligned with it.
            image_shape: Shape of the source image; only height and width
                are used, as the output mask size.

        Returns:
            (blind_mask, cross_mask); each is a uint8 0/255 array, or ``None``
            when no instance of that class passed its confidence threshold.
        """
        h, w = image_shape[:2]

        if result.masks is None or result.boxes is None:
            return None, None

        thresholds = {
            self.BLIND_PATH_CLASS_ID: self.blind_conf_threshold,
            self.CROSSWALK_CLASS_ID: self.cross_conf_threshold,
        }
        merged = {self.BLIND_PATH_CLASS_ID: None, self.CROSSWALK_CLASS_ID: None}

        # One bulk conversion per tensor instead of one .item() per detection.
        confidences = result.boxes.conf.tolist()
        class_values = result.boxes.cls.tolist()

        for mask_tensor, confidence, class_value in zip(
            result.masks.data, confidences, class_values
        ):
            class_id = int(class_value)
            threshold = thresholds.get(class_id)

            # Skip unrelated classes and low-confidence instances before
            # paying for the mask conversion.
            if threshold is None or confidence < threshold:
                continue

            current_mask = self._tensor_to_mask(mask_tensor, w, h)
            previous = merged[class_id]
            merged[class_id] = (
                current_mask if previous is None
                else np.bitwise_or(previous, current_mask)
            )

        return merged[self.BLIND_PATH_CLASS_ID], merged[self.CROSSWALK_CLASS_ID]

    def _tensor_to_mask(
        self,
        mask_tensor: torch.Tensor,
        out_w: int,
        out_h: int
    ) -> np.ndarray:
        """Convert one mask tensor into a uint8 0/255 NumPy mask of out_h x out_w.

        Args:
            mask_tensor: 2-D mask tensor on any device, in any floating dtype.
            out_w: Target width in pixels.
            out_h: Target height in pixels.

        Returns:
            Array of shape (out_h, out_w), dtype uint8, 255 where the mask
            value exceeds 0.5 and 0 elsewhere.
        """
        # detach() allows tensors that require grad; float32 on CPU is a dtype
        # NumPy can always represent (bfloat16 cannot be converted directly)
        # and avoids handing a float16 array to cv2.resize.
        mask_np = mask_tensor.detach().to(device="cpu", dtype=torch.float32).numpy()

        # Resize only when needed, with nearest-neighbour so values stay binary.
        if mask_np.shape[0] != out_h or mask_np.shape[1] != out_w:
            import cv2  # deferred: only needed when a resize is required
            mask_np = cv2.resize(mask_np, (out_w, out_h), interpolation=cv2.INTER_NEAREST)

        # Binarise to 0 / 255.
        return (mask_np > 0.5).astype(np.uint8) * 255


def detect_all_parallel(
    yolo_model,
    obstacle_detector,
    image: np.ndarray,
    path_mask: Optional[np.ndarray] = None
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[Any]]:
    """
    Convenience function: run every detection through a ParallelDetector.

    Intended as a replacement for the serial detection in
    workflow_blindpath.py.  A new ParallelDetector is constructed on every
    call; callers on a hot path can construct one detector themselves and
    call its detect_all() directly instead.

    Returns:
        (blind_path_mask, crosswalk_mask, obstacles)
    """
    detector = ParallelDetector(yolo_model, obstacle_detector)
    return detector.detect_all(image, path_mask)