# gpu_parallel.py
# -*- coding: utf-8 -*-
"""
|
||
Day 20: GPU 并行推理优化模块
|
||
使用 CUDA Stream 让盲道检测和障碍物检测并行执行
|
||
"""
|
||
import os
|
||
import time
|
||
import torch
|
||
import numpy as np
|
||
from typing import Tuple, Optional, List, Any
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
import logging
|
||
|
||
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Process-wide singletons, populated lazily on first use:
#   _cuda_streams      -> filled in by _init_cuda_streams()
#   _parallel_executor -> filled in by _init_parallel_executor()
_cuda_streams: Optional[List[Any]] = None
_parallel_executor: Optional[ThreadPoolExecutor] = None
|
||
|
||
def _init_cuda_streams():
    """Create the two shared CUDA streams on first use and return them.

    Returns:
        The cached module-level value: a list of two ``torch.cuda.Stream``
        objects after a successful creation, an empty list if creation
        raised, or ``None`` while CUDA is reported as unavailable.
    """
    global _cuda_streams

    # Nothing to do when a value is already cached or CUDA cannot be used.
    if _cuda_streams is not None or not torch.cuda.is_available():
        return _cuda_streams

    try:
        _cuda_streams = [torch.cuda.Stream() for _ in range(2)]
        logger.info("[GPU_PARALLEL] 已创建 2 个 CUDA Stream")
    except Exception as exc:
        logger.warning(f"[GPU_PARALLEL] 创建 CUDA Stream 失败: {exc}")
        # Cache an empty list so that creation is not retried on every call.
        _cuda_streams = []

    return _cuda_streams
|
||
|
||
def _init_parallel_executor():
    """Create the shared thread pool on first use and return it.

    The pool has two workers whose thread names are prefixed ``gpu_post``.
    The original comment describes it as a pool for CPU post-processing.

    Returns:
        The cached module-level ``ThreadPoolExecutor``.
    """
    global _parallel_executor

    if _parallel_executor is not None:
        return _parallel_executor

    _parallel_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="gpu_post")
    logger.info("[GPU_PARALLEL] 已创建 GPU 后处理线程池")
    return _parallel_executor
|
||
|
||
class ParallelDetector:
    """Run blind-path/crosswalk segmentation and obstacle detection together.

    Usage:
        detector = ParallelDetector(yolo_model, obstacle_detector)
        blind_mask, cross_mask, obstacles = detector.detect_all(image, path_mask)

    Either model may be ``None``; the corresponding results are then empty
    (``None`` masks / an empty obstacle list).
    """

    # Class ids requested from the segmentation model.
    CROSSWALK_CLASS_ID = 0
    BLIND_PATH_CLASS_ID = 1

    # Seconds the parallel path waits for both worker tasks before giving up
    # on the ones that are still running. Override on a subclass or instance.
    TASK_TIMEOUT_S = 2.0

    def __init__(self, yolo_model, obstacle_detector):
        """Store the models and read the inference settings.

        Args:
            yolo_model: Segmentation model exposing ``predict(...)``, or None.
            obstacle_detector: Object exposing ``detect(image, path_mask=...)``,
                or None.
        """
        self.yolo_model = yolo_model
        self.obstacle_detector = obstacle_detector

        # Inference parameters come from environment variables.
        self.imgsz = int(os.getenv("AIGLASS_YOLO_IMGSZ", "480"))
        self.use_half = os.getenv("AIGLASS_YOLO_HALF", "1") == "1"
        self.blind_conf_threshold = 0.20
        self.cross_conf_threshold = 0.30

        # Warm up the shared module-level resources.
        _init_cuda_streams()
        _init_parallel_executor()

        logger.info(f"[GPU_PARALLEL] ParallelDetector 初始化完成: imgsz={self.imgsz}, half={self.use_half}")

    def detect_all(
        self,
        image: np.ndarray,
        path_mask: Optional[np.ndarray] = None
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[Any]]:
        """Run every detector on one frame.

        The parallel path is taken when at least two CUDA streams could be
        created; otherwise the detectors run one after the other.

        Args:
            image: BGR image.
            path_mask: Blind-path mask forwarded to the obstacle detector
                (used for obstacle filtering).

        Returns:
            ``(blind_path_mask, crosswalk_mask, obstacles)``
        """
        streams = _init_cuda_streams()
        if streams and len(streams) >= 2:
            return self._detect_with_streams(image, path_mask, streams)
        # Fall back to serial execution.
        return self._detect_serial(image, path_mask)

    def _run_segmentation(
        self,
        image: np.ndarray
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        """Run the segmentation model; return ``(blind_mask, cross_mask)``.

        Any exception is logged and turned into ``(None, None)`` so that a
        failing model never aborts the whole frame.
        """
        if self.yolo_model is None:
            return None, None
        try:
            results = self.yolo_model.predict(
                image,
                verbose=False,
                # Use the looser threshold here; per-class filtering happens
                # in _parse_seg_results.
                conf=min(self.blind_conf_threshold, self.cross_conf_threshold),
                classes=[self.CROSSWALK_CLASS_ID, self.BLIND_PATH_CLASS_ID],
                imgsz=self.imgsz,
                half=self.use_half,
            )
            if results and results[0] and results[0].masks is not None:
                return self._parse_seg_results(results[0], image.shape)
        except Exception as e:
            logger.error(f"[GPU_PARALLEL] 盲道检测失败: {e}")
        return None, None

    def _run_obstacles(
        self,
        image: np.ndarray,
        path_mask: Optional[np.ndarray]
    ) -> List[Any]:
        """Run the obstacle detector; return its result or ``[]`` on failure."""
        if self.obstacle_detector is None:
            return []
        try:
            return self.obstacle_detector.detect(image, path_mask=path_mask)
        except Exception as e:
            logger.error(f"[GPU_PARALLEL] 障碍物检测失败: {e}")
            return []

    def _detect_with_streams(
        self,
        image: np.ndarray,
        path_mask: Optional[np.ndarray],
        streams: List[torch.cuda.Stream]
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[Any]]:
        """Day 21: run both detections concurrently on the shared thread pool.

        ``streams`` is not used by this implementation; it is kept so the
        signature stays compatible with existing callers.

        A task that has not finished within ``TASK_TIMEOUT_S`` seconds is
        skipped for this frame (its result stays ``None`` / ``[]``) instead of
        raising a timeout error to the caller.
        """
        from concurrent.futures import wait

        blind_mask = None
        cross_mask = None
        obstacles = []

        executor = _init_parallel_executor()

        # Submit both tasks, remembering which future belongs to which task.
        labels = {
            executor.submit(self._run_segmentation, image): 'blind',
            executor.submit(self._run_obstacles, image, path_mask): 'obstacle',
        }

        # wait() reports unfinished futures instead of raising on timeout.
        done, pending = wait(labels, timeout=self.TASK_TIMEOUT_S)

        for future in pending:
            # cancel() only succeeds for tasks that have not started yet; a
            # task that is already running finishes in the background.
            future.cancel()
            logger.warning(
                f"[GPU_PARALLEL] {labels[future]}任务超时(>{self.TASK_TIMEOUT_S}s),已跳过本次结果"
            )

        for future in done:
            task_type = labels[future]
            try:
                result = future.result()
                if task_type == 'blind':
                    blind_mask, cross_mask = result
                else:
                    obstacles = result
            except Exception as e:
                logger.error(f"[GPU_PARALLEL] {task_type}任务异常: {e}")

        return blind_mask, cross_mask, obstacles

    def _detect_serial(
        self,
        image: np.ndarray,
        path_mask: Optional[np.ndarray]
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[Any]]:
        """Serial detection (fallback mode): segmentation first, then obstacles."""
        blind_mask, cross_mask = self._run_segmentation(image)
        obstacles = self._run_obstacles(image, path_mask)
        return blind_mask, cross_mask, obstacles

    def _parse_seg_results(
        self,
        result,
        image_shape: Tuple[int, int, int]
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        """Merge the instance masks of one segmentation result per class.

        Args:
            result: Object exposing ``masks.data``, ``boxes.conf`` and
                ``boxes.cls`` as parallel per-instance sequences.
            image_shape: Shape of the source image; the first two entries are
                used as the output height and width.

        Returns:
            ``(blind_mask, cross_mask)``; each entry is a uint8 0/255 mask, or
            ``None`` when no instance of that class passed its threshold.
        """
        if result.masks is None or result.boxes is None:
            return None, None

        height, width = image_shape[:2]

        # Per-class confidence thresholds; any other class id is ignored.
        min_conf = {
            self.BLIND_PATH_CLASS_ID: self.blind_conf_threshold,
            self.CROSSWALK_CLASS_ID: self.cross_conf_threshold,
        }
        merged = {}

        for mask_tensor, conf_tensor, cls_tensor in zip(
            result.masks.data, result.boxes.conf, result.boxes.cls
        ):
            class_id = int(cls_tensor.item())
            threshold = min_conf.get(class_id)
            # Skip unknown classes and low-confidence instances before paying
            # for the tensor -> mask conversion.
            if threshold is None or float(conf_tensor.item()) < threshold:
                continue

            current_mask = self._tensor_to_mask(mask_tensor, width, height)
            previous = merged.get(class_id)
            if previous is None:
                merged[class_id] = current_mask
            else:
                merged[class_id] = np.bitwise_or(previous, current_mask)

        return merged.get(self.BLIND_PATH_CLASS_ID), merged.get(self.CROSSWALK_CLASS_ID)

    def _tensor_to_mask(
        self,
        mask_tensor: torch.Tensor,
        out_w: int,
        out_h: int
    ) -> np.ndarray:
        """Convert a PyTorch mask tensor into a binary uint8 NumPy mask.

        Args:
            mask_tensor: 2-D mask tensor (CPU or CUDA); values above 0.5 count
                as foreground.
            out_w: Output width in pixels.
            out_h: Output height in pixels.

        Returns:
            Array of shape ``(out_h, out_w)`` with values 0 or 255.
        """
        # detach() makes tensors that track gradients convertible; cpu() is a
        # no-op for tensors that already live on the CPU.
        mask_np = mask_tensor.detach().cpu().numpy()

        # Binarize before resizing. With nearest-neighbour interpolation this
        # gives the same pixels as thresholding afterwards, and the resize
        # then always operates on uint8 regardless of the tensor's dtype
        # (e.g. float16 when half precision is enabled).
        mask_np = (mask_np > 0.5).astype(np.uint8) * 255

        if mask_np.shape[0] != out_h or mask_np.shape[1] != out_w:
            # Imported lazily: OpenCV is only needed when a resize is required.
            import cv2

            mask_np = cv2.resize(mask_np, (out_w, out_h), interpolation=cv2.INTER_NEAREST)

        return mask_np
|
||
|
||
|
||
def detect_all_parallel(
    yolo_model,
    obstacle_detector,
    image: np.ndarray,
    path_mask: Optional[np.ndarray] = None
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[Any]]:
    """Convenience wrapper: run all detections on one frame.

    Intended as a replacement for the serial detection in
    workflow_blindpath.py. A new ``ParallelDetector`` is built on every call.

    Args:
        yolo_model: Segmentation model handed to ``ParallelDetector``.
        obstacle_detector: Obstacle detector handed to ``ParallelDetector``.
        image: Image to analyse.
        path_mask: Optional mask forwarded to ``detect_all``.

    Returns:
        Whatever ``ParallelDetector.detect_all`` returns:
        ``(blind_path_mask, crosswalk_mask, obstacles)``.
    """
    return ParallelDetector(yolo_model, obstacle_detector).detect_all(image, path_mask)
|