# Source: NaviGlassServer/obstacle_detector_client.py (exported 2025-12-31 15:42:30 +08:00; 244 lines, 10 KiB).
# NOTE(review): the original lines here were Gitea web-UI scaffolding (file listing,
# "Raw Permalink Blame History", Unicode warning banner) pasted into the module;
# collapsed into this comment so the file parses as Python.
# app/cloud/obstacle_detector_client.py (new file)
import logging
import os
import cv2
import numpy as np
import torch
from threading import Semaphore
from contextlib import contextmanager
from ultralytics import YOLOE
from typing import List, Dict, Any

# Day 20: optional Numba multi-core acceleration for mask statistics.
# Falls back gracefully to plain numpy paths when numba_utils is absent.
try:
    from numba_utils import count_mask_pixels, compute_mask_stats, bitwise_and_count, warmup as numba_warmup
    NUMBA_ENABLED = True
except ImportError:
    NUMBA_ENABLED = False
# Module-level logger for this client.
logger = logging.getLogger(__name__)

# --- GPU/CPU & AMP configuration (migrated from the blindpath workflow, kept consistent) ---
DEVICE = os.getenv("AIGLASS_DEVICE", "cuda:0")
if DEVICE.startswith("cuda") and not torch.cuda.is_available():
    # Requested a CUDA device but none is available: fall back to CPU.
    logger.warning(f"AIGLASS_DEVICE={DEVICE} 但未检测到 CUDA将回退到 CPU")
    DEVICE = "cpu"
IS_CUDA = DEVICE.startswith("cuda")

# Mixed-precision policy: "bf16", "fp16" (default) or "off"; unknown values
# are normalized to "fp16".
AMP_POLICY = os.getenv("AIGLASS_AMP", "fp16").lower()
if AMP_POLICY not in ("bf16", "fp16", "off"):
    AMP_POLICY = "fp16"
AMP_DTYPE = torch.bfloat16 if AMP_POLICY == "bf16" else (torch.float16 if AMP_POLICY == "fp16" else None)

# --- GPU concurrency throttling (migrated from the blindpath workflow, kept consistent) ---
# Day 20: default slots raised from 2 to 4 — an RTX 3090 handles more concurrency.
GPU_SLOTS = int(os.getenv("AIGLASS_GPU_SLOTS", "4"))
_gpu_slots = Semaphore(GPU_SLOTS)

# Enable cuDNN autotuning; best-effort, ignore if the backend is unavailable.
try:
    torch.backends.cudnn.benchmark = True
except Exception:
    pass
@contextmanager
def gpu_infer_slot():
    """Acquire a GPU slot and enter inference mode, with AMP when enabled.

    Centralizes three concerns for every inference call:
      1. concurrency throttling via the module-level semaphore `_gpu_slots`,
      2. `torch.inference_mode()` (no autograd bookkeeping),
      3. AMP autocast on CUDA when `AMP_POLICY` is not "off".
    """
    with _gpu_slots:
        if IS_CUDA and AMP_POLICY != "off":
            # Modern API: torch.amp.autocast(device_type='cuda', dtype=...)
            with torch.inference_mode(), torch.amp.autocast(device_type='cuda', dtype=AMP_DTYPE):
                yield
        else:
            with torch.inference_mode():
                yield
class ObstacleDetectorClient:
    """YOLOE-based obstacle detector.

    Uses open-vocabulary text prompts (``WHITELIST_CLASSES``) when running the
    PyTorch model. When a TensorRT engine is loaded instead, text prompts are
    unavailable (engines do not support ``get_text_pe``), so detections are
    post-filtered against ``COCO_WHITELIST``.
    """

    def __init__(self, model_path: str = 'model/yoloe-11l-seg.pt'):
        """Load the YOLOE segmentation model and pre-compute prompt features.

        Args:
            model_path: path to a .pt checkpoint or a TensorRT .engine file.

        Raises:
            Exception: re-raised after logging if model loading or text-feature
                pre-computation fails.
        """
        self.model = None
        self.whitelist_embeddings = None
        # Open-vocabulary prompt classes used when text embeddings are available.
        self.WHITELIST_CLASSES = [
            'bicycle', 'car', 'motorcycle', 'bus', 'truck', 'animal', 'scooter', 'stroller', 'dog',
            'pole', 'post', 'column', 'pillar', 'stanchion', 'bollard', 'utility pole',
            'telegraph pole', 'light pole', 'street pole', 'signpost', 'support post',
            'vertical post', 'bench', 'chair', 'potted plant', 'hydrant', 'cone', 'stone', 'box'
        ]
        # COCO class whitelist — used for post-filtering in TensorRT mode.
        # Subset of the 80 COCO classes that can plausibly be obstacles.
        self.COCO_WHITELIST = {
            'person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck',          # traffic
            'dog', 'cat', 'horse', 'cow', 'sheep',                             # animals
            'bench', 'chair', 'potted plant', 'fire hydrant', 'stop sign',     # street furniture
            'parking meter', 'suitcase', 'backpack', 'umbrella', 'handbag',    # carried objects
            'sports ball', 'skateboard', 'surfboard', 'bottle', 'cup',         # possible obstacles
        }
        try:
            # Day 20: prefer a TensorRT engine when one is available.
            try:
                from model_utils import get_best_model_path, is_tensorrt_engine
                model_path = get_best_model_path(model_path)
            except ImportError:
                # Fallback: recognize an engine purely by its file extension.
                def is_tensorrt_engine(p): return p.endswith('.engine')
            logger.info(f"正在加载 YOLOE 障碍物模型: {model_path}")
            self.model = YOLOE(model_path)
            # Day 20: TensorRT engines need neither .to() nor .fuse().
            if is_tensorrt_engine(model_path):
                logger.info(f"TensorRT 引擎已加载,跳过 .to() 和 .fuse()")
                # Engines do not support get_text_pe; skip whitelist features.
                self.whitelist_embeddings = None
                logger.info("TensorRT 模式:跳过白名单特征预计算")
            else:
                self.model.to(DEVICE)
                self.model.fuse()
                logger.info(f"YOLOE 障碍物模型加载成功,使用设备: {DEVICE}")
                logger.info("正在为 YOLOE 预计算白名单文本特征...")
                if IS_CUDA and AMP_DTYPE is not None:
                    with torch.inference_mode(), torch.amp.autocast(device_type='cuda', dtype=AMP_DTYPE):
                        self.whitelist_embeddings = self.model.get_text_pe(self.WHITELIST_CLASSES)
                else:
                    self.whitelist_embeddings = self.model.get_text_pe(self.WHITELIST_CLASSES)
                logger.info("YOLOE 特征预计算完成。")
        except Exception as e:
            logger.error(f"YOLOE 模型加载或特征计算失败: {e}", exc_info=True)
            raise

    @staticmethod
    def tensor_to_numpy_mask(mask_tensor):
        """Safely convert a mask tensor of various dtypes to a uint8 numpy mask.

        Fix: declared ``@staticmethod`` — the original definition took no
        ``self`` parameter, so calling it through an instance would have bound
        the instance to ``mask_tensor`` and failed.
        """
        # numpy cannot represent bfloat16 (and fp16 loses nothing here):
        # promote to float32 before .numpy().
        if mask_tensor.dtype in (torch.bfloat16, torch.float16):
            mask_tensor = mask_tensor.float()
        mask = mask_tensor.cpu().numpy()
        # Binarize probability masks (values within [0, 1]); masks already in a
        # wider value range are assumed binary and only cast.
        if mask.max() <= 1.0:
            mask = (mask > 0.5).astype(np.uint8) * 255
        else:
            mask = mask.astype(np.uint8)
        return mask

    def detect(self, image: np.ndarray, path_mask: np.ndarray = None) -> List[Dict[str, Any]]:
        """Find obstacles, using the whitelist classes as prompts.

        When ``path_mask`` is provided, obstacles are additionally filtered by
        spatial overlap with the path; when it is None, detection is global.

        Args:
            image: BGR image array of shape (H, W, 3).
            path_mask: optional uint8 mask of the walkable path, same H×W as
                ``image`` — TODO confirm caller guarantees matching size.

        Returns:
            A list of dicts with keys 'name', 'mask', 'area', 'area_ratio',
            'center_x', 'center_y', 'bottom_y_ratio'; empty on any failure.
        """
        if self.model is None:
            return []
        H, W = image.shape[:2]
        # TensorRT mode has no embeddings, so skip set_classes; the model then
        # detects with its default COCO classes.
        if self.whitelist_embeddings is not None:
            try:
                self.model.set_classes(self.WHITELIST_CLASSES, self.whitelist_embeddings)
            except Exception as e:
                logger.error(f"设置 YOLOE 提示词失败: {e}")
                return []
        conf_thr = float(os.getenv("AIGLASS_OBS_CONF", "0.25"))
        # Day 22 optimization: smaller dynamic input size and FP16 inference.
        imgsz = int(os.getenv("AIGLASS_OBS_IMGSZ", "480"))  # lowered from the default 640
        use_half = os.getenv("AIGLASS_OBS_HALF", "1") == "1"
        with gpu_infer_slot():
            results = self.model.predict(
                image,
                verbose=False,
                conf=conf_thr,
                imgsz=imgsz,       # smaller input size
                half=use_half      # FP16 half-precision acceleration
            )
        if not (results and results[0].masks):
            return []
        # --- Filtering & post-processing (kept consistent with the blindpath workflow) ---
        final_obstacles = []
        num_boxes = len(results[0].boxes.cls) if getattr(results[0].boxes, "cls", None) is not None else 0
        for i, mask_tensor in enumerate(results[0].masks.data):
            # Guard against masks without a matching box entry.
            if i >= num_boxes:
                continue
            # Reuse the shared helper: handles bf16/fp16 promotion and
            # binarization (the original duplicated this inline and missed fp16).
            mask = self.tensor_to_numpy_mask(mask_tensor)
            mask = cv2.resize(mask, (W, H), interpolation=cv2.INTER_NEAREST)
            # Day 20: Numba multi-core acceleration for mask statistics.
            if NUMBA_ENABLED:
                stats = compute_mask_stats(mask)
                area = stats['area']
                center_x = stats['center_x']
                center_y = stats['center_y']
                min_y, max_y = stats['bbox'][1], stats['bbox'][3]
            else:
                area = int(np.sum(mask > 0))
                y_coords, x_coords = np.where(mask > 0)
                if len(y_coords) == 0:
                    continue
                center_x = float(np.mean(x_coords))
                center_y = float(np.mean(y_coords))
                min_y, max_y = int(np.min(y_coords)), int(np.max(y_coords))
            # Size filter: very large objects (e.g. the whole ground plane) are
            # usually false positives.
            if (area / (H * W)) > 0.7:
                continue
            if area == 0:
                continue
            # Spatial filter: with a path_mask, keep only obstacles on the path.
            if path_mask is not None:
                # Day 20: Numba-accelerated intersection count.
                if NUMBA_ENABLED:
                    intersection_area = bitwise_and_count(mask, path_mask)
                else:
                    intersection_area = int(np.sum(cv2.bitwise_and(mask, path_mask) > 0))
                # Require sufficient overlap with the path.
                if intersection_area < 100 or (intersection_area / area) < 0.01:
                    continue
            cls_id = int(results[0].boxes.cls[i])
            class_names_map = results[0].names
            class_name = "Unknown"
            if isinstance(class_names_map, dict):
                # Dict mapping: safe lookup via .get().
                class_name = class_names_map.get(cls_id, "Unknown")
            elif isinstance(class_names_map, list) and 0 <= cls_id < len(class_names_map):
                # List mapping: bounds-checked index access.
                class_name = class_names_map[cls_id]
            # TensorRT mode: filter detections against the COCO whitelist so
            # only plausible obstacle classes survive.
            if self.whitelist_embeddings is None:  # TensorRT mode
                if class_name.lower().strip() not in self.COCO_WHITELIST:
                    continue  # skip non-whitelisted classes
            final_obstacles.append({
                'name': class_name.strip(),
                'mask': mask,
                'area': area,
                'area_ratio': area / (H * W),
                'center_x': center_x,
                'center_y': center_y,
                'bottom_y_ratio': max_y / H
            })
        return final_obstacles