535 lines
21 KiB
Python
535 lines
21 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
室内导航工作流 (Indoor Navigation Workflow)
|
||
Day 26: 专为室内导盲模型 (yolo11l-seg-indoor14) 设计
|
||
|
||
类别映射 (14 classes from MIT Indoor):
|
||
- 可行走区域: floor(0), corridor(1), sidewalk(2)
|
||
- 静态障碍物: chair(3), table(4), sofa_bed(5), cabinet(11), trash_can(12)
|
||
- 兴趣点: door(6), elevator(7), stairs(8)
|
||
- 边界: wall(9), window(13)
|
||
- 动态障碍: person(10)
|
||
"""
|
||
|
||
import os
|
||
import time
|
||
import logging
|
||
import numpy as np
|
||
import cv2
|
||
from dataclasses import dataclass
|
||
from typing import Optional, List, Dict, Any
|
||
from collections import deque
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ========== 类别常量 (14类模型 - yolo11l-seg-indoor14) ==========
|
||
# Day 28: 使用 14 类模型 (MIT Indoor Subset)
|
||
|
||
# 可行走区域 (0-2)
|
||
WALKABLE_CLASSES = {0, 1, 2} # floor, corridor, sidewalk
|
||
CLASS_FLOOR = 0
|
||
CLASS_CORRIDOR = 1
|
||
CLASS_SIDEWALK = 2
|
||
|
||
# 静态障碍物 (3-5, 11-12)
|
||
OBSTACLE_CLASSES = {3, 4, 5, 11, 12, 13} # window 只要是障碍物也算? window(13)是墙?
|
||
# Wait, Window is 13. Is window an obstacle? Usually yes (don't walk into it).
|
||
# Cabinet 11, Trash 12.
|
||
CLASS_CHAIR = 3
|
||
CLASS_TABLE = 4
|
||
CLASS_SOFA_BED = 5
|
||
CLASS_CABINET = 11
|
||
CLASS_TRASH_CAN = 12
|
||
CLASS_WINDOW = 13 # 窗户通常视为边界或障碍
|
||
CLASS_WALL = 9 # Wall 9
|
||
|
||
# 兴趣点 (6-8)
|
||
POI_CLASSES = {6, 7, 8} # door, elevator, stairs
|
||
CLASS_DOOR = 6
|
||
CLASS_ELEVATOR = 7
|
||
CLASS_STAIRS = 8
|
||
|
||
# 动态障碍 (10)
|
||
CLASS_PERSON = 10
|
||
|
||
# 边界
|
||
BOUNDARY_CLASSES = {9, 13} # wall(9), window(13)
|
||
|
||
# 类别名称映射
|
||
CLASS_NAMES = {
|
||
0: 'floor', 1: 'corridor', 2: 'sidewalk',
|
||
3: 'chair', 4: 'table', 5: 'sofa_bed',
|
||
6: 'door', 7: 'elevator', 8: 'stairs',
|
||
9: 'wall', 10: 'person', 11: 'cabinet',
|
||
12: 'trash_can', 13: 'window'
|
||
}
|
||
|
||
# 中文名称(用于语音)
|
||
CLASS_NAMES_CN = {
|
||
0: '地面', 1: '走廊', 2: '人行道',
|
||
3: '椅子', 4: '桌子', 5: '沙发',
|
||
6: '门', 7: '电梯', 8: '楼梯',
|
||
9: '墙壁', 10: '行人', 11: '柜子',
|
||
12: '垃圾桶', 13: '窗户'
|
||
}
|
||
|
||
# 物品类 (无)
|
||
ITEM_CLASSES = set()
|
||
|
||
# ========== 配置参数 ==========
|
||
# Day 28: 进一步降低阈值以提升木地板检测率
|
||
# Day 28: 进一步降低阈值以提升木地板检测率
|
||
CONF_THRESHOLD = float(os.getenv('INDOOR_CONF_THRESHOLD', '0.05')) # 全局极低阈值,由后续逻辑二次过滤
|
||
WALKABLE_MIN_AREA = int(os.getenv('INDOOR_WALKABLE_MIN_AREA', '50')) # 极端降低最小面积以进行调试 (原 1000)
|
||
OBSTACLE_MIN_AREA = int(os.getenv('INDOOR_OBSTACLE_MIN_AREA', '300'))
|
||
|
||
# 语音间隔
|
||
GUIDE_INTERVAL = float(os.getenv('INDOOR_GUIDE_INTERVAL', '3.0'))
|
||
DIRECTION_INTERVAL = float(os.getenv('INDOOR_DIRECTION_INTERVAL', '2.5'))
|
||
POI_INTERVAL = float(os.getenv('INDOOR_POI_INTERVAL', '5.0'))
|
||
OBSTACLE_INTERVAL = float(os.getenv('INDOOR_OBSTACLE_INTERVAL', '2.0'))
|
||
# Day 28: “未检测到可行走区域”播报间隔(8秒)
|
||
NO_WALKABLE_INTERVAL = float(os.getenv('INDOOR_NO_WALKABLE_INTERVAL', '8.0'))
|
||
|
||
# ========== 可视化颜色 (BGR) ==========
|
||
VIS_COLORS = {
|
||
'walkable': (0, 255, 0), # 绿色 - 可行走
|
||
'obstacle': (0, 0, 255), # 红色 - 障碍物
|
||
'poi': (255, 255, 0), # 青色 - 兴趣点
|
||
'boundary': (128, 128, 128), # 灰色 - 边界
|
||
'person': (255, 0, 255), # 粉色 - 行人
|
||
'centerline': (255, 255, 0), # 黄色 - 引导线
|
||
}
|
||
|
||
|
||
@dataclass
|
||
class IndoorResult:
|
||
"""室内导航结果"""
|
||
annotated_image: Optional[np.ndarray] = None
|
||
guidance_text: str = ""
|
||
state_info: Dict[str, Any] = None
|
||
visualizations: List[Dict[str, Any]] = None
|
||
|
||
def __post_init__(self):
|
||
if self.state_info is None:
|
||
self.state_info = {}
|
||
if self.visualizations is None:
|
||
self.visualizations = []
|
||
|
||
|
||
class IndoorNavigator:
|
||
"""室内导航器 - 专为室内导盲模型设计"""
|
||
|
||
def __init__(self, seg_model=None, device_id: str = "indoor"):
|
||
self.seg_model = seg_model
|
||
self.device_id = device_id
|
||
self.frame_counter = 0
|
||
|
||
# Day 28: 持久化缓冲参数
|
||
self.no_walkable_persistence_sec = 2.0
|
||
self.last_walkable_detected_time = 0
|
||
|
||
# 语音节流
|
||
self.last_guide_time = 0
|
||
self.last_direction_time = 0
|
||
self.last_poi_time = 0
|
||
self.last_obstacle_time = 0
|
||
self.last_guidance_text = ""
|
||
self.last_direction_text = ""
|
||
|
||
# 检测间隔
|
||
self.detection_interval = int(os.getenv('INDOOR_DETECTION_INTERVAL', '6'))
|
||
self.last_detection_frame = 0
|
||
|
||
# 缓存
|
||
self.last_walkable_mask = None
|
||
self.last_valid_walkable_mask = None
|
||
self.last_no_walkable_time = 0
|
||
self.last_obstacles = []
|
||
self.last_obstacles = []
|
||
self.last_pois = []
|
||
|
||
# Day 28: 移除未使用的灰度图转换 (光流功能未启用)
|
||
# self.prev_gray = None
|
||
|
||
# 日志间隔
|
||
self.log_interval = int(os.getenv('AIGLASS_LOG_INTERVAL', '30'))
|
||
|
||
logger.info(f"[INDOOR] 室内导航器初始化完成")
|
||
logger.info(f"[INDOOR] 检测间隔: 每{self.detection_interval}帧")
|
||
logger.info(f"[INDOOR] 可行走类别: {[CLASS_NAMES[c] for c in WALKABLE_CLASSES]}")
|
||
|
||
def reset(self):
|
||
"""重置状态"""
|
||
self.frame_counter = 0
|
||
self.last_guide_time = 0
|
||
self.last_direction_time = 0
|
||
self.last_poi_time = 0
|
||
self.last_obstacle_time = 0
|
||
self.last_guidance_text = ""
|
||
self.last_direction_text = ""
|
||
self.last_valid_walkable_mask = None
|
||
self.last_no_walkable_time = 0 # Day 28: "未检测到可行走区域"节流
|
||
self.last_walkable_detected_time = 0
|
||
self.last_walkable_mask = None
|
||
self.last_obstacles = []
|
||
self.last_pois = []
|
||
self.prev_gray = None
|
||
logger.info("[INDOOR] 导航器已重置")
|
||
|
||
def process_frame(self, image: np.ndarray) -> IndoorResult:
|
||
"""处理单帧图像"""
|
||
self.frame_counter += 1
|
||
h, w = image.shape[:2]
|
||
now = time.time()
|
||
|
||
frame_visualizations = []
|
||
guidance_text = ""
|
||
state_info = {}
|
||
|
||
# 是否执行检测
|
||
should_detect = (self.frame_counter - self.last_detection_frame) >= self.detection_interval
|
||
|
||
if should_detect and self.seg_model is not None:
|
||
self.last_detection_frame = self.frame_counter
|
||
|
||
# 执行分割推理
|
||
walkable_mask, obstacles, pois = self._detect_all(image)
|
||
|
||
# 更新缓存
|
||
self.last_walkable_mask = walkable_mask
|
||
self.last_obstacles = obstacles
|
||
self.last_pois = pois
|
||
else:
|
||
# 使用缓存
|
||
walkable_mask = self.last_walkable_mask
|
||
obstacles = self.last_obstacles
|
||
pois = self.last_pois
|
||
|
||
|
||
# 3. 缓存有效的 mask (用于可视化防抖)
|
||
walkable_area = int(np.count_nonzero(walkable_mask)) if walkable_mask is not None else 0
|
||
if walkable_area > WALKABLE_MIN_AREA:
|
||
self.last_valid_walkable_mask = walkable_mask
|
||
|
||
# 4. 生成导航引导
|
||
if walkable_mask is not None:
|
||
guidance_text = self._generate_guidance(walkable_mask, obstacles, pois, h, w, now)
|
||
|
||
# 5. 可视化 (带持久化防抖)
|
||
viz_mask = walkable_mask
|
||
|
||
# 如果当前没有检测到路,但还在持久化时间内,使用缓存的 mask 进行可视化
|
||
if (viz_mask is None or walkable_area < WALKABLE_MIN_AREA) and \
|
||
(now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec and \
|
||
self.last_valid_walkable_mask is not None:
|
||
viz_mask = self.last_valid_walkable_mask
|
||
|
||
self._add_mask_visualization(viz_mask, frame_visualizations,
|
||
"walkable_mask", "rgba(0, 255, 0, 0.3)")
|
||
|
||
# 障碍物可视化
|
||
for obs in obstacles:
|
||
self._add_detection_visualization(obs, frame_visualizations, "obstacle")
|
||
|
||
# 兴趣点可视化
|
||
for poi in pois:
|
||
self._add_detection_visualization(poi, frame_visualizations, "poi")
|
||
|
||
# 日志
|
||
if self.frame_counter % self.log_interval == 0:
|
||
# Day 28: 修复面积计算 - 使用 count_nonzero 而不是 sum (mask 值是 0 或 255)
|
||
walkable_area = int(np.count_nonzero(walkable_mask)) if walkable_mask is not None else 0
|
||
logger.info(f"[INDOOR] Frame={self.frame_counter} | 可行走面积={walkable_area} | "
|
||
f"障碍物={len(obstacles)} | 兴趣点={len(pois)}")
|
||
|
||
# 更新状态信息
|
||
state_info = {
|
||
'frame': self.frame_counter,
|
||
'walkable_detected': walkable_mask is not None and walkable_mask.sum() > 0,
|
||
'obstacles_count': len(obstacles),
|
||
'pois_count': len(pois),
|
||
}
|
||
|
||
# Day 28: 移除未使用的灰度图转换
|
||
# self.prev_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
|
||
|
||
# Day 28: 避免每帧复制图像,直接传递原图像(下游如需可视化再复制)
|
||
return IndoorResult(
|
||
annotated_image=image, # 不再 copy,节省内存/CPU
|
||
guidance_text=guidance_text,
|
||
state_info=state_info,
|
||
visualizations=frame_visualizations
|
||
)
|
||
|
||
def _detect_all(self, image: np.ndarray):
|
||
"""执行分割检测,返回可行走区域、障碍物、兴趣点"""
|
||
h, w = image.shape[:2]
|
||
walkable_mask = np.zeros((h, w), dtype=np.uint8)
|
||
obstacles = []
|
||
pois = []
|
||
|
||
try:
|
||
imgsz = int(os.getenv("AIGLASS_YOLO_IMGSZ", "480"))
|
||
use_half = os.getenv("AIGLASS_YOLO_HALF", "1") == "1"
|
||
|
||
results = self.seg_model.predict(
|
||
image,
|
||
imgsz=imgsz,
|
||
conf=CONF_THRESHOLD,
|
||
verbose=False,
|
||
half=use_half
|
||
)
|
||
|
||
if results and len(results) > 0 and results[0].masks is not None:
|
||
r0 = results[0]
|
||
masks = r0.masks.data.cpu().numpy()
|
||
boxes = r0.boxes
|
||
|
||
for i, (mask, cls_id, conf) in enumerate(zip(masks, boxes.cls, boxes.conf)):
|
||
cls_id = int(cls_id.item())
|
||
conf_val = float(conf.item())
|
||
|
||
# 过滤物品类 (默认不参与导航逻辑,避免刷屏)
|
||
if cls_id in ITEM_CLASSES:
|
||
continue
|
||
|
||
# Day 28: 混合阈值策略
|
||
# 地面类(WALKABLE)使用全局低阈值(0.05)以提高召回率
|
||
# 障碍物(OBSTACLE/POI/BOUNDARY)使用较高阈值(0.25)以拒绝误报
|
||
filter_threshold = 0.25
|
||
if cls_id in WALKABLE_CLASSES:
|
||
filter_threshold = 0.05
|
||
|
||
if conf_val < filter_threshold:
|
||
continue
|
||
|
||
# 调整 mask 尺寸
|
||
mask_resized = cv2.resize(mask, (w, h), interpolation=cv2.INTER_NEAREST)
|
||
mask_bin = (mask_resized > 0.5).astype(np.uint8)
|
||
area = int(mask_bin.sum())
|
||
|
||
|
||
# Day 28: 调试日志 - 查看检测到的类别 (ALL detections)
|
||
if area > 10: # 几乎记录所有检测
|
||
cls_name = CLASS_NAMES.get(cls_id, f'unknown_{cls_id}')
|
||
logger.info(f"[INDOOR DEBUG] 检测到 {cls_name}(id={cls_id}) conf={conf_val:.2f} area={area}")
|
||
|
||
if area < 50: # 极端小的才过滤
|
||
continue
|
||
|
||
# 可行走区域
|
||
if cls_id in WALKABLE_CLASSES and area > WALKABLE_MIN_AREA:
|
||
# Day 28: 确保类型一致,避免 bitwise_or 失败
|
||
mask_add = (mask_bin * 255).astype(np.uint8)
|
||
walkable_mask = cv2.bitwise_or(walkable_mask, mask_add)
|
||
if area > 10000: # 调试:记录大面积添加
|
||
logger.info(f"[INDOOR DEBUG] 添加可行走区域: class={cls_id} area={area} current_total={np.count_nonzero(walkable_mask)}")
|
||
|
||
# 障碍物
|
||
elif cls_id in OBSTACLE_CLASSES or cls_id == CLASS_PERSON:
|
||
if area > OBSTACLE_MIN_AREA:
|
||
obstacles.append({
|
||
'class_id': cls_id,
|
||
'class_name': CLASS_NAMES.get(cls_id, 'unknown'),
|
||
'class_name_cn': CLASS_NAMES_CN.get(cls_id, '未知'),
|
||
'conf': conf_val,
|
||
'mask': mask_bin,
|
||
'area': area,
|
||
'center': self._mask_center(mask_bin),
|
||
})
|
||
|
||
# 兴趣点
|
||
elif cls_id in POI_CLASSES:
|
||
pois.append({
|
||
'class_id': cls_id,
|
||
'class_name': CLASS_NAMES.get(cls_id, 'unknown'),
|
||
'class_name_cn': CLASS_NAMES_CN.get(cls_id, '未知'),
|
||
'conf': conf_val,
|
||
'mask': mask_bin,
|
||
'area': area,
|
||
'center': self._mask_center(mask_bin),
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.warning(f"[INDOOR] 检测失败: {e}")
|
||
|
||
return walkable_mask, obstacles, pois
|
||
|
||
def _mask_center(self, mask: np.ndarray):
|
||
"""计算 mask 质心"""
|
||
M = cv2.moments(mask)
|
||
if abs(M["m00"]) < 1e-6:
|
||
return None
|
||
cx = int(M["m10"] / M["m00"])
|
||
cy = int(M["m01"] / M["m00"])
|
||
return (cx, cy)
|
||
|
||
def _generate_guidance(self, walkable_mask, obstacles, pois, h, w, now):
|
||
"""生成导航引导文本"""
|
||
guidance_text = ""
|
||
|
||
# 1. 计算可行走区域的偏移和方向
|
||
direction_guidance = self._compute_direction_guidance(walkable_mask, h, w)
|
||
|
||
# 2. 检查障碍物警告
|
||
obstacle_warning = self._check_obstacle_warning(obstacles, walkable_mask, h, w)
|
||
|
||
# 3. 检查兴趣点提示
|
||
poi_hint = self._check_poi_hint(pois, h, w)
|
||
|
||
# 优先级:障碍物 > 方向 > 兴趣点
|
||
if obstacle_warning and (now - self.last_obstacle_time) > OBSTACLE_INTERVAL:
|
||
guidance_text = obstacle_warning
|
||
self.last_obstacle_time = now
|
||
self.last_guidance_text = guidance_text
|
||
elif direction_guidance:
|
||
# Day 28: "未检测到可行走区域" 降低播报频率
|
||
# Day 28: "未检测到可行走区域" 降低播报频率
|
||
if direction_guidance == "未检测到可行走区域":
|
||
# 首次检测到(last_no_walkable_time == 0)或者间隔已过8秒
|
||
if self.last_no_walkable_time == 0 or (now - self.last_no_walkable_time) > NO_WALKABLE_INTERVAL:
|
||
guidance_text = direction_guidance
|
||
self.last_no_walkable_time = now
|
||
# 方向引导节流
|
||
elif direction_guidance != self.last_direction_text:
|
||
if (now - self.last_direction_time) > DIRECTION_INTERVAL:
|
||
guidance_text = direction_guidance
|
||
self.last_direction_time = now
|
||
self.last_direction_text = direction_guidance
|
||
elif (now - self.last_guide_time) > GUIDE_INTERVAL:
|
||
# 同样的方向,降低频率
|
||
guidance_text = direction_guidance
|
||
self.last_guide_time = now
|
||
elif poi_hint and (now - self.last_poi_time) > POI_INTERVAL:
|
||
guidance_text = poi_hint
|
||
self.last_poi_time = now
|
||
|
||
return guidance_text
|
||
|
||
def _compute_direction_guidance(self, walkable_mask, h, w):
|
||
"""计算方向引导"""
|
||
# Day 28: 使用 count_nonzero 替代 sum (mask 值是 0 或 255)
|
||
walkable_area = np.count_nonzero(walkable_mask) if walkable_mask is not None else 0
|
||
now = time.time()
|
||
|
||
if walkable_area < WALKABLE_MIN_AREA:
|
||
# 缓冲逻辑:如果最近才看到过路,不要立刻报错
|
||
if (now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec:
|
||
return None # 保持沉默,或者返回 "保持直行" (更稳妥是沉默)
|
||
return "未检测到可行走区域"
|
||
|
||
# 如果检测到了,更新时间戳
|
||
self.last_walkable_detected_time = now
|
||
|
||
# 分析下半部分(更近的区域)
|
||
lower_half = walkable_mask[int(h * 0.5):, :]
|
||
|
||
if np.count_nonzero(lower_half) < 1000:
|
||
if (now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec:
|
||
return None
|
||
return "前方可行走区域较小,请小心"
|
||
|
||
# 计算左中右分布
|
||
third = w // 3
|
||
left_area = lower_half[:, :third].sum()
|
||
center_area = lower_half[:, third:2*third].sum()
|
||
right_area = lower_half[:, 2*third:].sum()
|
||
|
||
total = left_area + center_area + right_area + 1e-6
|
||
left_ratio = left_area / total
|
||
center_ratio = center_area / total
|
||
right_ratio = right_area / total
|
||
|
||
# 方向判断
|
||
if center_ratio > 0.4:
|
||
return "保持直行"
|
||
elif left_ratio > right_ratio * 1.5:
|
||
return "向左调整"
|
||
elif right_ratio > left_ratio * 1.5:
|
||
return "向右调整"
|
||
else:
|
||
return "保持直行"
|
||
|
||
def _check_obstacle_warning(self, obstacles, walkable_mask, h, w):
|
||
"""检查是否有障碍物在前方"""
|
||
if not obstacles:
|
||
return None
|
||
|
||
# 定义前方区域(画面中下部)
|
||
front_zone_top = int(h * 0.4)
|
||
front_zone_left = int(w * 0.2)
|
||
front_zone_right = int(w * 0.8)
|
||
|
||
for obs in obstacles:
|
||
center = obs.get('center')
|
||
if center is None:
|
||
continue
|
||
cx, cy = center
|
||
|
||
# 检查是否在前方区域
|
||
if front_zone_top < cy < h and front_zone_left < cx < front_zone_right:
|
||
name_cn = obs.get('class_name_cn', '障碍物')
|
||
|
||
# 判断位置
|
||
if cx < w * 0.4:
|
||
return f"左前方有{name_cn}"
|
||
elif cx > w * 0.6:
|
||
return f"右前方有{name_cn}"
|
||
else:
|
||
return f"正前方有{name_cn}"
|
||
|
||
return None
|
||
|
||
def _check_poi_hint(self, pois, h, w):
|
||
"""检查兴趣点提示"""
|
||
if not pois:
|
||
return None
|
||
|
||
for poi in pois:
|
||
cls_id = poi.get('class_id')
|
||
name_cn = poi.get('class_name_cn', '兴趣点')
|
||
center = poi.get('center')
|
||
|
||
if center is None:
|
||
continue
|
||
cx, cy = center
|
||
|
||
# 楼梯需要特别警告
|
||
if cls_id == CLASS_STAIRS:
|
||
if cy > h * 0.5: # 比较近
|
||
return f"注意前方有{name_cn}"
|
||
|
||
# 门/电梯提示
|
||
elif cls_id in (CLASS_DOOR, CLASS_ELEVATOR):
|
||
if cy > h * 0.3: # 在视野内
|
||
position = "左侧" if cx < w * 0.4 else ("右侧" if cx > w * 0.6 else "前方")
|
||
return f"{position}有{name_cn}"
|
||
|
||
return None
|
||
|
||
def _add_mask_visualization(self, mask, visualizations, viz_type, color):
|
||
"""添加 mask 可视化"""
|
||
if mask is None or mask.sum() == 0:
|
||
return
|
||
|
||
visualizations.append({
|
||
'type': viz_type,
|
||
'mask': mask,
|
||
'color': color
|
||
})
|
||
|
||
def _add_detection_visualization(self, detection, visualizations, det_type):
|
||
"""添加检测框可视化"""
|
||
center = detection.get('center')
|
||
if center is None:
|
||
return
|
||
|
||
visualizations.append({
|
||
'type': det_type,
|
||
'center': center,
|
||
'class_name': detection.get('class_name', 'unknown'),
|
||
'class_name_cn': detection.get('class_name_cn', '未知'),
|
||
'conf': detection.get('conf', 0),
|
||
})
|