# app/cloud/obstacle_detector_client.py (new file)
"""YOLOE-based obstacle detector client.

Loads a YOLOE segmentation model (optionally a TensorRT engine), runs it under
a process-wide GPU concurrency limit, and post-filters the predicted instance
masks by size, optional overlap with a path mask, and class whitelist.
"""
import logging
import os
from contextlib import contextmanager
from threading import Semaphore
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
import torch
from ultralytics import YOLOE

# Day 20: optional Numba multi-core acceleration.
try:
    from numba_utils import count_mask_pixels, compute_mask_stats, bitwise_and_count, warmup as numba_warmup
    NUMBA_ENABLED = True
except ImportError:
    NUMBA_ENABLED = False

logger = logging.getLogger(__name__)

# --- GPU/CPU & AMP configuration (migrated from the blindpath workflow, kept consistent) ---
DEVICE = os.getenv("AIGLASS_DEVICE", "cuda:0")
if DEVICE.startswith("cuda") and not torch.cuda.is_available():
    logger.warning(f"AIGLASS_DEVICE={DEVICE} 但未检测到 CUDA,将回退到 CPU")
    DEVICE = "cpu"
IS_CUDA = DEVICE.startswith("cuda")

AMP_POLICY = os.getenv("AIGLASS_AMP", "fp16").lower()
if AMP_POLICY not in ("bf16", "fp16", "off"):
    AMP_POLICY = "fp16"
AMP_DTYPE = (
    torch.bfloat16 if AMP_POLICY == "bf16"
    else (torch.float16 if AMP_POLICY == "fp16" else None)
)

# --- GPU concurrency limit (migrated from the blindpath workflow, kept consistent) ---
# Day 20: default raised from 2 to 4 slots.
# Clamped to at least 1: a semaphore created with 0 permits would make every
# inference call block forever.
GPU_SLOTS = max(1, int(os.getenv("AIGLASS_GPU_SLOTS", "4")))
_gpu_slots = Semaphore(GPU_SLOTS)

# Best effort only: failing to set the cuDNN flag must not prevent the import.
try:
    torch.backends.cudnn.benchmark = True
except Exception:
    pass


@contextmanager
def gpu_infer_slot():
    """Acquire a GPU slot and enter inference_mode (plus AMP autocast if enabled).

    The slot is held for the whole duration of the ``with`` body, so at most
    ``GPU_SLOTS`` bodies run concurrently. Autocast is only entered when the
    configured device is CUDA and the AMP policy is not ``"off"``.
    """
    with _gpu_slots:
        if IS_CUDA and AMP_POLICY != "off":
            # New-style interface: torch.amp.autocast(device_type='cuda', dtype=...)
            with torch.inference_mode(), torch.amp.autocast(device_type='cuda', dtype=AMP_DTYPE):
                yield
        else:
            with torch.inference_mode():
                yield


class ObstacleDetectorClient:
    """Detects obstacles in a frame with a YOLOE segmentation model."""

    def __init__(self, model_path: str = 'model/yoloe-11l-seg.pt'):
        """Load the model and, for non-TensorRT weights, precompute prompt embeddings.

        Args:
            model_path: Path to the YOLOE weights. When ``model_utils`` is
                importable, ``get_best_model_path`` may substitute another path.

        Raises:
            Exception: Any error raised while loading the model or computing
                the text embeddings is logged and re-raised.
        """
        self.model = None
        self.whitelist_embeddings = None
        # Text prompts used for YOLOE open-vocabulary detection.
        self.WHITELIST_CLASSES = [
            'bicycle', 'car', 'motorcycle', 'bus', 'truck', 'animal', 'scooter', 'stroller', 'dog',
            'pole', 'post', 'column', 'pillar', 'stanchion', 'bollard', 'utility pole',
            'telegraph pole', 'light pole', 'street pole', 'signpost', 'support post',
            'vertical post', 'bench', 'chair', 'potted plant', 'hydrant', 'cone', 'stone', 'box'
        ]
        # COCO class whitelist - used for post-filtering in TensorRT mode.
        # Subset of the COCO 80 classes that may constitute an obstacle.
        self.COCO_WHITELIST = {
            'person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck',  # traffic
            'dog', 'cat', 'horse', 'cow', 'sheep',  # animals
            'bench', 'chair', 'potted plant', 'fire hydrant', 'stop sign',  # street furniture
            'parking meter', 'suitcase', 'backpack', 'umbrella', 'handbag',  # items
            'sports ball', 'skateboard', 'surfboard', 'bottle', 'cup',  # possible obstacles
        }

        try:
            # Day 20: prefer a TensorRT engine when the helper module is available.
            try:
                from model_utils import get_best_model_path, is_tensorrt_engine
                model_path = get_best_model_path(model_path)
            except ImportError:
                def is_tensorrt_engine(p):
                    return p.endswith('.engine')

            logger.info(f"正在加载 YOLOE 障碍物模型: {model_path}")
            self.model = YOLOE(model_path)

            # Day 20: a TensorRT engine needs neither .to() nor .fuse().
            if is_tensorrt_engine(model_path):
                logger.info("TensorRT 引擎已加载,跳过 .to() 和 .fuse()")
                # The TensorRT engine does not support get_text_pe, so the
                # whitelist embeddings are skipped.
                self.whitelist_embeddings = None
                logger.info("TensorRT 模式:跳过白名单特征预计算")
            else:
                self.model.to(DEVICE)
                self.model.fuse()
                logger.info(f"YOLOE 障碍物模型加载成功,使用设备: {DEVICE}")

                logger.info("正在为 YOLOE 预计算白名单文本特征...")
                if IS_CUDA and AMP_DTYPE is not None:
                    with torch.inference_mode(), torch.amp.autocast(device_type='cuda', dtype=AMP_DTYPE):
                        self.whitelist_embeddings = self.model.get_text_pe(self.WHITELIST_CLASSES)
                else:
                    self.whitelist_embeddings = self.model.get_text_pe(self.WHITELIST_CLASSES)
                logger.info("YOLOE 特征预计算完成。")
        except Exception as e:
            logger.error(f"YOLOE 模型加载或特征计算失败: {e}", exc_info=True)
            raise

    @staticmethod
    def tensor_to_numpy_mask(mask_tensor):
        """Convert a mask tensor of any supported dtype to a uint8 numpy mask.

        Half-precision tensors (bfloat16 / float16) are widened to float32
        first because numpy has no bfloat16 dtype. If the maximum value is
        <= 1.0 the mask is thresholded at 0.5 and scaled to {0, 255};
        otherwise it is cast to uint8 unchanged.
        """
        if mask_tensor.dtype in (torch.bfloat16, torch.float16):
            mask_tensor = mask_tensor.float()
        mask = mask_tensor.cpu().numpy()
        if mask.max() <= 1.0:
            return (mask > 0.5).astype(np.uint8) * 255
        return mask.astype(np.uint8)

    @staticmethod
    def _mask_stats(mask: np.ndarray) -> Optional[Tuple[int, float, float, int]]:
        """Return (area, center_x, center_y, max_y) of a mask, or None if it is empty."""
        if NUMBA_ENABLED:
            # Day 20: Numba multi-core accelerated statistics.
            stats = compute_mask_stats(mask)
            area = stats['area']
            if area == 0:
                return None
            return area, stats['center_x'], stats['center_y'], stats['bbox'][3]
        y_coords, x_coords = np.where(mask > 0)
        if len(y_coords) == 0:
            return None
        return (
            int(len(y_coords)),
            float(np.mean(x_coords)),
            float(np.mean(y_coords)),
            int(np.max(y_coords)),
        )

    @staticmethod
    def _resolve_class_name(class_names_map, cls_id: int) -> str:
        """Look up a class name in a dict- or list-shaped names map ("Unknown" if absent)."""
        if isinstance(class_names_map, dict):
            return class_names_map.get(cls_id, "Unknown")
        if isinstance(class_names_map, list) and 0 <= cls_id < len(class_names_map):
            return class_names_map[cls_id]
        return "Unknown"

    def detect(self, image: np.ndarray, path_mask: Optional[np.ndarray] = None) -> List[Dict[str, Any]]:
        """Find obstacles in ``image`` using the whitelist as prompts.

        Args:
            image: Input frame; only ``image.shape[:2]`` (H, W) is read here
                before it is handed to the model.
            path_mask: Optional mask of the walkable path. When given, only
                obstacles overlapping it by at least 100 pixels and 1% of
                their own area are kept. When its height/width differ from the
                frame it is resized (nearest neighbour) to match. When None,
                detection is global.

        Returns:
            A list of dicts with keys ``name``, ``mask`` (uint8, frame-sized),
            ``area``, ``area_ratio``, ``center_x``, ``center_y`` and
            ``bottom_y_ratio``. Empty when the model is missing, prompt setup
            fails, or nothing is detected.
        """
        if self.model is None:
            return []

        H, W = image.shape[:2]
        tensorrt_mode = self.whitelist_embeddings is None

        # In TensorRT mode there are no embeddings, so set_classes is skipped
        # and the model detects with its default class list.
        if not tensorrt_mode:
            try:
                self.model.set_classes(self.WHITELIST_CLASSES, self.whitelist_embeddings)
            except Exception as e:
                logger.error(f"设置 YOLOE 提示词失败: {e}")
                return []

        conf_thr = float(os.getenv("AIGLASS_OBS_CONF", "0.25"))
        # Day 22: configurable input size (lowered from the 640 default) and FP16.
        imgsz = int(os.getenv("AIGLASS_OBS_IMGSZ", "480"))
        # FP16 is only requested on CUDA; the CPU fallback runs in FP32.
        use_half = IS_CUDA and os.getenv("AIGLASS_OBS_HALF", "1") == "1"

        with gpu_infer_slot():
            results = self.model.predict(
                image,
                verbose=False,
                conf=conf_thr,
                imgsz=imgsz,
                half=use_half,
            )

        if not (results and results[0].masks):
            return []
        result = results[0]

        cls_tensor = getattr(result.boxes, "cls", None)
        if cls_tensor is None:
            # Without class ids no mask can be labelled, so nothing is kept.
            return []

        # The overlap test needs path_mask on the same pixel grid as the masks.
        if path_mask is not None and path_mask.shape[:2] != (H, W):
            path_mask = cv2.resize(path_mask, (W, H), interpolation=cv2.INTER_NEAREST)

        frame_area = H * W
        class_names_map = result.names

        # --- Filtering and post-processing (kept consistent with the blindpath workflow) ---
        final_obstacles = []
        # zip() stops at the shorter sequence, so masks without a box are skipped.
        for mask_tensor, cls_value in zip(result.masks.data, cls_tensor):
            class_name = self._resolve_class_name(class_names_map, int(cls_value))

            # TensorRT mode: keep only COCO classes that may be obstacles.
            # Checked first so rejected classes skip the mask processing.
            if tensorrt_mode and class_name.lower().strip() not in self.COCO_WHITELIST:
                continue

            mask = self.tensor_to_numpy_mask(mask_tensor)
            mask = cv2.resize(mask, (W, H), interpolation=cv2.INTER_NEAREST)

            stats = self._mask_stats(mask)
            if stats is None:
                continue
            area, center_x, center_y, max_y = stats

            # Size filter: very large objects (e.g. the whole ground) are
            # usually false detections.
            area_ratio = area / frame_area
            if area_ratio > 0.7:
                continue

            # Spatial filter: keep only obstacles that lie on the path.
            if path_mask is not None:
                if NUMBA_ENABLED:
                    # Day 20: Numba accelerated intersection count.
                    intersection_area = bitwise_and_count(mask, path_mask)
                else:
                    intersection_area = int(np.sum(cv2.bitwise_and(mask, path_mask) > 0))
                # The obstacle must overlap the path sufficiently.
                if intersection_area < 100 or (intersection_area / area) < 0.01:
                    continue

            final_obstacles.append({
                'name': class_name.strip(),
                'mask': mask,
                'area': area,
                'area_ratio': area_ratio,
                'center_x': center_x,
                'center_y': center_y,
                'bottom_y_ratio': max_y / H
            })

        return final_obstacles