Files
NaviGlassServer/workflow_indoor.py
2026-01-06 17:15:06 +08:00

535 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
室内导航工作流 (Indoor Navigation Workflow)
Day 26: 专为室内导盲模型 (yolo11l-seg-indoor14) 设计
类别映射 (14 classes from MIT Indoor):
- 可行走区域: floor(0), corridor(1), sidewalk(2)
- 静态障碍物: chair(3), table(4), sofa_bed(5), cabinet(11), trash_can(12)
- 兴趣点: door(6), elevator(7), stairs(8)
- 边界: wall(9), window(13)
- 动态障碍: person(10)
"""
import os
import time
import logging
import numpy as np
import cv2
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from collections import deque
logger = logging.getLogger(__name__)
# ========== 类别常量 (14类模型 - yolo11l-seg-indoor14) ==========
# Day 28: 使用 14 类模型 (MIT Indoor Subset)
# 可行走区域 (0-2)
WALKABLE_CLASSES = {0, 1, 2} # floor, corridor, sidewalk
CLASS_FLOOR = 0
CLASS_CORRIDOR = 1
CLASS_SIDEWALK = 2
# 静态障碍物 (3-5, 11-12)
OBSTACLE_CLASSES = {3, 4, 5, 11, 12, 13} # window 只要是障碍物也算? window(13)是墙?
# Wait, Window is 13. Is window an obstacle? Usually yes (don't walk into it).
# Cabinet 11, Trash 12.
CLASS_CHAIR = 3
CLASS_TABLE = 4
CLASS_SOFA_BED = 5
CLASS_CABINET = 11
CLASS_TRASH_CAN = 12
CLASS_WINDOW = 13 # 窗户通常视为边界或障碍
CLASS_WALL = 9 # Wall 9
# 兴趣点 (6-8)
POI_CLASSES = {6, 7, 8} # door, elevator, stairs
CLASS_DOOR = 6
CLASS_ELEVATOR = 7
CLASS_STAIRS = 8
# 动态障碍 (10)
CLASS_PERSON = 10
# 边界
BOUNDARY_CLASSES = {9, 13} # wall(9), window(13)
# 类别名称映射
CLASS_NAMES = {
0: 'floor', 1: 'corridor', 2: 'sidewalk',
3: 'chair', 4: 'table', 5: 'sofa_bed',
6: 'door', 7: 'elevator', 8: 'stairs',
9: 'wall', 10: 'person', 11: 'cabinet',
12: 'trash_can', 13: 'window'
}
# 中文名称(用于语音)
CLASS_NAMES_CN = {
0: '地面', 1: '走廊', 2: '人行道',
3: '椅子', 4: '桌子', 5: '沙发',
6: '', 7: '电梯', 8: '楼梯',
9: '墙壁', 10: '行人', 11: '柜子',
12: '垃圾桶', 13: '窗户'
}
# 物品类 (无)
ITEM_CLASSES = set()
# ========== 配置参数 ==========
# Day 28: 进一步降低阈值以提升木地板检测率
# Day 28: 进一步降低阈值以提升木地板检测率
CONF_THRESHOLD = float(os.getenv('INDOOR_CONF_THRESHOLD', '0.05')) # 全局极低阈值,由后续逻辑二次过滤
WALKABLE_MIN_AREA = int(os.getenv('INDOOR_WALKABLE_MIN_AREA', '50')) # 极端降低最小面积以进行调试 (原 1000)
OBSTACLE_MIN_AREA = int(os.getenv('INDOOR_OBSTACLE_MIN_AREA', '300'))
# 语音间隔
GUIDE_INTERVAL = float(os.getenv('INDOOR_GUIDE_INTERVAL', '3.0'))
DIRECTION_INTERVAL = float(os.getenv('INDOOR_DIRECTION_INTERVAL', '2.5'))
POI_INTERVAL = float(os.getenv('INDOOR_POI_INTERVAL', '5.0'))
OBSTACLE_INTERVAL = float(os.getenv('INDOOR_OBSTACLE_INTERVAL', '2.0'))
# Day 28: “未检测到可行走区域”播报间隔8秒
NO_WALKABLE_INTERVAL = float(os.getenv('INDOOR_NO_WALKABLE_INTERVAL', '8.0'))
# ========== 可视化颜色 (BGR) ==========
VIS_COLORS = {
'walkable': (0, 255, 0), # 绿色 - 可行走
'obstacle': (0, 0, 255), # 红色 - 障碍物
'poi': (255, 255, 0), # 青色 - 兴趣点
'boundary': (128, 128, 128), # 灰色 - 边界
'person': (255, 0, 255), # 粉色 - 行人
'centerline': (255, 255, 0), # 黄色 - 引导线
}
@dataclass
class IndoorResult:
"""室内导航结果"""
annotated_image: Optional[np.ndarray] = None
guidance_text: str = ""
state_info: Dict[str, Any] = None
visualizations: List[Dict[str, Any]] = None
def __post_init__(self):
if self.state_info is None:
self.state_info = {}
if self.visualizations is None:
self.visualizations = []
class IndoorNavigator:
"""室内导航器 - 专为室内导盲模型设计"""
def __init__(self, seg_model=None, device_id: str = "indoor"):
self.seg_model = seg_model
self.device_id = device_id
self.frame_counter = 0
# Day 28: 持久化缓冲参数
self.no_walkable_persistence_sec = 2.0
self.last_walkable_detected_time = 0
# 语音节流
self.last_guide_time = 0
self.last_direction_time = 0
self.last_poi_time = 0
self.last_obstacle_time = 0
self.last_guidance_text = ""
self.last_direction_text = ""
# 检测间隔
self.detection_interval = int(os.getenv('INDOOR_DETECTION_INTERVAL', '6'))
self.last_detection_frame = 0
# 缓存
self.last_walkable_mask = None
self.last_valid_walkable_mask = None
self.last_no_walkable_time = 0
self.last_obstacles = []
self.last_obstacles = []
self.last_pois = []
# Day 28: 移除未使用的灰度图转换 (光流功能未启用)
# self.prev_gray = None
# 日志间隔
self.log_interval = int(os.getenv('AIGLASS_LOG_INTERVAL', '30'))
logger.info(f"[INDOOR] 室内导航器初始化完成")
logger.info(f"[INDOOR] 检测间隔: 每{self.detection_interval}")
logger.info(f"[INDOOR] 可行走类别: {[CLASS_NAMES[c] for c in WALKABLE_CLASSES]}")
def reset(self):
"""重置状态"""
self.frame_counter = 0
self.last_guide_time = 0
self.last_direction_time = 0
self.last_poi_time = 0
self.last_obstacle_time = 0
self.last_guidance_text = ""
self.last_direction_text = ""
self.last_valid_walkable_mask = None
self.last_no_walkable_time = 0 # Day 28: "未检测到可行走区域"节流
self.last_walkable_detected_time = 0
self.last_walkable_mask = None
self.last_obstacles = []
self.last_pois = []
self.prev_gray = None
logger.info("[INDOOR] 导航器已重置")
def process_frame(self, image: np.ndarray) -> IndoorResult:
"""处理单帧图像"""
self.frame_counter += 1
h, w = image.shape[:2]
now = time.time()
frame_visualizations = []
guidance_text = ""
state_info = {}
# 是否执行检测
should_detect = (self.frame_counter - self.last_detection_frame) >= self.detection_interval
if should_detect and self.seg_model is not None:
self.last_detection_frame = self.frame_counter
# 执行分割推理
walkable_mask, obstacles, pois = self._detect_all(image)
# 更新缓存
self.last_walkable_mask = walkable_mask
self.last_obstacles = obstacles
self.last_pois = pois
else:
# 使用缓存
walkable_mask = self.last_walkable_mask
obstacles = self.last_obstacles
pois = self.last_pois
# 3. 缓存有效的 mask (用于可视化防抖)
walkable_area = int(np.count_nonzero(walkable_mask)) if walkable_mask is not None else 0
if walkable_area > WALKABLE_MIN_AREA:
self.last_valid_walkable_mask = walkable_mask
# 4. 生成导航引导
if walkable_mask is not None:
guidance_text = self._generate_guidance(walkable_mask, obstacles, pois, h, w, now)
# 5. 可视化 (带持久化防抖)
viz_mask = walkable_mask
# 如果当前没有检测到路,但还在持久化时间内,使用缓存的 mask 进行可视化
if (viz_mask is None or walkable_area < WALKABLE_MIN_AREA) and \
(now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec and \
self.last_valid_walkable_mask is not None:
viz_mask = self.last_valid_walkable_mask
self._add_mask_visualization(viz_mask, frame_visualizations,
"walkable_mask", "rgba(0, 255, 0, 0.3)")
# 障碍物可视化
for obs in obstacles:
self._add_detection_visualization(obs, frame_visualizations, "obstacle")
# 兴趣点可视化
for poi in pois:
self._add_detection_visualization(poi, frame_visualizations, "poi")
# 日志
if self.frame_counter % self.log_interval == 0:
# Day 28: 修复面积计算 - 使用 count_nonzero 而不是 sum (mask 值是 0 或 255)
walkable_area = int(np.count_nonzero(walkable_mask)) if walkable_mask is not None else 0
logger.info(f"[INDOOR] Frame={self.frame_counter} | 可行走面积={walkable_area} | "
f"障碍物={len(obstacles)} | 兴趣点={len(pois)}")
# 更新状态信息
state_info = {
'frame': self.frame_counter,
'walkable_detected': walkable_mask is not None and walkable_mask.sum() > 0,
'obstacles_count': len(obstacles),
'pois_count': len(pois),
}
# Day 28: 移除未使用的灰度图转换
# self.prev_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Day 28: 避免每帧复制图像,直接传递原图像(下游如需可视化再复制)
return IndoorResult(
annotated_image=image, # 不再 copy节省内存/CPU
guidance_text=guidance_text,
state_info=state_info,
visualizations=frame_visualizations
)
def _detect_all(self, image: np.ndarray):
"""执行分割检测,返回可行走区域、障碍物、兴趣点"""
h, w = image.shape[:2]
walkable_mask = np.zeros((h, w), dtype=np.uint8)
obstacles = []
pois = []
try:
imgsz = int(os.getenv("AIGLASS_YOLO_IMGSZ", "480"))
use_half = os.getenv("AIGLASS_YOLO_HALF", "1") == "1"
results = self.seg_model.predict(
image,
imgsz=imgsz,
conf=CONF_THRESHOLD,
verbose=False,
half=use_half
)
if results and len(results) > 0 and results[0].masks is not None:
r0 = results[0]
masks = r0.masks.data.cpu().numpy()
boxes = r0.boxes
for i, (mask, cls_id, conf) in enumerate(zip(masks, boxes.cls, boxes.conf)):
cls_id = int(cls_id.item())
conf_val = float(conf.item())
# 过滤物品类 (默认不参与导航逻辑,避免刷屏)
if cls_id in ITEM_CLASSES:
continue
# Day 28: 混合阈值策略
# 地面类(WALKABLE)使用全局低阈值(0.05)以提高召回率
# 障碍物(OBSTACLE/POI/BOUNDARY)使用较高阈值(0.25)以拒绝误报
filter_threshold = 0.25
if cls_id in WALKABLE_CLASSES:
filter_threshold = 0.05
if conf_val < filter_threshold:
continue
# 调整 mask 尺寸
mask_resized = cv2.resize(mask, (w, h), interpolation=cv2.INTER_NEAREST)
mask_bin = (mask_resized > 0.5).astype(np.uint8)
area = int(mask_bin.sum())
# Day 28: 调试日志 - 查看检测到的类别 (ALL detections)
if area > 10: # 几乎记录所有检测
cls_name = CLASS_NAMES.get(cls_id, f'unknown_{cls_id}')
logger.info(f"[INDOOR DEBUG] 检测到 {cls_name}(id={cls_id}) conf={conf_val:.2f} area={area}")
if area < 50: # 极端小的才过滤
continue
# 可行走区域
if cls_id in WALKABLE_CLASSES and area > WALKABLE_MIN_AREA:
# Day 28: 确保类型一致,避免 bitwise_or 失败
mask_add = (mask_bin * 255).astype(np.uint8)
walkable_mask = cv2.bitwise_or(walkable_mask, mask_add)
if area > 10000: # 调试:记录大面积添加
logger.info(f"[INDOOR DEBUG] 添加可行走区域: class={cls_id} area={area} current_total={np.count_nonzero(walkable_mask)}")
# 障碍物
elif cls_id in OBSTACLE_CLASSES or cls_id == CLASS_PERSON:
if area > OBSTACLE_MIN_AREA:
obstacles.append({
'class_id': cls_id,
'class_name': CLASS_NAMES.get(cls_id, 'unknown'),
'class_name_cn': CLASS_NAMES_CN.get(cls_id, '未知'),
'conf': conf_val,
'mask': mask_bin,
'area': area,
'center': self._mask_center(mask_bin),
})
# 兴趣点
elif cls_id in POI_CLASSES:
pois.append({
'class_id': cls_id,
'class_name': CLASS_NAMES.get(cls_id, 'unknown'),
'class_name_cn': CLASS_NAMES_CN.get(cls_id, '未知'),
'conf': conf_val,
'mask': mask_bin,
'area': area,
'center': self._mask_center(mask_bin),
})
except Exception as e:
logger.warning(f"[INDOOR] 检测失败: {e}")
return walkable_mask, obstacles, pois
def _mask_center(self, mask: np.ndarray):
"""计算 mask 质心"""
M = cv2.moments(mask)
if abs(M["m00"]) < 1e-6:
return None
cx = int(M["m10"] / M["m00"])
cy = int(M["m01"] / M["m00"])
return (cx, cy)
def _generate_guidance(self, walkable_mask, obstacles, pois, h, w, now):
"""生成导航引导文本"""
guidance_text = ""
# 1. 计算可行走区域的偏移和方向
direction_guidance = self._compute_direction_guidance(walkable_mask, h, w)
# 2. 检查障碍物警告
obstacle_warning = self._check_obstacle_warning(obstacles, walkable_mask, h, w)
# 3. 检查兴趣点提示
poi_hint = self._check_poi_hint(pois, h, w)
# 优先级:障碍物 > 方向 > 兴趣点
if obstacle_warning and (now - self.last_obstacle_time) > OBSTACLE_INTERVAL:
guidance_text = obstacle_warning
self.last_obstacle_time = now
self.last_guidance_text = guidance_text
elif direction_guidance:
# Day 28: "未检测到可行走区域" 降低播报频率
# Day 28: "未检测到可行走区域" 降低播报频率
if direction_guidance == "未检测到可行走区域":
# 首次检测到last_no_walkable_time == 0或者间隔已过8秒
if self.last_no_walkable_time == 0 or (now - self.last_no_walkable_time) > NO_WALKABLE_INTERVAL:
guidance_text = direction_guidance
self.last_no_walkable_time = now
# 方向引导节流
elif direction_guidance != self.last_direction_text:
if (now - self.last_direction_time) > DIRECTION_INTERVAL:
guidance_text = direction_guidance
self.last_direction_time = now
self.last_direction_text = direction_guidance
elif (now - self.last_guide_time) > GUIDE_INTERVAL:
# 同样的方向,降低频率
guidance_text = direction_guidance
self.last_guide_time = now
elif poi_hint and (now - self.last_poi_time) > POI_INTERVAL:
guidance_text = poi_hint
self.last_poi_time = now
return guidance_text
def _compute_direction_guidance(self, walkable_mask, h, w):
"""计算方向引导"""
# Day 28: 使用 count_nonzero 替代 sum (mask 值是 0 或 255)
walkable_area = np.count_nonzero(walkable_mask) if walkable_mask is not None else 0
now = time.time()
if walkable_area < WALKABLE_MIN_AREA:
# 缓冲逻辑:如果最近才看到过路,不要立刻报错
if (now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec:
return None # 保持沉默,或者返回 "保持直行" (更稳妥是沉默)
return "未检测到可行走区域"
# 如果检测到了,更新时间戳
self.last_walkable_detected_time = now
# 分析下半部分(更近的区域)
lower_half = walkable_mask[int(h * 0.5):, :]
if np.count_nonzero(lower_half) < 1000:
if (now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec:
return None
return "前方可行走区域较小,请小心"
# 计算左中右分布
third = w // 3
left_area = lower_half[:, :third].sum()
center_area = lower_half[:, third:2*third].sum()
right_area = lower_half[:, 2*third:].sum()
total = left_area + center_area + right_area + 1e-6
left_ratio = left_area / total
center_ratio = center_area / total
right_ratio = right_area / total
# 方向判断
if center_ratio > 0.4:
return "保持直行"
elif left_ratio > right_ratio * 1.5:
return "向左调整"
elif right_ratio > left_ratio * 1.5:
return "向右调整"
else:
return "保持直行"
def _check_obstacle_warning(self, obstacles, walkable_mask, h, w):
"""检查是否有障碍物在前方"""
if not obstacles:
return None
# 定义前方区域(画面中下部)
front_zone_top = int(h * 0.4)
front_zone_left = int(w * 0.2)
front_zone_right = int(w * 0.8)
for obs in obstacles:
center = obs.get('center')
if center is None:
continue
cx, cy = center
# 检查是否在前方区域
if front_zone_top < cy < h and front_zone_left < cx < front_zone_right:
name_cn = obs.get('class_name_cn', '障碍物')
# 判断位置
if cx < w * 0.4:
return f"左前方有{name_cn}"
elif cx > w * 0.6:
return f"右前方有{name_cn}"
else:
return f"正前方有{name_cn}"
return None
def _check_poi_hint(self, pois, h, w):
"""检查兴趣点提示"""
if not pois:
return None
for poi in pois:
cls_id = poi.get('class_id')
name_cn = poi.get('class_name_cn', '兴趣点')
center = poi.get('center')
if center is None:
continue
cx, cy = center
# 楼梯需要特别警告
if cls_id == CLASS_STAIRS:
if cy > h * 0.5: # 比较近
return f"注意前方有{name_cn}"
# 门/电梯提示
elif cls_id in (CLASS_DOOR, CLASS_ELEVATOR):
if cy > h * 0.3: # 在视野内
position = "左侧" if cx < w * 0.4 else ("右侧" if cx > w * 0.6 else "前方")
return f"{position}{name_cn}"
return None
def _add_mask_visualization(self, mask, visualizations, viz_type, color):
"""添加 mask 可视化"""
if mask is None or mask.sum() == 0:
return
visualizations.append({
'type': viz_type,
'mask': mask,
'color': color
})
def _add_detection_visualization(self, detection, visualizations, det_type):
"""添加检测框可视化"""
center = detection.get('center')
if center is None:
return
visualizations.append({
'type': det_type,
'center': center,
'class_name': detection.get('class_name', 'unknown'),
'class_name_cn': detection.get('class_name_cn', '未知'),
'conf': detection.get('conf', 0),
})