# -*- coding: utf-8 -*- """ 室内导航工作流 (Indoor Navigation Workflow) Day 26: 专为室内导盲模型 (yolo11l-seg-indoor14) 设计 类别映射 (14 classes from MIT Indoor): - 可行走区域: floor(0), corridor(1), sidewalk(2) - 静态障碍物: chair(3), table(4), sofa_bed(5), cabinet(11), trash_can(12) - 兴趣点: door(6), elevator(7), stairs(8) - 边界: wall(9), window(13) - 动态障碍: person(10) """ import os import time import logging import numpy as np import cv2 from dataclasses import dataclass from typing import Optional, List, Dict, Any from collections import deque logger = logging.getLogger(__name__) # ========== 类别常量 (14类模型 - yolo11l-seg-indoor14) ========== # Day 28: 使用 14 类模型 (MIT Indoor Subset) # 可行走区域 (0-2) WALKABLE_CLASSES = {0, 1, 2} # floor, corridor, sidewalk CLASS_FLOOR = 0 CLASS_CORRIDOR = 1 CLASS_SIDEWALK = 2 # 静态障碍物 (3-5, 11-12) OBSTACLE_CLASSES = {3, 4, 5, 11, 12, 13} # window 只要是障碍物也算? window(13)是墙? # Wait, Window is 13. Is window an obstacle? Usually yes (don't walk into it). # Cabinet 11, Trash 12. CLASS_CHAIR = 3 CLASS_TABLE = 4 CLASS_SOFA_BED = 5 CLASS_CABINET = 11 CLASS_TRASH_CAN = 12 CLASS_WINDOW = 13 # 窗户通常视为边界或障碍 CLASS_WALL = 9 # Wall 9 # 兴趣点 (6-8) POI_CLASSES = {6, 7, 8} # door, elevator, stairs CLASS_DOOR = 6 CLASS_ELEVATOR = 7 CLASS_STAIRS = 8 # 动态障碍 (10) CLASS_PERSON = 10 # 边界 BOUNDARY_CLASSES = {9, 13} # wall(9), window(13) # 类别名称映射 CLASS_NAMES = { 0: 'floor', 1: 'corridor', 2: 'sidewalk', 3: 'chair', 4: 'table', 5: 'sofa_bed', 6: 'door', 7: 'elevator', 8: 'stairs', 9: 'wall', 10: 'person', 11: 'cabinet', 12: 'trash_can', 13: 'window' } # 中文名称(用于语音) CLASS_NAMES_CN = { 0: '地面', 1: '走廊', 2: '人行道', 3: '椅子', 4: '桌子', 5: '沙发', 6: '门', 7: '电梯', 8: '楼梯', 9: '墙壁', 10: '行人', 11: '柜子', 12: '垃圾桶', 13: '窗户' } # 物品类 (无) ITEM_CLASSES = set() # ========== 配置参数 ========== # Day 28: 进一步降低阈值以提升木地板检测率 # Day 28: 进一步降低阈值以提升木地板检测率 CONF_THRESHOLD = float(os.getenv('INDOOR_CONF_THRESHOLD', '0.05')) # 全局极低阈值,由后续逻辑二次过滤 WALKABLE_MIN_AREA = int(os.getenv('INDOOR_WALKABLE_MIN_AREA', '50')) # 极端降低最小面积以进行调试 (原 1000) OBSTACLE_MIN_AREA = int(os.getenv('INDOOR_OBSTACLE_MIN_AREA', '300')) # 语音间隔 GUIDE_INTERVAL = float(os.getenv('INDOOR_GUIDE_INTERVAL', '3.0')) DIRECTION_INTERVAL = float(os.getenv('INDOOR_DIRECTION_INTERVAL', '2.5')) POI_INTERVAL = float(os.getenv('INDOOR_POI_INTERVAL', '5.0')) OBSTACLE_INTERVAL = float(os.getenv('INDOOR_OBSTACLE_INTERVAL', '2.0')) # Day 28: “未检测到可行走区域”播报间隔(8秒) NO_WALKABLE_INTERVAL = float(os.getenv('INDOOR_NO_WALKABLE_INTERVAL', '8.0')) # ========== 可视化颜色 (BGR) ========== VIS_COLORS = { 'walkable': (0, 255, 0), # 绿色 - 可行走 'obstacle': (0, 0, 255), # 红色 - 障碍物 'poi': (255, 255, 0), # 青色 - 兴趣点 'boundary': (128, 128, 128), # 灰色 - 边界 'person': (255, 0, 255), # 粉色 - 行人 'centerline': (255, 255, 0), # 黄色 - 引导线 } @dataclass class IndoorResult: """室内导航结果""" annotated_image: Optional[np.ndarray] = None guidance_text: str = "" state_info: Dict[str, Any] = None visualizations: List[Dict[str, Any]] = None def __post_init__(self): if self.state_info is None: self.state_info = {} if self.visualizations is None: self.visualizations = [] class IndoorNavigator: """室内导航器 - 专为室内导盲模型设计""" def __init__(self, seg_model=None, device_id: str = "indoor"): self.seg_model = seg_model self.device_id = device_id self.frame_counter = 0 # Day 28: 持久化缓冲参数 self.no_walkable_persistence_sec = 2.0 self.last_walkable_detected_time = 0 # 语音节流 self.last_guide_time = 0 self.last_direction_time = 0 self.last_poi_time = 0 self.last_obstacle_time = 0 self.last_guidance_text = "" self.last_direction_text = "" # 检测间隔 self.detection_interval = int(os.getenv('INDOOR_DETECTION_INTERVAL', '6')) self.last_detection_frame = 0 # 缓存 self.last_walkable_mask = None self.last_valid_walkable_mask = None self.last_no_walkable_time = 0 self.last_obstacles = [] self.last_obstacles = [] self.last_pois = [] # Day 28: 移除未使用的灰度图转换 (光流功能未启用) # self.prev_gray = None # 日志间隔 self.log_interval = int(os.getenv('AIGLASS_LOG_INTERVAL', '30')) logger.info(f"[INDOOR] 室内导航器初始化完成") logger.info(f"[INDOOR] 检测间隔: 每{self.detection_interval}帧") logger.info(f"[INDOOR] 可行走类别: {[CLASS_NAMES[c] for c in WALKABLE_CLASSES]}") def reset(self): """重置状态""" self.frame_counter = 0 self.last_guide_time = 0 self.last_direction_time = 0 self.last_poi_time = 0 self.last_obstacle_time = 0 self.last_guidance_text = "" self.last_direction_text = "" self.last_valid_walkable_mask = None self.last_no_walkable_time = 0 # Day 28: "未检测到可行走区域"节流 self.last_walkable_detected_time = 0 self.last_walkable_mask = None self.last_obstacles = [] self.last_pois = [] self.prev_gray = None logger.info("[INDOOR] 导航器已重置") def process_frame(self, image: np.ndarray) -> IndoorResult: """处理单帧图像""" self.frame_counter += 1 h, w = image.shape[:2] now = time.time() frame_visualizations = [] guidance_text = "" state_info = {} # 是否执行检测 should_detect = (self.frame_counter - self.last_detection_frame) >= self.detection_interval if should_detect and self.seg_model is not None: self.last_detection_frame = self.frame_counter # 执行分割推理 walkable_mask, obstacles, pois = self._detect_all(image) # 更新缓存 self.last_walkable_mask = walkable_mask self.last_obstacles = obstacles self.last_pois = pois else: # 使用缓存 walkable_mask = self.last_walkable_mask obstacles = self.last_obstacles pois = self.last_pois # 3. 缓存有效的 mask (用于可视化防抖) walkable_area = int(np.count_nonzero(walkable_mask)) if walkable_mask is not None else 0 if walkable_area > WALKABLE_MIN_AREA: self.last_valid_walkable_mask = walkable_mask # 4. 生成导航引导 if walkable_mask is not None: guidance_text = self._generate_guidance(walkable_mask, obstacles, pois, h, w, now) # 5. 可视化 (带持久化防抖) viz_mask = walkable_mask # 如果当前没有检测到路,但还在持久化时间内,使用缓存的 mask 进行可视化 if (viz_mask is None or walkable_area < WALKABLE_MIN_AREA) and \ (now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec and \ self.last_valid_walkable_mask is not None: viz_mask = self.last_valid_walkable_mask self._add_mask_visualization(viz_mask, frame_visualizations, "walkable_mask", "rgba(0, 255, 0, 0.3)") # 障碍物可视化 for obs in obstacles: self._add_detection_visualization(obs, frame_visualizations, "obstacle") # 兴趣点可视化 for poi in pois: self._add_detection_visualization(poi, frame_visualizations, "poi") # 日志 if self.frame_counter % self.log_interval == 0: # Day 28: 修复面积计算 - 使用 count_nonzero 而不是 sum (mask 值是 0 或 255) walkable_area = int(np.count_nonzero(walkable_mask)) if walkable_mask is not None else 0 logger.info(f"[INDOOR] Frame={self.frame_counter} | 可行走面积={walkable_area} | " f"障碍物={len(obstacles)} | 兴趣点={len(pois)}") # 更新状态信息 state_info = { 'frame': self.frame_counter, 'walkable_detected': walkable_mask is not None and walkable_mask.sum() > 0, 'obstacles_count': len(obstacles), 'pois_count': len(pois), } # Day 28: 移除未使用的灰度图转换 # self.prev_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # Day 28: 避免每帧复制图像,直接传递原图像(下游如需可视化再复制) return IndoorResult( annotated_image=image, # 不再 copy,节省内存/CPU guidance_text=guidance_text, state_info=state_info, visualizations=frame_visualizations ) def _detect_all(self, image: np.ndarray): """执行分割检测,返回可行走区域、障碍物、兴趣点""" h, w = image.shape[:2] walkable_mask = np.zeros((h, w), dtype=np.uint8) obstacles = [] pois = [] try: imgsz = int(os.getenv("AIGLASS_YOLO_IMGSZ", "480")) use_half = os.getenv("AIGLASS_YOLO_HALF", "1") == "1" results = self.seg_model.predict( image, imgsz=imgsz, conf=CONF_THRESHOLD, verbose=False, half=use_half ) if results and len(results) > 0 and results[0].masks is not None: r0 = results[0] masks = r0.masks.data.cpu().numpy() boxes = r0.boxes for i, (mask, cls_id, conf) in enumerate(zip(masks, boxes.cls, boxes.conf)): cls_id = int(cls_id.item()) conf_val = float(conf.item()) # 过滤物品类 (默认不参与导航逻辑,避免刷屏) if cls_id in ITEM_CLASSES: continue # Day 28: 混合阈值策略 # 地面类(WALKABLE)使用全局低阈值(0.05)以提高召回率 # 障碍物(OBSTACLE/POI/BOUNDARY)使用较高阈值(0.25)以拒绝误报 filter_threshold = 0.25 if cls_id in WALKABLE_CLASSES: filter_threshold = 0.05 if conf_val < filter_threshold: continue # 调整 mask 尺寸 mask_resized = cv2.resize(mask, (w, h), interpolation=cv2.INTER_NEAREST) mask_bin = (mask_resized > 0.5).astype(np.uint8) area = int(mask_bin.sum()) # Day 28: 调试日志 - 查看检测到的类别 (ALL detections) if area > 10: # 几乎记录所有检测 cls_name = CLASS_NAMES.get(cls_id, f'unknown_{cls_id}') logger.info(f"[INDOOR DEBUG] 检测到 {cls_name}(id={cls_id}) conf={conf_val:.2f} area={area}") if area < 50: # 极端小的才过滤 continue # 可行走区域 if cls_id in WALKABLE_CLASSES and area > WALKABLE_MIN_AREA: # Day 28: 确保类型一致,避免 bitwise_or 失败 mask_add = (mask_bin * 255).astype(np.uint8) walkable_mask = cv2.bitwise_or(walkable_mask, mask_add) if area > 10000: # 调试:记录大面积添加 logger.info(f"[INDOOR DEBUG] 添加可行走区域: class={cls_id} area={area} current_total={np.count_nonzero(walkable_mask)}") # 障碍物 elif cls_id in OBSTACLE_CLASSES or cls_id == CLASS_PERSON: if area > OBSTACLE_MIN_AREA: obstacles.append({ 'class_id': cls_id, 'class_name': CLASS_NAMES.get(cls_id, 'unknown'), 'class_name_cn': CLASS_NAMES_CN.get(cls_id, '未知'), 'conf': conf_val, 'mask': mask_bin, 'area': area, 'center': self._mask_center(mask_bin), }) # 兴趣点 elif cls_id in POI_CLASSES: pois.append({ 'class_id': cls_id, 'class_name': CLASS_NAMES.get(cls_id, 'unknown'), 'class_name_cn': CLASS_NAMES_CN.get(cls_id, '未知'), 'conf': conf_val, 'mask': mask_bin, 'area': area, 'center': self._mask_center(mask_bin), }) except Exception as e: logger.warning(f"[INDOOR] 检测失败: {e}") return walkable_mask, obstacles, pois def _mask_center(self, mask: np.ndarray): """计算 mask 质心""" M = cv2.moments(mask) if abs(M["m00"]) < 1e-6: return None cx = int(M["m10"] / M["m00"]) cy = int(M["m01"] / M["m00"]) return (cx, cy) def _generate_guidance(self, walkable_mask, obstacles, pois, h, w, now): """生成导航引导文本""" guidance_text = "" # 1. 计算可行走区域的偏移和方向 direction_guidance = self._compute_direction_guidance(walkable_mask, h, w) # 2. 检查障碍物警告 obstacle_warning = self._check_obstacle_warning(obstacles, walkable_mask, h, w) # 3. 检查兴趣点提示 poi_hint = self._check_poi_hint(pois, h, w) # 优先级:障碍物 > 方向 > 兴趣点 if obstacle_warning and (now - self.last_obstacle_time) > OBSTACLE_INTERVAL: guidance_text = obstacle_warning self.last_obstacle_time = now self.last_guidance_text = guidance_text elif direction_guidance: # Day 28: "未检测到可行走区域" 降低播报频率 # Day 28: "未检测到可行走区域" 降低播报频率 if direction_guidance == "未检测到可行走区域": # 首次检测到(last_no_walkable_time == 0)或者间隔已过8秒 if self.last_no_walkable_time == 0 or (now - self.last_no_walkable_time) > NO_WALKABLE_INTERVAL: guidance_text = direction_guidance self.last_no_walkable_time = now # 方向引导节流 elif direction_guidance != self.last_direction_text: if (now - self.last_direction_time) > DIRECTION_INTERVAL: guidance_text = direction_guidance self.last_direction_time = now self.last_direction_text = direction_guidance elif (now - self.last_guide_time) > GUIDE_INTERVAL: # 同样的方向,降低频率 guidance_text = direction_guidance self.last_guide_time = now elif poi_hint and (now - self.last_poi_time) > POI_INTERVAL: guidance_text = poi_hint self.last_poi_time = now return guidance_text def _compute_direction_guidance(self, walkable_mask, h, w): """计算方向引导""" # Day 28: 使用 count_nonzero 替代 sum (mask 值是 0 或 255) walkable_area = np.count_nonzero(walkable_mask) if walkable_mask is not None else 0 now = time.time() if walkable_area < WALKABLE_MIN_AREA: # 缓冲逻辑:如果最近才看到过路,不要立刻报错 if (now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec: return None # 保持沉默,或者返回 "保持直行" (更稳妥是沉默) return "未检测到可行走区域" # 如果检测到了,更新时间戳 self.last_walkable_detected_time = now # 分析下半部分(更近的区域) lower_half = walkable_mask[int(h * 0.5):, :] if np.count_nonzero(lower_half) < 1000: if (now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec: return None return "前方可行走区域较小,请小心" # 计算左中右分布 third = w // 3 left_area = lower_half[:, :third].sum() center_area = lower_half[:, third:2*third].sum() right_area = lower_half[:, 2*third:].sum() total = left_area + center_area + right_area + 1e-6 left_ratio = left_area / total center_ratio = center_area / total right_ratio = right_area / total # 方向判断 if center_ratio > 0.4: return "保持直行" elif left_ratio > right_ratio * 1.5: return "向左调整" elif right_ratio > left_ratio * 1.5: return "向右调整" else: return "保持直行" def _check_obstacle_warning(self, obstacles, walkable_mask, h, w): """检查是否有障碍物在前方""" if not obstacles: return None # 定义前方区域(画面中下部) front_zone_top = int(h * 0.4) front_zone_left = int(w * 0.2) front_zone_right = int(w * 0.8) for obs in obstacles: center = obs.get('center') if center is None: continue cx, cy = center # 检查是否在前方区域 if front_zone_top < cy < h and front_zone_left < cx < front_zone_right: name_cn = obs.get('class_name_cn', '障碍物') # 判断位置 if cx < w * 0.4: return f"左前方有{name_cn}" elif cx > w * 0.6: return f"右前方有{name_cn}" else: return f"正前方有{name_cn}" return None def _check_poi_hint(self, pois, h, w): """检查兴趣点提示""" if not pois: return None for poi in pois: cls_id = poi.get('class_id') name_cn = poi.get('class_name_cn', '兴趣点') center = poi.get('center') if center is None: continue cx, cy = center # 楼梯需要特别警告 if cls_id == CLASS_STAIRS: if cy > h * 0.5: # 比较近 return f"注意前方有{name_cn}" # 门/电梯提示 elif cls_id in (CLASS_DOOR, CLASS_ELEVATOR): if cy > h * 0.3: # 在视野内 position = "左侧" if cx < w * 0.4 else ("右侧" if cx > w * 0.6 else "前方") return f"{position}有{name_cn}" return None def _add_mask_visualization(self, mask, visualizations, viz_type, color): """添加 mask 可视化""" if mask is None or mask.sum() == 0: return visualizations.append({ 'type': viz_type, 'mask': mask, 'color': color }) def _add_detection_visualization(self, detection, visualizations, det_type): """添加检测框可视化""" center = detection.get('center') if center is None: return visualizations.append({ 'type': det_type, 'center': center, 'class_name': detection.get('class_name', 'unknown'), 'class_name_cn': detection.get('class_name_cn', '未知'), 'conf': detection.get('conf', 0), })