# -*- coding: utf-8 -*- """ 室内导航工作流 (Indoor Navigation Workflow) Day 26: 专为室内导盲模型 (yolo11l-seg-indoor) 设计 类别映射 (14 classes from MIT Indoor): - 可行走区域: floor(0), corridor(1), sidewalk(2) - 静态障碍物: chair(3), table(4), sofa_bed(5), cabinet(11), trash_can(12) - 兴趣点: door(6), elevator(7), stairs(8) - 边界: wall(9), window(13) - 动态障碍: person(10) """ import os import time import logging import numpy as np import cv2 from dataclasses import dataclass from typing import Optional, List, Dict, Any from collections import deque logger = logging.getLogger(__name__) # ========== 类别常量 ========== # 可行走区域 WALKABLE_CLASSES = {0, 1, 2} # floor, corridor, sidewalk CLASS_FLOOR = 0 CLASS_CORRIDOR = 1 CLASS_SIDEWALK = 2 # 静态障碍物 (家具 + 杂物) OBSTACLE_CLASSES = {3, 4, 5, 11, 12, 14, 15, 16, 17, 18} CLASS_CHAIR = 3 CLASS_TABLE = 4 CLASS_SOFA_BED = 5 CLASS_CABINET = 11 CLASS_TRASH_CAN = 12 CLASS_TRASH_CAN = 12 # CLASS_CUP_BOTTLE = 14 (Removed) CLASS_BAG = 14 CLASS_ELECTRONICS = 15 CLASS_PLANT = 16 CLASS_OBSTACLE = 17 CLASS_APPLIANCE = 18 # 兴趣点 POI_CLASSES = {6, 7, 8, 19, 20} # door, elevator, stairs, toilet, sink CLASS_DOOR = 6 CLASS_ELEVATOR = 7 CLASS_STAIRS = 8 CLASS_TOILET = 19 CLASS_SINK = 20 # 边界 BOUNDARY_CLASSES = {9, 10} # wall, window CLASS_WALL = 9 CLASS_WINDOW = 10 # 动态障碍 CLASS_PERSON = 13 # 类别名称映射 CLASS_NAMES = { 0: 'floor', 1: 'corridor', 2: 'sidewalk', 3: 'chair', 4: 'table', 5: 'sofa_bed', 6: 'door', 7: 'elevator', 8: 'stairs', 9: 'wall', 10: 'window', 11: 'cabinet', 12: 'trash_can', 13: 'person', 14: 'bag', 15: 'electronics', 16: 'plant', 17: 'obstacle', 18: 'appliance', 19: 'toilet', 20: 'sink', 21: 'tableware' } # 中文名称(用于语音) CLASS_NAMES_CN = { 0: '地面', 1: '走廊', 2: '人行道', 3: '椅子', 4: '桌子', 5: '沙发', 6: '门', 7: '电梯', 8: '楼梯', 9: '墙壁', 10: '窗户', 11: '柜子', 12: '垃圾桶', 13: '行人', 14: '包', 15: '电子设备', 16: '绿植', 17: '障碍物', 18: '家电', 19: '卫生间', 20: '洗手台', 21: '餐具' } # 物品类 (不播报,除非寻找模式) ITEM_CLASSES = {21} CLASS_TABLEWARE = 21 # ========== 配置参数 ========== CONF_THRESHOLD = float(os.getenv('INDOOR_CONF_THRESHOLD', '0.25')) WALKABLE_MIN_AREA = int(os.getenv('INDOOR_WALKABLE_MIN_AREA', '3000')) OBSTACLE_MIN_AREA = int(os.getenv('INDOOR_OBSTACLE_MIN_AREA', '500')) # 语音间隔 GUIDE_INTERVAL = float(os.getenv('INDOOR_GUIDE_INTERVAL', '3.0')) DIRECTION_INTERVAL = float(os.getenv('INDOOR_DIRECTION_INTERVAL', '2.5')) POI_INTERVAL = float(os.getenv('INDOOR_POI_INTERVAL', '5.0')) OBSTACLE_INTERVAL = float(os.getenv('INDOOR_OBSTACLE_INTERVAL', '2.0')) # ========== 可视化颜色 (BGR) ========== VIS_COLORS = { 'walkable': (0, 255, 0), # 绿色 - 可行走 'obstacle': (0, 0, 255), # 红色 - 障碍物 'poi': (255, 255, 0), # 青色 - 兴趣点 'boundary': (128, 128, 128), # 灰色 - 边界 'person': (255, 0, 255), # 粉色 - 行人 'centerline': (255, 255, 0), # 黄色 - 引导线 } @dataclass class IndoorResult: """室内导航结果""" annotated_image: Optional[np.ndarray] = None guidance_text: str = "" state_info: Dict[str, Any] = None visualizations: List[Dict[str, Any]] = None def __post_init__(self): if self.state_info is None: self.state_info = {} if self.visualizations is None: self.visualizations = [] class IndoorNavigator: """室内导航器 - 专为室内导盲模型设计""" def __init__(self, seg_model=None, device_id: str = "indoor"): self.seg_model = seg_model self.device_id = device_id self.frame_counter = 0 # 语音节流 self.last_guide_time = 0 self.last_direction_time = 0 self.last_poi_time = 0 self.last_obstacle_time = 0 self.last_guidance_text = "" self.last_direction_text = "" # 检测间隔 self.detection_interval = int(os.getenv('INDOOR_DETECTION_INTERVAL', '6')) self.last_detection_frame = 0 # 缓存 self.last_walkable_mask = None self.last_obstacles = [] self.last_pois = [] # 灰度图(用于光流等) self.prev_gray = None # 日志间隔 self.log_interval = int(os.getenv('AIGLASS_LOG_INTERVAL', '30')) logger.info(f"[INDOOR] 室内导航器初始化完成") logger.info(f"[INDOOR] 检测间隔: 每{self.detection_interval}帧") logger.info(f"[INDOOR] 可行走类别: {[CLASS_NAMES[c] for c in WALKABLE_CLASSES]}") def reset(self): """重置状态""" self.frame_counter = 0 self.last_guide_time = 0 self.last_direction_time = 0 self.last_poi_time = 0 self.last_obstacle_time = 0 self.last_guidance_text = "" self.last_direction_text = "" self.last_walkable_mask = None self.last_obstacles = [] self.last_pois = [] self.prev_gray = None logger.info("[INDOOR] 导航器已重置") def process_frame(self, image: np.ndarray) -> IndoorResult: """处理单帧图像""" self.frame_counter += 1 h, w = image.shape[:2] now = time.time() frame_visualizations = [] guidance_text = "" state_info = {} # 是否执行检测 should_detect = (self.frame_counter - self.last_detection_frame) >= self.detection_interval if should_detect and self.seg_model is not None: self.last_detection_frame = self.frame_counter # 执行分割推理 walkable_mask, obstacles, pois = self._detect_all(image) # 更新缓存 self.last_walkable_mask = walkable_mask self.last_obstacles = obstacles self.last_pois = pois else: # 使用缓存 walkable_mask = self.last_walkable_mask obstacles = self.last_obstacles pois = self.last_pois # 生成导航引导 if walkable_mask is not None: guidance_text = self._generate_guidance(walkable_mask, obstacles, pois, h, w, now) # 添加可视化 self._add_mask_visualization(walkable_mask, frame_visualizations, "walkable_mask", "rgba(0, 255, 0, 0.3)") # 障碍物可视化 for obs in obstacles: self._add_detection_visualization(obs, frame_visualizations, "obstacle") # 兴趣点可视化 for poi in pois: self._add_detection_visualization(poi, frame_visualizations, "poi") # 日志 if self.frame_counter % self.log_interval == 0: walkable_area = int(walkable_mask.sum()) if walkable_mask is not None else 0 logger.info(f"[INDOOR] Frame={self.frame_counter} | 可行走面积={walkable_area} | " f"障碍物={len(obstacles)} | 兴趣点={len(pois)}") # 更新状态信息 state_info = { 'frame': self.frame_counter, 'walkable_detected': walkable_mask is not None and walkable_mask.sum() > 0, 'obstacles_count': len(obstacles), 'pois_count': len(pois), } # 更新灰度图 self.prev_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) return IndoorResult( annotated_image=image.copy(), guidance_text=guidance_text, state_info=state_info, visualizations=frame_visualizations ) def _detect_all(self, image: np.ndarray): """执行分割检测,返回可行走区域、障碍物、兴趣点""" h, w = image.shape[:2] walkable_mask = np.zeros((h, w), dtype=np.uint8) obstacles = [] pois = [] try: imgsz = int(os.getenv("AIGLASS_YOLO_IMGSZ", "480")) use_half = os.getenv("AIGLASS_YOLO_HALF", "1") == "1" results = self.seg_model.predict( image, imgsz=imgsz, conf=CONF_THRESHOLD, verbose=False, half=use_half ) if results and len(results) > 0 and results[0].masks is not None: r0 = results[0] masks = r0.masks.data.cpu().numpy() boxes = r0.boxes cls_id = int(cls_id.item()) conf_val = float(conf.item()) # 过滤物品类 (默认不参与导航逻辑,防止刷屏) if cls_id in ITEM_CLASSES: # 可以选择存入特定的 items 列表供"找东西"功能使用 # 这里暂时忽略,避免干扰避障 continue # 调整 mask 尺寸 mask_resized = cv2.resize(mask, (w, h), interpolation=cv2.INTER_NEAREST) mask_bin = (mask_resized > 0.5).astype(np.uint8) area = int(mask_bin.sum()) if area < 100: # 过滤小碎片 continue # 可行走区域 if cls_id in WALKABLE_CLASSES and area > WALKABLE_MIN_AREA: walkable_mask = cv2.bitwise_or(walkable_mask, mask_bin * 255) # 障碍物 elif cls_id in OBSTACLE_CLASSES or cls_id == CLASS_PERSON: if area > OBSTACLE_MIN_AREA: obstacles.append({ 'class_id': cls_id, 'class_name': CLASS_NAMES.get(cls_id, 'unknown'), 'class_name_cn': CLASS_NAMES_CN.get(cls_id, '未知'), 'conf': conf_val, 'mask': mask_bin, 'area': area, 'center': self._mask_center(mask_bin), }) # 兴趣点 elif cls_id in POI_CLASSES: pois.append({ 'class_id': cls_id, 'class_name': CLASS_NAMES.get(cls_id, 'unknown'), 'class_name_cn': CLASS_NAMES_CN.get(cls_id, '未知'), 'conf': conf_val, 'mask': mask_bin, 'area': area, 'center': self._mask_center(mask_bin), }) except Exception as e: logger.warning(f"[INDOOR] 检测失败: {e}") return walkable_mask, obstacles, pois def _mask_center(self, mask: np.ndarray): """计算 mask 质心""" M = cv2.moments(mask) if abs(M["m00"]) < 1e-6: return None cx = int(M["m10"] / M["m00"]) cy = int(M["m01"] / M["m00"]) return (cx, cy) def _generate_guidance(self, walkable_mask, obstacles, pois, h, w, now): """生成导航引导文本""" guidance_text = "" # 1. 计算可行走区域的偏移和方向 direction_guidance = self._compute_direction_guidance(walkable_mask, h, w) # 2. 检查障碍物警告 obstacle_warning = self._check_obstacle_warning(obstacles, walkable_mask, h, w) # 3. 检查兴趣点提示 poi_hint = self._check_poi_hint(pois, h, w) # 优先级:障碍物 > 方向 > 兴趣点 if obstacle_warning and (now - self.last_obstacle_time) > OBSTACLE_INTERVAL: guidance_text = obstacle_warning self.last_obstacle_time = now self.last_guidance_text = guidance_text elif direction_guidance: # 方向引导节流 if direction_guidance != self.last_direction_text: if (now - self.last_direction_time) > DIRECTION_INTERVAL: guidance_text = direction_guidance self.last_direction_time = now self.last_direction_text = direction_guidance elif (now - self.last_guide_time) > GUIDE_INTERVAL: # 同样的方向,降低频率 guidance_text = direction_guidance self.last_guide_time = now elif poi_hint and (now - self.last_poi_time) > POI_INTERVAL: guidance_text = poi_hint self.last_poi_time = now return guidance_text def _compute_direction_guidance(self, walkable_mask, h, w): """计算方向引导""" if walkable_mask is None or walkable_mask.sum() < WALKABLE_MIN_AREA: return "未检测到可行走区域" # 分析下半部分(更近的区域) lower_half = walkable_mask[int(h * 0.5):, :] if lower_half.sum() < 1000: return "前方可行走区域较小,请小心" # 计算左中右分布 third = w // 3 left_area = lower_half[:, :third].sum() center_area = lower_half[:, third:2*third].sum() right_area = lower_half[:, 2*third:].sum() total = left_area + center_area + right_area + 1e-6 left_ratio = left_area / total center_ratio = center_area / total right_ratio = right_area / total # 方向判断 if center_ratio > 0.4: return "保持直行" elif left_ratio > right_ratio * 1.5: return "向左调整" elif right_ratio > left_ratio * 1.5: return "向右调整" else: return "保持直行" def _check_obstacle_warning(self, obstacles, walkable_mask, h, w): """检查是否有障碍物在前方""" if not obstacles: return None # 定义前方区域(画面中下部) front_zone_top = int(h * 0.4) front_zone_left = int(w * 0.2) front_zone_right = int(w * 0.8) for obs in obstacles: center = obs.get('center') if center is None: continue cx, cy = center # 检查是否在前方区域 if front_zone_top < cy < h and front_zone_left < cx < front_zone_right: name_cn = obs.get('class_name_cn', '障碍物') # 判断位置 if cx < w * 0.4: return f"左前方有{name_cn}" elif cx > w * 0.6: return f"右前方有{name_cn}" else: return f"正前方有{name_cn}" return None def _check_poi_hint(self, pois, h, w): """检查兴趣点提示""" if not pois: return None for poi in pois: cls_id = poi.get('class_id') name_cn = poi.get('class_name_cn', '兴趣点') center = poi.get('center') if center is None: continue cx, cy = center # 楼梯需要特别警告 if cls_id == CLASS_STAIRS: if cy > h * 0.5: # 比较近 return f"注意前方有{name_cn}" # 门/电梯/卫生间/洗手台提示 elif cls_id in (CLASS_DOOR, CLASS_ELEVATOR, CLASS_TOILET, CLASS_SINK): if cy > h * 0.3: # 在视野内 position = "左侧" if cx < w * 0.4 else ("右侧" if cx > w * 0.6 else "前方") return f"{position}有{name_cn}" return None def _add_mask_visualization(self, mask, visualizations, viz_type, color): """添加 mask 可视化""" if mask is None or mask.sum() == 0: return visualizations.append({ 'type': viz_type, 'mask': mask, 'color': color }) def _add_detection_visualization(self, detection, visualizations, det_type): """添加检测框可视化""" center = detection.get('center') if center is None: return visualizations.append({ 'type': det_type, 'center': center, 'class_name': detection.get('class_name', 'unknown'), 'class_name_cn': detection.get('class_name_cn', '未知'), 'conf': detection.get('conf', 0), })