4 Commits

Author SHA1 Message Date
Kevin Wong
a7f98c3893 Code optimization 2026-01-06 17:15:06 +08:00
Kevin Wong
fbc5cf49d8 Fix indentation error 2026-01-06 10:47:41 +08:00
Kevin Wong
b336692144 Add dataset for indoor blind guidance 2026-01-05 17:55:17 +08:00
Kevin Wong
122c07b28e Update indoor blind-guidance code 2026-01-05 14:39:16 +08:00
11 changed files with 281 additions and 575 deletions

View File

@@ -1,90 +0,0 @@
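# Changelog
This document records all notable changes to the project.
The format is based on [Keep a Changelog](https://keepachangelog.com/zh-CN/1.0.0/),
and version numbers follow [Semantic Versioning](https://semver.org/lang/zh-CN/).
## [Unreleased]
### Added
- First open-source release
- Complete GitHub documentation (README, CONTRIBUTING, LICENSE, etc.)
- Docker support
- Environment variable configuration template
### Changed
- Improved the README structure
- Improved code comments
## [1.0.0] - 2025-01-XX
### Added
- 🚶 Blind-path (tactile paving) navigation system
  - Real-time blind-path detection and segmentation
  - Intelligent voice guidance
  - Obstacle detection and avoidance
  - Sharp-turn detection and alerts
  - Optical-flow stabilization
- 🚦 Street-crossing assistance
  - Crosswalk recognition and direction detection
  - Traffic light color recognition
  - Alignment guidance
  - Safety reminders
- 🔍 Item recognition and search
  - YOLO-E open-vocabulary detection
  - MediaPipe hand guidance
  - Real-time target tracking
  - Grasp detection
- 🎙️ Real-time voice interaction
  - Alibaba Cloud Paraformer ASR
  - Qwen-Omni-Turbo multimodal conversation
  - Intelligent command parsing
  - Context awareness
- 📹 Video and audio processing
  - Real-time WebSocket streaming
  - Synchronized audio/video recording
  - IMU data fusion
  - Multi-channel audio mixing
- 🎨 Visualization and interaction
  - Real-time web monitoring interface
  - IMU 3D visualization
  - Status panel
  - Chinese-friendly interface
### Tech stack
- FastAPI + WebSocket
- YOLO11 / YOLO-E
- MediaPipe
- PyTorch + CUDA
- OpenCV
- DashScope API
### Known issues
- [ ] May stutter on low-end GPUs
- [ ] No GPU acceleration support on macOS
- [ ] Some Chinese fonts render incorrectly on Linux
---
## Versioning
### Major
- Incompatible API changes
### Minor
- Backward-compatible new features
### Patch
- Backward-compatible bug fixes
---
[Unreleased]: https://github.com/yourusername/aiglass/compare/v1.0.0...HEAD
[1.0.0]: https://github.com/yourusername/aiglass/releases/tag/v1.0.0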

View File

@@ -1,402 +0,0 @@
# Project Structure
This document describes the project's directory layout and the role of each main file.
## 📁 Directory Structure
```
rebuild1002/
├── 📄 Main application files
│ ├── app_main.py # Main application entry point (FastAPI service)
│ ├── navigation_master.py # Navigation orchestrator (state machine)
│ ├── workflow_blindpath.py # Blind-path navigation workflow
│ ├── workflow_crossstreet.py # Street-crossing navigation workflow
│ └── yolomedia.py # Item-search workflow
├── 🎙️ Speech processing modules
│ ├── asr_core.py # Speech recognition core
│ ├── omni_client.py # Qwen-Omni client
│ ├── qwen_extractor.py # Label extraction (Chinese -> English)
│ ├── audio_player.py # Audio player
│ └── audio_stream.py # Audio stream management
├── 🤖 Models
│ ├── yoloe_backend.py # YOLO-E backend (open vocabulary)
│ ├── trafficlight_detection.py # Traffic light detection
│ ├── obstacle_detector_client.py # Obstacle detection client
│ └── models.py # Model definitions
├── 🎥 Video processing
│ ├── bridge_io.py # Thread-safe frame buffer
│ ├── sync_recorder.py # Synchronized audio/video recording
│ └── video_recorder.py # Video recording (legacy)
├── 🌐 Web front end
│ ├── templates/
│ │ └── index.html # Main page HTML
│ ├── static/
│ │ ├── main.js # Main JS script
│ │ ├── vision.js # Vision stream handling
│ │ ├── visualizer.js # Data visualization
│ │ ├── vision_renderer.js # Renderer
│ │ ├── vision.css # Stylesheet
│ │ └── models/ # 3D models (IMU visualization)
├── 🎵 Audio assets
│ ├── music/ # System prompt tones
│ │ ├── converted_向上.wav
│ │ ├── converted_向下.wav
│ │ └── ...
│ └── voice/ # Pre-recorded voice clips
│ ├── voice_mapping.json
│ └── *.wav
├── 🧠 Model files
│ └── model/
│ ├── yolo-seg.pt # Blind-path segmentation model
│ ├── yoloe-11l-seg.pt # YOLO-E open-vocabulary model
│ ├── shoppingbest5.pt # Item recognition model
│ ├── trafficlight.pt # Traffic light detection model
│ └── hand_landmarker.task # MediaPipe hand model
├── 📹 Recordings
│ └── recordings/ # Automatically saved video and audio
│ ├── video_*.avi
│ └── audio_*.wav
├── 🛠️ ESP32 firmware
│ └── compile/
│ ├── compile.ino # Arduino main program
│ ├── camera_pins.h # Camera pin definitions
│ ├── ICM42688.cpp/h # IMU driver
│ └── ESP32_VIDEO_OPTIMIZATION.md
├── 🧪 Tests
│ ├── test_recorder.py # Recording tests
│ ├── test_traffic_light.py # Traffic light detection tests
│ ├── test_cross_street_blindpath.py # Navigation tests
│ └── test_crosswalk_awareness.py # Crosswalk detection tests
├── 📚 Documentation
│ ├── README.md # Main project document
│ ├── INSTALLATION.md # Installation guide
│ ├── CONTRIBUTING.md # Contribution guide
│ ├── FAQ.md # Frequently asked questions
│ ├── CHANGELOG.md # Changelog
│ ├── SECURITY.md # Security policy
│ └── PROJECT_STRUCTURE.md # This file
├── 🐳 Docker
│ ├── Dockerfile # Docker image definition
│ ├── docker-compose.yml # Docker Compose configuration
│ └── .dockerignore # Docker ignore file
├── ⚙️ Configuration files
│ ├── .env.example # Environment variable template
│ ├── .gitignore # Git ignore file
│ ├── requirements.txt # Python dependencies
│ ├── setup.sh # Linux/macOS setup script
│ └── setup.bat # Windows setup script
├── 📄 License
│ └── LICENSE # MIT License
└── 🔧 GitHub
└── .github/
├── ISSUE_TEMPLATE/
│ ├── bug_report.md
│ └── feature_request.md
└── pull_request_template.md
```
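## 🔑 Core Files
### Main application layer
#### `app_main.py`
- **Role**: Main FastAPI service; handles all WebSocket connections
- **Main functions**:
  - WebSocket route management (/ws/camera, /ws_audio, /ws/viewer, etc.)
  - Model loading and initialization
  - State coordination and management
  - Audio/video stream distribution
- **Dependencies**: All other modules
- **Entry point**: `python app_main.py`
#### `navigation_master.py`
- **Role**: Navigation orchestrator; manages the state machine for the whole system
- **Main states**:
  - IDLE: Idle
  - CHAT: Conversation mode
  - BLINDPATH_NAV: Blind-path navigation
  - CROSSING: Street crossing
  - TRAFFIC_LIGHT_DETECTION: Traffic light detection
  - ITEM_SEARCH: Item search
- **Core methods**:
  - `process_frame()`: Processes each frame
  - `start_blind_path_navigation()`: Starts blind-path navigation
  - `start_crossing()`: Starts street-crossing mode
  - `on_voice_command()`: Handles voice commands
### Workflow modules
#### `workflow_blindpath.py`
- **Role**: Core blind-path navigation logic
- **Main functions**:
  - Blind-path segmentation and detection
  - Obstacle detection
  - Turn detection
  - Optical-flow stabilization
  - Direction guidance generation
- **State machine**:
  - ONBOARDING: Getting onto the blind path
  - NAVIGATING: Navigating
  - MANEUVERING_TURN: Turning
  - AVOIDING_OBSTACLE: Avoiding an obstacle
#### `workflow_crossstreet.py`
- **Role**: Street-crossing navigation logic
- **Main functions**:
  - Crosswalk detection
  - Direction alignment
  - Guidance generation
- **Core methods**:
  - `_is_crosswalk_near()`: Determines whether a crosswalk is near
  - `_compute_angle_and_offset()`: Computes the angle and offset
#### `yolomedia.py`
- **Role**: Item-search workflow
- **Main functions**:
  - YOLO-E text-prompt detection
  - MediaPipe hand tracking
  - Optical-flow target tracking
  - Hand guidance (direction prompts)
  - Grasp detection
- **Modes**:
  - SEGMENT: Detection mode
  - FLASH: Flash confirmation
  - CENTER_GUIDE: Centering guidance
  - TRACK: Hand tracking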
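### Speech modules
#### `asr_core.py`
- **Role**: Real-time speech recognition with Alibaba Cloud Paraformer ASR
- **Main functions**:
  - Real-time speech recognition
  - VAD (voice activity detection)
  - Recognition result callbacks
- **Key class**: `ASRCallback`
#### `omni_client.py`
- **Role**: Qwen-Omni-Turbo multimodal conversation client
- **Main functions**:
  - Streaming conversation generation
  - Image + text input
  - Speech output
- **Core function**: `stream_chat()`
#### `audio_player.py`
- **Role**: Unified audio playback management
- **Main functions**:
  - TTS speech playback
  - Multi-channel audio mixing
  - Volume control
  - Thread-safe playback
- **Core functions**: `play_voice_text()`, `play_audio_threadsafe()`
### Model backends
#### `yoloe_backend.py`
- **Role**: YOLO-E open-vocabulary detection backend
- **Main functions**:
  - Text prompt setup
  - Real-time segmentation
  - Object tracking
- **Core class**: `YoloEBackend`
#### `trafficlight_detection.py`
- **Role**: Traffic light detection module
- **Detection methods**:
  1. YOLO model detection
  2. HSV color classification (fallback)
- **Output**: Red / green / yellow / unknown
#### `obstacle_detector_client.py`
- **Role**: Obstacle detection client
- **Main functions**:
  - Whitelist class filtering
  - Detection within the path mask
  - Object attribute computation (area, position, danger level)
### Video processing
#### `bridge_io.py`
- **Role**: Thread-safe frame buffering and distribution
- **Main functions**:
  - Producer-consumer pattern
  - Raw frame caching
  - Processed frame distribution
- **Core functions**:
  - `push_raw_jpeg()`: Receives ESP32 frames
  - `wait_raw_bgr()`: Fetches a raw frame
  - `send_vis_bgr()`: Sends a processed frame
#### `sync_recorder.py`
- **Role**: Synchronized audio/video recording
- **Main functions**:
  - Records video and audio in sync
  - Automatic file naming (timestamps)
  - Thread safety
- **Output**: `recordings/video_*.avi`, `audio_*.wav`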
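### Front end
#### `templates/index.html`
- **Role**: Web monitoring interface
- **Main areas**:
  - Video stream display
  - Status panel
  - IMU 3D visualization
  - Speech recognition results
#### `static/main.js`
- **Role**: Main JavaScript logic
- **Main functions**:
  - WebSocket connection management
  - UI updates
  - Event handling
#### `static/vision.js`
- **Role**: Vision stream handling
- **Main functions**:
  - Receives video frames over WebSocket
  - Canvas rendering
  - FPS calculation
#### `static/visualizer.js`
- **Role**: IMU 3D visualization (Three.js)
- **Main functions**:
  - Receives IMU data
  - Renders the device pose in real time
  - Dynamic lighting effects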
## 🔄 Data Flow
### Video stream
```
ESP32-CAM
→ [JPEG] WebSocket /ws/camera
→ bridge_io.push_raw_jpeg()
→ yolomedia / navigation_master
→ bridge_io.send_vis_bgr()
→ [JPEG] WebSocket /ws/viewer
→ Browser Canvas
```
### Audio stream (uplink)
```
ESP32-MIC
→ [PCM16] WebSocket /ws_audio
→ asr_core
→ DashScope ASR
→ Recognition result
→ start_ai_with_text_custom()
```
### Audio stream (downlink)
```
Qwen-Omni / TTS
→ audio_player
→ [PCM16] audio_stream
→ [WAV] HTTP /stream.wav
→ ESP32-Speaker
```
### IMU data stream
```
ESP32-IMU
→ [JSON] UDP 12345
→ process_imu_and_maybe_store()
→ [JSON] WebSocket /ws
→ visualizer.js (Three.js)
```
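## 🎯 Key Design Patterns
### 1. State machine pattern
- **Location**: `navigation_master.py`
- **Purpose**: Manages system state transitions
- **States**: IDLE → CHAT / BLINDPATH_NAV / CROSSING / ...
### 2. Producer-consumer pattern
- **Location**: `bridge_io.py`
- **Purpose**: Decouples video reception from processing
- **Implementation**: Threads + queue
### 3. Strategy pattern
- **Location**: Each `workflow_*.py`
- **Purpose**: Implements the different navigation strategies
- **Implementation**: A uniform `process_frame()` interface
### 4. Singleton pattern
- **Location**: Model loading
- **Purpose**: Shares model instances globally
- **Implementation**: Global variables + initialization check
### 5. Observer pattern
- **Location**: WebSocket communication
- **Purpose**: Lets multiple clients subscribe to the video stream
- **Implementation**: `camera_viewers: Set[WebSocket]`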
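The producer-consumer pattern listed above ("threads + queue") can be sketched roughly as follows. This is a minimal illustration only: `LatestFrameBuffer`, `push`, and `wait` are hypothetical names, not the actual `bridge_io.py` API, and the drop-oldest policy is an assumption about how a real-time frame buffer would typically behave.

```python
import queue


class LatestFrameBuffer:
    """Bounded buffer that drops the oldest frame when full (hypothetical sketch)."""

    def __init__(self, maxsize: int = 2):
        self._q = queue.Queue(maxsize=maxsize)

    def push(self, frame: bytes) -> None:
        """Producer side: never blocks; evicts the oldest frame if needed."""
        while True:
            try:
                self._q.put_nowait(frame)
                return
            except queue.Full:
                try:
                    self._q.get_nowait()  # drop the oldest frame
                except queue.Empty:
                    pass  # a consumer emptied the queue in the meantime

    def wait(self, timeout: float = 1.0):
        """Consumer side: returns the next frame, or None on timeout."""
        try:
            return self._q.get(timeout=timeout)
        except queue.Empty:
            return None
```

Dropping stale frames keeps latency bounded when processing is slower than the camera, at the cost of skipping frames.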
## 📦 Dependencies
```
app_main.py
├── navigation_master.py
│ ├── workflow_blindpath.py
│ │ ├── yoloe_backend.py
│ │ └── obstacle_detector_client.py
│ ├── workflow_crossstreet.py
│ └── trafficlight_detection.py
├── yolomedia.py
│ └── yoloe_backend.py
├── asr_core.py
├── omni_client.py
├── audio_player.py
├── audio_stream.py
├── bridge_io.py
└── sync_recorder.py
```
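## 🚀 Startup Sequence
1. **Initialization** (`app_main.py`)
   - Load environment variables
   - Load the navigation models (YOLO, MediaPipe)
   - Initialize the audio system
   - Start the recording system
   - Preload the traffic light model
2. **Service startup** (FastAPI)
   - Register WebSocket routes
   - Mount static files
   - Start the UDP listener (IMU)
   - Start the HTTP service (port 8081)
3. **Runtime**
   - Wait for the ESP32 to connect
   - Receive video/audio/IMU data
   - Handle the user's voice commands
   - Push processing results in real time
4. **Shutdown**
   - Stop recording (save files)
   - Close all WebSocket connections
   - Release model resources
   - Clean up temporary files
---
**Note**: For the detailed implementation of a module, see the comments and docstrings in the corresponding source file.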

View File

@@ -109,7 +109,7 @@ pip install -r requirements.txt
|------|------| |------|------|
| `yolo-seg.pt` | Blind-path segmentation | | `yolo-seg.pt` | Blind-path segmentation |
| `yoloe-11l-seg.pt` | Obstacle / open-vocabulary detection | | `yoloe-11l-seg.pt` | Obstacle / open-vocabulary detection |
| `yolo11l-seg-indoor14.pt` | Indoor blind guidance (14 classes) | | `yolo11l-seg-indoor.engine` | Indoor blind guidance (20 classes) |
| `SenseVoiceSmall/` | Speech recognition | | `SenseVoiceSmall/` | Speech recognition |
> Model download: https://www.modelscope.cn/models/archifancy/AIGlasses_for_navigation > Model download: https://www.modelscope.cn/models/archifancy/AIGlasses_for_navigation

View File

@@ -224,22 +224,26 @@ async def lifespan(app: FastAPI):
# 4. Day 21: Preload the new AI pipeline models (avoids latency on first use) # 4. Day 21: Preload the new AI pipeline models (avoids latency on first use)
if USE_NEW_AI_PIPELINE: if USE_NEW_AI_PIPELINE:
async def _preload_models(): # Day 28: Preload VAD synchronously so the first utterance is not missed
try:
print("[PRELOAD] 预加载 Silero VAD...")
from server_vad import get_vad_model
get_vad_model() # Load the VAD model directly
print("[PRELOAD] Silero VAD 预加载完成")
except Exception as e:
print(f"[PRELOAD] VAD 预加载失败: {e}")
# Load SenseVoice asynchronously (does not block startup)
async def _preload_sensevoice():
try: try:
print("[PRELOAD] 预加载 Silero VAD...")
from server_vad import get_server_vad
get_server_vad() # Trigger VAD model loading
print("[PRELOAD] 预加载 SenseVoice ASR...") print("[PRELOAD] 预加载 SenseVoice ASR...")
from sensevoice_asr import init_sensevoice from sensevoice_asr import init_sensevoice
await init_sensevoice() # Load the ASR model asynchronously await init_sensevoice()
print("[PRELOAD] 新 AI 管道模型预加载完成") print("[PRELOAD] 新 AI 管道模型预加载完成")
except Exception as e: except Exception as e:
print(f"[PRELOAD] 模型预加载失败: {e}") print(f"[PRELOAD] SenseVoice 预加载失败: {e}")
# Preload in the background without blocking startup asyncio.create_task(_preload_sensevoice())
asyncio.create_task(_preload_models())
print("[LIFESPAN] 应用启动完成") print("[LIFESPAN] 应用启动完成")
@@ -349,7 +353,9 @@ def load_navigation_models():
# global yolo_seg_model, obstacle_detector (Moved to ctx) # global yolo_seg_model, obstacle_detector (Moved to ctx)
try: try:
seg_model_path = os.getenv("BLIND_PATH_MODEL", "model/yolo-seg.pt") # Use an absolute path based on the current file
default_seg_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "model", "yolo-seg.pt")
seg_model_path = os.getenv("BLIND_PATH_MODEL", default_seg_path)
# Day 20: Prefer the TensorRT engine # Day 20: Prefer the TensorRT engine
seg_model_path = get_best_model_path(seg_model_path) seg_model_path = get_best_model_path(seg_model_path)
#print(f"[NAVIGATION] 尝试加载模型: {seg_model_path}") #print(f"[NAVIGATION] 尝试加载模型: {seg_model_path}")
@@ -401,7 +407,8 @@ def load_navigation_models():
print(f"[NAVIGATION] 请检查文件路径是否正确") print(f"[NAVIGATION] 请检查文件路径是否正确")
# [Change start] Use ObstacleDetectorClient instead of calling YOLO directly # [Change start] Use ObstacleDetectorClient instead of calling YOLO directly
obstacle_model_path = os.getenv("OBSTACLE_MODEL", "model/yoloe-11l-seg.pt") default_obs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "model", "yoloe-11l-seg.pt")
obstacle_model_path = os.getenv("OBSTACLE_MODEL", default_obs_path)
# Day 20: Prefer the TensorRT engine # Day 20: Prefer the TensorRT engine
obstacle_model_path = get_best_model_path(obstacle_model_path) obstacle_model_path = get_best_model_path(obstacle_model_path)
print(f"[NAVIGATION] 尝试加载障碍物检测模型: {obstacle_model_path}") print(f"[NAVIGATION] 尝试加载障碍物检测模型: {obstacle_model_path}")
@@ -483,7 +490,10 @@ def load_indoor_model():
from model_utils import is_tensorrt_engine # Imported here for usage from model_utils import is_tensorrt_engine # Imported here for usage
try: try:
indoor_model_path = os.getenv("INDOOR_MODEL", "model/yolo11l-seg-indoor14.pt") # Day 28: Use the newly trained 14-class model (switched at the user's request)
# Use an absolute path based on the current file so the model is found no matter which directory the server is started from
default_model_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "model", "yolo11l-seg-indoor14.engine")
indoor_model_path = os.getenv("INDOOR_MODEL", default_model_path)
# Prefer the TensorRT engine # Prefer the TensorRT engine
indoor_model_path = get_best_model_path(indoor_model_path) indoor_model_path = get_best_model_path(indoor_model_path)
print(f"[INDOOR] 尝试加载室内导盲模型: {indoor_model_path}") print(f"[INDOOR] 尝试加载室内导盲模型: {indoor_model_path}")
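The three hunks above share one idea: build the default model path from the module's own location, then let an environment variable override it. A minimal sketch of that idea, assuming a hypothetical helper named `resolve_model_path` (the `base_dir` and `environ` parameters are added here only to make the function easy to exercise):

```python
import os


def resolve_model_path(env_var, filename, base_dir=None, environ=None):
    """Return the env override if set, else <base_dir>/model/<filename>.

    Hypothetical helper; the commit inlines this logic at each call site.
    """
    environ = os.environ if environ is None else environ
    if base_dir is None:
        # Anchor to this file rather than the current working directory.
        base_dir = os.path.dirname(os.path.abspath(__file__))
    default_path = os.path.join(base_dir, "model", filename)
    return environ.get(env_var, default_path)
```

Anchoring to `__file__` means the default no longer depends on the directory the server was launched from, which is the failure mode the relative `"model/..."` defaults had.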
@@ -751,7 +761,8 @@ async def start_ai_with_text_custom(user_text: str):
if ctx.orchestrator: if ctx.orchestrator:
current_state = ctx.orchestrator.get_state() current_state = ctx.orchestrator.get_state()
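# In navigation or traffic-light detection mode (i.e. not CHAT mode) # In navigation or traffic-light detection mode (i.e. not CHAT mode)
if current_state not in ["CHAT", "IDLE"]: # Day 28: Allow conversation in INDOOR_NAV mode; other modes (blind path, street crossing) remain strictly blocked
if current_state not in ["CHAT", "IDLE", "INDOOR_NAV"]:
# Check whether the text is an allowed conversation trigger # Check whether the text is an allowed conversation trigger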
allowed_keywords = ["帮我看", "帮我看下", "帮我找", "找一下", "看看", "识别一下"] allowed_keywords = ["帮我看", "帮我看下", "帮我找", "找一下", "看看", "识别一下"]
is_allowed_query = any(keyword in user_text for keyword in allowed_keywords) is_allowed_query = any(keyword in user_text for keyword in allowed_keywords)
@@ -759,7 +770,9 @@ async def start_ai_with_text_custom(user_text: str):
# Check whether the text is a navigation control command # Check whether the text is a navigation control command
nav_control_keywords = ["开始过马路", "过马路结束", "开始导航", "盲道导航", "停止导航", "结束导航", nav_control_keywords = ["开始过马路", "过马路结束", "开始导航", "盲道导航", "停止导航", "结束导航",
"检测红绿灯", "看红绿灯", "停止检测", "停止红绿灯", "检测红绿灯", "看红绿灯", "停止检测", "停止红绿灯",
"室内导航", "室内导盲"] # Added indoor navigation "室内导航", "室内导盲", "四内导航", "思维导航", "失内导航", "时内导航",
"室类导航", "类导航",
"退出导航", "关闭导航", "别导了", "别念了", "停止", "导航"] # Day 28: Broader stop-command recognition, plus the bare word "导航"
is_nav_control = any(keyword in user_text for keyword in nav_control_keywords) is_nav_control = any(keyword in user_text for keyword in nav_control_keywords)
# If the text is neither an allowed query nor a navigation control command, discard it # If the text is neither an allowed query nor a navigation control command, discard it
@@ -843,7 +856,8 @@ async def start_ai_with_text_custom(user_text: str):
return return
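# [Changed] Check for navigation commands; controlled through the orchestrator # [Changed] Check for navigation commands; controlled through the orchestrator
if "开始导航" in user_text or "盲道导航" in user_text or "帮我导航" in user_text: # Day 28: Accept the bare word "导航" as a blind-path navigation start command (so that dropped characters in ASR output do not turn it into chat)
if "开始导航" in user_text or "盲道导航" in user_text or "帮我导航" in user_text or user_text.strip() == "导航":
# [Added] If an item search is running, stop it first # [Added] If an item search is running, stop it first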
if ctx.yolomedia_running: if ctx.yolomedia_running:
stop_yolomedia() stop_yolomedia()
@@ -858,8 +872,11 @@ async def start_ai_with_text_custom(user_text: str):
await ui_broadcast_final("[系统] 导航系统未就绪") await ui_broadcast_final("[系统] 导航系统未就绪")
return return
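# [Added] Check for indoor navigation commands # [Added] Check for indoor navigation commands (including ASR misrecognition aliases)
if "室内导航" in user_text or "室内导盲" in user_text: # Day 28: Add more homophone misrecognition aliases
indoor_nav_aliases = ["室内导航", "室内导盲", "四内导航", "思维导航", "失内导航", "时内导航",
"室类导航", "类导航"] # Day 28: Additional misrecognitions
if any(alias in user_text for alias in indoor_nav_aliases):
# If an item search is running, stop it first # If an item search is running, stop it first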
if ctx.yolomedia_running: if ctx.yolomedia_running:
stop_yolomedia() stop_yolomedia()
@@ -876,7 +893,8 @@ async def start_ai_with_text_custom(user_text: str):
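# [Changed] Check for stop-navigation commands first # [Changed] Check for stop-navigation commands first
# Any text containing "停止导航" or "结束导航" is treated as a stop command, whether or not it also contains "室内" # Any text containing "停止导航" or "结束导航" is treated as a stop command, whether or not it also contains "室内"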
if "停止导航" in user_text or "结束导航" in user_text: stop_keywords = ["停止导航", "结束导航", "退出导航", "关闭导航", "别导了", "别念了", "停止"]
if any(k in user_text for k in stop_keywords):
if ctx.orchestrator: if ctx.orchestrator:
ctx.orchestrator.stop_navigation() ctx.orchestrator.stop_navigation()
print(f"[NAVIGATION] 导航已停止,状态: {ctx.orchestrator.get_state()}") print(f"[NAVIGATION] 导航已停止,状态: {ctx.orchestrator.get_state()}")
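The alias lists in these hunks amount to substring matching over ASR output, with homophone misrecognitions treated as synonyms. A compact sketch of that dispatch follows, using the keyword lists from the diff. The function name `classify_nav_command`, the return labels, and the stop-first ordering are assumptions made for illustration; the real handler interleaves these checks with other logic.

```python
STOP_KEYWORDS = ["停止导航", "结束导航", "退出导航", "关闭导航", "别导了", "别念了", "停止"]
INDOOR_NAV_ALIASES = ["室内导航", "室内导盲", "四内导航", "思维导航",
                      "失内导航", "时内导航", "室类导航", "类导航"]
BLINDPATH_KEYWORDS = ["开始导航", "盲道导航", "帮我导航"]


def classify_nav_command(user_text):
    """Map recognized text to 'stop', 'indoor', 'blindpath', or None."""
    text = user_text.strip()
    # Assumed ordering: stop commands win over start commands.
    if any(k in text for k in STOP_KEYWORDS):
        return "stop"
    if any(alias in text for alias in INDOOR_NAV_ALIASES):
        return "indoor"
    # The bare word "导航" only counts when it is the whole utterance.
    if any(k in text for k in BLINDPATH_KEYWORDS) or text == "导航":
        return "blindpath"
    return None
```

Note that a short keyword such as "停止" matches any utterance containing it (for example "停止检测"), so broad aliases trade precision for recall.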
@@ -1060,8 +1078,15 @@ async def start_ai_with_text(user_text: str):
from audio_stream import stream_clients from audio_stream import stream_clients
for sc in list(stream_clients): for sc in list(stream_clients):
if not sc.abort_event.is_set(): if not sc.abort_event.is_set():
try: sc.q.put_nowait(b"\x00"*BYTES_PER_20MS_16K) # Day 28: Add a short run of silence padding to prevent a pop at the end of playback (pop noise fix)
except Exception: pass # Increased to 10 frames (200 ms) to ensure a complete fade-out
try:
silence_frame = b'\x00' * 640 # 20ms silence (16k * 2 bytes * 0.02)
for _ in range(10): # 200ms silence
sc.q.put_nowait(silence_frame)
except Exception:
pass
try: sc.q.put_nowait(None) try: sc.q.put_nowait(None)
except Exception: pass except Exception: pass
@@ -1128,8 +1153,9 @@ async def start_ai_with_text(user_text: str):
from audio_stream import stream_clients from audio_stream import stream_clients
for sc in list(stream_clients): for sc in list(stream_clients):
if not sc.abort_event.is_set(): if not sc.abort_event.is_set():
try: sc.q.put_nowait(b"\x00"*BYTES_PER_20MS_16K) # Day 28: Remove the silence padding packet to eliminate noise
except Exception: pass # try: sc.q.put_nowait(b"\x00"*BYTES_PER_20MS_16K)
# except Exception: pass
try: sc.q.put_nowait(None) try: sc.q.put_nowait(None)
except Exception: pass except Exception: pass
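The `640` in the first hunk comes from 16,000 samples/s × 2 bytes/sample × 0.02 s. A small sketch of that arithmetic, assuming mono 16-bit PCM; `silence_frames` and the constant names are hypothetical:

```python
SAMPLE_RATE_HZ = 16000   # 16 kHz, as in the diff's comment
BYTES_PER_SAMPLE = 2     # 16-bit PCM; mono is assumed
FRAME_MS = 20


def silence_frames(total_ms, frame_ms=FRAME_MS):
    """Return a list of all-zero PCM frames covering total_ms of silence."""
    frame_bytes = SAMPLE_RATE_HZ * BYTES_PER_SAMPLE * frame_ms // 1000
    count = total_ms // frame_ms
    return [b"\x00" * frame_bytes for _ in range(count)]
```

For example, `silence_frames(200)` yields ten 640-byte frames, matching the loop in the hunk.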

View File

@@ -64,7 +64,9 @@ NAV_CONTROL_WHITELIST = [
"停止导航", "结束导航", "停止检测", "停止红绿灯", "停止导航", "结束导航", "停止检测", "停止红绿灯",
"开始导航", "盲道导航", "开始过马路", "过马路结束", "开始导航", "盲道导航", "开始过马路", "过马路结束",
"帮我导航", "帮我过马路", "帮我导航", "帮我过马路",
"室内导航", "室内导盲", # Day 25: Added indoor navigation commands "室内导航", "室内导盲", "四内导航", "思维导航", "失内导航", "时内导航", # Day 28: Indoor navigation plus homophone misrecognitions
"室类导航", "类导航", # Day 28: Additional misrecognitions
"退出导航", "关闭导航", "别导了", "别念了", "停止", # Day 28: Broader stop commands
] ]

View File

@@ -225,6 +225,14 @@ async def _broadcast_audio_optimized(pcm_data: bytes):
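# Note: recording is done centrally in broadcast_pcm16_realtime to avoid duplication # Note: recording is done centrally in broadcast_pcm16_realtime to avoid duplication
# Day 28: Pause VAD globally during playback so the system does not hear its own voice
# This is critical for systems without acoustic echo cancellation (AEC); otherwise navigation prompts trigger the VAD,
# which mistakes them for user speech and keeps the recognition channel occupied
from server_vad import get_server_vad
vad = get_server_vad()
if vad:
vad.set_tts_playing(True)
# A single call hands pacing to the lower layer (the 20 ms cadence is implemented inside broadcast_pcm16_realtime) # A single call hands pacing to the lower layer (the 20 ms cadence is implemented inside broadcast_pcm16_realtime)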
await broadcast_pcm16_realtime(full_audio) await broadcast_pcm16_realtime(full_audio)
@@ -232,6 +240,12 @@ async def _broadcast_audio_optimized(pcm_data: bytes):
except Exception as e: except Exception as e:
print(f"[AUDIO] 广播音频失败: {e}") print(f"[AUDIO] 广播音频失败: {e}")
finally: finally:
# Resume VAD detection
from server_vad import get_server_vad
vad = get_server_vad()
if vad:
vad.set_tts_playing(False)
# Clear the playing flag # Clear the playing flag
with _playing_lock: with _playing_lock:
_is_playing = False _is_playing = False
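The pause/resume pair added in this file must stay balanced even when playback raises. One way to express that guarantee is a context manager; the sketch below is illustrative only. `tts_playing` is a hypothetical helper, and the only thing it assumes about the VAD object is the `set_tts_playing(bool)` method shown in the diff.

```python
from contextlib import contextmanager


@contextmanager
def tts_playing(vad):
    """Pause VAD for the duration of the block, then always resume it."""
    if vad is None:
        # No VAD available: run the block unchanged.
        yield
        return
    vad.set_tts_playing(True)
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions, keeping calls paired.
        vad.set_tts_playing(False)
```

A caller would wrap the broadcast call in `with tts_playing(vad): ...`, so the `True` and `False` calls cannot drift apart.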

View File

@@ -102,6 +102,19 @@ async def hard_reset_audio(reason: str = ""):
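# 2) Cancel the current AI task # 2) Cancel the current AI task
await cancel_current_ai() await cancel_current_ai()
# Day 28: Force-reset the VAD TTS state so that a cancelled task cannot leave the counter non-zero (which would freeze the VAD)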
try:
# Safe import to avoid circular dependency
import sys
if 'server_vad' in sys.modules:
server_vad = sys.modules['server_vad']
if hasattr(server_vad, 'get_server_vad'):
vad = server_vad.get_server_vad()
if vad:
vad.reset_tts_state()
except Exception as e:
print(f"[HARD-RESET] 重置 VAD 状态失败: {e}")
# 3) Logging # 3) Logging
if reason: if reason:
print(f"[HARD-RESET] {reason}") print(f"[HARD-RESET] {reason}")

View File

@@ -293,6 +293,38 @@ class NavigationMaster:
def get_state(self) -> str: def get_state(self) -> str:
return self.state return self.state
# Day 28: Draw indoor navigation visualizations
def _draw_indoor_visualizations(self, image: np.ndarray, visualizations: list):
if not visualizations:
return
for viz in visualizations:
v_type = viz.get('type')
if v_type == 'walkable_mask':
mask = viz.get('mask')
color_str = viz.get('color', 'rgba(0, 255, 0, 0.3)')
# Keep it simple here: draw only a green outline and a semi-transparent fill
if mask is not None:
# 1. Green overlay
green_mask = np.zeros_like(image)
green_mask[mask > 0] = [0, 255, 0] # BGR
image[:] = cv2.addWeighted(image, 1.0, green_mask, 0.3, 0)
# 2. Contours
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cv2.drawContours(image, contours, -1, (0, 255, 0), 2)
elif v_type in ('obstacle', 'poi', 'person'):
center = viz.get('center')
label = viz.get('class_name_cn', '?')
if center:
cx, cy = center
color = (0, 0, 255) if v_type == 'obstacle' else (255, 255, 0)
cv2.circle(image, (cx, cy), 5, color, -1)
cv2.putText(image, label, (cx + 10, cy), cv2.FONT_HERSHEY_SIMPLEX,
0.6, color, 2, cv2.LINE_AA)
def start_blind_path_navigation(self): def start_blind_path_navigation(self):
"""Start blind-path navigation mode""" """Start blind-path navigation mode"""
self.state = BLINDPATH_NAV self.state = BLINDPATH_NAV
@@ -330,8 +362,9 @@ class NavigationMaster:
"""Start indoor navigation mode (uses the indoor blind-guidance model)""" """Start indoor navigation mode (uses the indoor blind-guidance model)"""
self.state = INDOOR_NAV self.state = INDOOR_NAV
self.cooldown_until = time.time() + self.COOLDOWN_SEC self.cooldown_until = time.time() + self.COOLDOWN_SEC
if self.blind: # Day 28: Reset the indoor navigator, not the blind-path navigator
self.blind.reset() if self.indoor:
self.indoor.reset()
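def is_in_navigation_mode(self): def is_in_navigation_mode(self):
"""Check whether the system is in a navigation mode (not conversation mode)""" """Check whether the system is in a navigation mode (not conversation mode)"""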
@@ -481,18 +514,28 @@ class NavigationMaster:
if self.state == INDOOR_NAV: if self.state == INDOOR_NAV:
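# Prefer the indoor navigator; fall back to the blind-path navigator if it is unavailable # Prefer the indoor navigator; fall back to the blind-path navigator if it is unavailable
nav = self.indoor if self.indoor else self.blind nav = self.indoor if self.indoor else self.blind
# Day 28: Log a warning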
if self.indoor is None:
print("[NAV MASTER] 警告: 室内导航器未初始化fallback 到盲道导航器!")
try: try:
result = nav.process_frame(bgr) result = nav.process_frame(bgr)
except Exception as e: except Exception as e:
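self.state = RECOVERY # Day 28: On an indoor navigation error, stay in indoor mode; do not switch to RECOVERY (that would automatically switch back to blind-path navigation)
print(f"[INDOOR ERROR] 室内导航异常: {e}")
# self.state = RECOVERY <-- do not switch!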
ann_err = bgr.copy() ann_err = bgr.copy()
return OrchestratorResult(ann_err, self._say(now, ""), self.state, {"error": str(e)}) return OrchestratorResult(ann_err, self._say(now, ""), INDOOR_NAV, {"error": str(e)})
ann = result.annotated_image if result.annotated_image is not None else bgr.copy() ann = result.annotated_image if result.annotated_image is not None else bgr.copy()
say = result.guidance_text or "" say = result.guidance_text or ""
state_info = result.state_info if hasattr(result, 'state_info') else {} state_info = result.state_info if hasattr(result, 'state_info') else {}
return OrchestratorResult(ann, self._say(now, say), self.state, # Day 28: Draw indoor navigation visualizations
visualizations = result.visualizations if hasattr(result, 'visualizations') else []
self._draw_indoor_visualizations(ann, visualizations)
# Day 28: Make sure the correct state, INDOOR_NAV, is returned
return OrchestratorResult(ann, self._say(now, say), INDOOR_NAV,
{"source": "indoor", "state_info": state_info}) {"source": "indoor", "state_info": state_info})
# Per-state handling # Per-state handling

View File

@@ -96,7 +96,8 @@ class SileroVAD:
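self.speech_audio = bytearray() # Stores the speech audio self.speech_audio = bytearray() # Stores the speech audio
# TTS playback state: pause VAD during playback # TTS playback state: pause VAD during playback
self.tts_playing = False # Day 28: Use a reference count to handle concurrent playback
self.tts_playing_count = 0
self.tts_end_time = 0 # TTS end time self.tts_end_time = 0 # TTS end time
self.tts_cooldown_ms = 500 # Wait 500 ms after TTS ends before resuming detection self.tts_cooldown_ms = 500 # Wait 500 ms after TTS ends before resuming detection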
@@ -105,9 +106,9 @@ class SileroVAD:
self.window_size = 5 # Sliding window size self.window_size = 5 # Sliding window size
self.frame_threshold = 3 # Minimum number of speech frames before speech is considered started self.frame_threshold = 3 # Minimum number of speech frames before speech is considered started
# Day 23: Pre-speech buffer (Lookback) to fix "cut-off" start of words # Day 23+28: Pre-speech buffer (Lookback) to fix "cut-off" start of words
# 300ms lookback approx. (each chunk is 32ms) -> 10 chunks # Day 28: Increased to 768 ms (24 chunks) to capture longer openings such as "室内导航" and keep ASR from dropping characters
self.pre_speech_buffer = collections.deque(maxlen=10) self.pre_speech_buffer = collections.deque(maxlen=24)
print(f"[VAD] 初始化: threshold={threshold}, threshold_low={threshold_low}, " print(f"[VAD] 初始化: threshold={threshold}, threshold_low={threshold_low}, "
f"min_silence_ms={min_silence_ms}, min_speech_ms={min_speech_ms}") f"min_silence_ms={min_silence_ms}, min_speech_ms={min_speech_ms}")
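The lookback sizes in this hunk follow from the 32 ms chunk length stated in the comment: 10 chunks is about 320 ms and 24 chunks is 768 ms. A short sketch of sizing a fixed-length lookback buffer from a target duration; `make_pre_speech_buffer` and `CHUNK_MS` are hypothetical names:

```python
import collections

CHUNK_MS = 32  # chunk duration taken from the diff's comment


def make_pre_speech_buffer(lookback_ms):
    """Ring buffer that keeps only the most recent lookback_ms of chunks."""
    return collections.deque(maxlen=lookback_ms // CHUNK_MS)


buf = make_pre_speech_buffer(768)
for chunk_index in range(30):
    buf.append(chunk_index)  # older chunks fall off the left end
```

Because `deque(maxlen=...)` discards the oldest entry on overflow, the buffer always holds the audio immediately preceding the moment speech is detected.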
@@ -120,29 +121,46 @@ class SileroVAD:
self.last_speech_time = 0 self.last_speech_time = 0
self.speech_start_time = 0 self.speech_start_time = 0
self.voice_window.clear() self.voice_window.clear()
self.tts_playing = False self.tts_playing_count = 0
self.tts_end_time = 0 self.tts_end_time = 0
if self.model: if self.model:
self.model.reset_states() self.model.reset_states()
if hasattr(self, 'pre_speech_buffer'):
self.pre_speech_buffer.clear()
def reset_tts_state(self):
"""Force-reset the TTS playback state (used for hard resets)"""
self.tts_playing_count = 0
print("[VAD] 强制重置 TTS 状态 (VAD 恢复)")
def set_tts_playing(self, playing: bool): def set_tts_playing(self, playing: bool):
"""Set the TTS playback state""" """Set the TTS playback state (reference counted)"""
self.tts_playing = playing if playing:
if not playing: self.tts_playing_count += 1
# TTS ended; record the time if self.tts_playing_count == 1:
self.tts_end_time = time.time() * 1000 print("[VAD] TTS 开始播放,暂停 VAD 检测")
print("[VAD] TTS 结束,等待冷却期...") # When TTS starts playing, interrupt any recording in progress
if self.is_speaking:
self.is_speaking = False
self.speech_audio.clear()
self.voice_window.clear()
# Day 23: Clear lookback buffer
if hasattr(self, 'pre_speech_buffer'):
self.pre_speech_buffer.clear()
# Day 28: Reset the model state
if self.model:
self.model.reset_states()
print("[VAD] TTS 播放打断语音录制")
else: else:
print("[VAD] TTS 开始播放,暂停 VAD 检测") if self.tts_playing_count > 0:
# When TTS starts playing, interrupt any recording in progress self.tts_playing_count -= 1
if self.is_speaking: if self.tts_playing_count == 0:
self.is_speaking = False # TTS ended; record the time
self.speech_audio.clear() self.tts_end_time = time.time() * 1000
self.voice_window.clear() print("[VAD] TTS 完全结束,等待冷却期...")
# Day 23: Clear lookback buffer else:
if hasattr(self, 'pre_speech_buffer'): # Already 0; ignore
self.pre_speech_buffer.clear() pass
print("[VAD] TTS 播放打断语音录制")
def process(self, audio_bytes: bytes) -> dict: def process(self, audio_bytes: bytes) -> dict:
""" """
@@ -172,7 +190,7 @@ class SileroVAD:
# Skip VAD detection while TTS is playing # Skip VAD detection while TTS is playing
current_time = time.time() * 1000 current_time = time.time() * 1000
if self.tts_playing: if self.tts_playing_count > 0:
return result return result
# TTS just ended; wait for the cooldown period # TTS just ended; wait for the cooldown period
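The change from a boolean flag to a counter matters when two playbacks overlap: with a flag, the first playback to finish would re-enable VAD while the second is still audible. A stripped-down sketch of the counting logic follows. `TtsGate` is a hypothetical stand-in for the relevant part of `SileroVAD`, the injectable `clock` exists only for testing, and the cooldown check is an assumption about the code that follows the hunk.

```python
import time


class TtsGate:
    """Reference-counted 'TTS is playing' gate with a post-playback cooldown."""

    def __init__(self, cooldown_ms=500, clock=time.time):
        self.count = 0
        self.end_ms = 0.0
        self.cooldown_ms = cooldown_ms
        self._clock = clock

    def set_playing(self, playing):
        if playing:
            self.count += 1
        elif self.count > 0:
            self.count -= 1
            if self.count == 0:
                # Last playback finished: start the cooldown window.
                self.end_ms = self._clock() * 1000
        # An unmatched "stop" at count 0 is ignored.

    def reset(self):
        """Hard reset, for example after a cancelled task never called stop."""
        self.count = 0

    def vad_enabled(self):
        if self.count > 0:
            return False
        return (self._clock() * 1000 - self.end_ms) >= self.cooldown_ms
```

The explicit `reset()` mirrors `reset_tts_state()` in the diff: a counter that is incremented but never decremented would otherwise keep VAD disabled indefinitely.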

View File

@@ -22,35 +22,39 @@ from collections import deque
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ========== Class constants ========== # ========== Class constants (14-class model - yolo11l-seg-indoor14) ==========
# Walkable areas # Day 28: Use the 14-class model (MIT Indoor Subset)
# Walkable areas (0-2)
WALKABLE_CLASSES = {0, 1, 2} # floor, corridor, sidewalk WALKABLE_CLASSES = {0, 1, 2} # floor, corridor, sidewalk
CLASS_FLOOR = 0 CLASS_FLOOR = 0
CLASS_CORRIDOR = 1 CLASS_CORRIDOR = 1
CLASS_SIDEWALK = 2 CLASS_SIDEWALK = 2
# Static obstacles # Static obstacles (3-5, 11-12)
OBSTACLE_CLASSES = {3, 4, 5, 11, 12} # chair, table, sofa_bed, cabinet, trash_can OBSTACLE_CLASSES = {3, 4, 5, 11, 12, 13} # window (13) is also counted as an obstacle here
# Window (13) is treated as an obstacle because the user should not walk into it.
# Cabinet is 11, trash can is 12.
CLASS_CHAIR = 3 CLASS_CHAIR = 3
CLASS_TABLE = 4 CLASS_TABLE = 4
CLASS_SOFA_BED = 5 CLASS_SOFA_BED = 5
CLASS_CABINET = 11 CLASS_CABINET = 11
CLASS_TRASH_CAN = 12 CLASS_TRASH_CAN = 12
CLASS_WINDOW = 13 # Windows are usually treated as a boundary or an obstacle
CLASS_WALL = 9 # Wall 9
# Points of interest # Points of interest (6-8)
POI_CLASSES = {6, 7, 8} # door, elevator, stairs POI_CLASSES = {6, 7, 8} # door, elevator, stairs
CLASS_DOOR = 6 CLASS_DOOR = 6
CLASS_ELEVATOR = 7 CLASS_ELEVATOR = 7
CLASS_STAIRS = 8 CLASS_STAIRS = 8
# Boundaries # Dynamic obstacles (10)
BOUNDARY_CLASSES = {9, 13} # wall, window
CLASS_WALL = 9
CLASS_WINDOW = 13
# Dynamic obstacles
CLASS_PERSON = 10 CLASS_PERSON = 10
# Boundaries
BOUNDARY_CLASSES = {9, 13} # wall(9), window(13)
# Class name mapping # Class name mapping
CLASS_NAMES = { CLASS_NAMES = {
0: 'floor', 1: 'corridor', 2: 'sidewalk', 0: 'floor', 1: 'corridor', 2: 'sidewalk',
@@ -69,16 +73,23 @@ CLASS_NAMES_CN = {
12: '垃圾桶', 13: '窗户' 12: '垃圾桶', 13: '窗户'
} }
# Item classes (none)
ITEM_CLASSES = set()
# ========== Configuration parameters ========== # ========== Configuration parameters ==========
CONF_THRESHOLD = float(os.getenv('INDOOR_CONF_THRESHOLD', '0.25')) # Day 28: Lower the thresholds further to improve hardwood-floor detection
WALKABLE_MIN_AREA = int(os.getenv('INDOOR_WALKABLE_MIN_AREA', '3000'))
OBSTACLE_MIN_AREA = int(os.getenv('INDOOR_OBSTACLE_MIN_AREA', '500')) CONF_THRESHOLD = float(os.getenv('INDOOR_CONF_THRESHOLD', '0.05')) # Very low global threshold; later logic filters a second time
WALKABLE_MIN_AREA = int(os.getenv('INDOOR_WALKABLE_MIN_AREA', '50')) # Minimum area lowered drastically for debugging (previously 1000)
OBSTACLE_MIN_AREA = int(os.getenv('INDOOR_OBSTACLE_MIN_AREA', '300'))
# Voice prompt intervals # Voice prompt intervals
GUIDE_INTERVAL = float(os.getenv('INDOOR_GUIDE_INTERVAL', '3.0')) GUIDE_INTERVAL = float(os.getenv('INDOOR_GUIDE_INTERVAL', '3.0'))
DIRECTION_INTERVAL = float(os.getenv('INDOOR_DIRECTION_INTERVAL', '2.5')) DIRECTION_INTERVAL = float(os.getenv('INDOOR_DIRECTION_INTERVAL', '2.5'))
POI_INTERVAL = float(os.getenv('INDOOR_POI_INTERVAL', '5.0')) POI_INTERVAL = float(os.getenv('INDOOR_POI_INTERVAL', '5.0'))
OBSTACLE_INTERVAL = float(os.getenv('INDOOR_OBSTACLE_INTERVAL', '2.0')) OBSTACLE_INTERVAL = float(os.getenv('INDOOR_OBSTACLE_INTERVAL', '2.0'))
# Day 28: Announcement interval for "未检测到可行走区域" (8 seconds)
NO_WALKABLE_INTERVAL = float(os.getenv('INDOOR_NO_WALKABLE_INTERVAL', '8.0'))
# ========== Visualization colors (BGR) ========== # ========== Visualization colors (BGR) ==========
VIS_COLORS = { VIS_COLORS = {
@@ -113,6 +124,10 @@ class IndoorNavigator:
self.seg_model = seg_model self.seg_model = seg_model
self.device_id = device_id self.device_id = device_id
self.frame_counter = 0 self.frame_counter = 0
# Day 28: Persistence buffer parameters
self.no_walkable_persistence_sec = 2.0
self.last_walkable_detected_time = 0
# Voice throttling # Voice throttling
self.last_guide_time = 0 self.last_guide_time = 0
@@ -128,11 +143,14 @@ class IndoorNavigator:
# Cache # Cache
self.last_walkable_mask = None self.last_walkable_mask = None
self.last_valid_walkable_mask = None
self.last_no_walkable_time = 0
self.last_obstacles = [] self.last_obstacles = []
self.last_pois = [] self.last_pois = []
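# Grayscale image (for optical flow, etc.) # Day 28: Removed the unused grayscale conversion (optical flow is not enabled)
self.prev_gray = None # self.prev_gray = None
# Logging interval # Logging interval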
self.log_interval = int(os.getenv('AIGLASS_LOG_INTERVAL', '30')) self.log_interval = int(os.getenv('AIGLASS_LOG_INTERVAL', '30'))
@@ -150,6 +168,9 @@ class IndoorNavigator:
self.last_obstacle_time = 0 self.last_obstacle_time = 0
self.last_guidance_text = "" self.last_guidance_text = ""
self.last_direction_text = "" self.last_direction_text = ""
self.last_valid_walkable_mask = None
self.last_no_walkable_time = 0 # Day 28: Throttle for "未检测到可行走区域"
self.last_walkable_detected_time = 0
self.last_walkable_mask = None self.last_walkable_mask = None
self.last_obstacles = [] self.last_obstacles = []
self.last_pois = [] self.last_pois = []
@@ -185,13 +206,27 @@ class IndoorNavigator:
obstacles = self.last_obstacles obstacles = self.last_obstacles
pois = self.last_pois pois = self.last_pois
# Generate navigation guidance
# 3. Cache the valid mask (used to debounce the visualization)
walkable_area = int(np.count_nonzero(walkable_mask)) if walkable_mask is not None else 0
if walkable_area > WALKABLE_MIN_AREA:
self.last_valid_walkable_mask = walkable_mask
# 4. Generate navigation guidance
if walkable_mask is not None: if walkable_mask is not None:
guidance_text = self._generate_guidance(walkable_mask, obstacles, pois, h, w, now) guidance_text = self._generate_guidance(walkable_mask, obstacles, pois, h, w, now)
# Add visualization # 5. Visualization (with persistence debouncing)
self._add_mask_visualization(walkable_mask, frame_visualizations, viz_mask = walkable_mask
"walkable_mask", "rgba(0, 255, 0, 0.3)")
# If no path is detected now but we are still within the persistence window, visualize the cached mask
if (viz_mask is None or walkable_area < WALKABLE_MIN_AREA) and \
(now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec and \
self.last_valid_walkable_mask is not None:
viz_mask = self.last_valid_walkable_mask
self._add_mask_visualization(viz_mask, frame_visualizations,
"walkable_mask", "rgba(0, 255, 0, 0.3)")
# Obstacle visualization # Obstacle visualization
for obs in obstacles: for obs in obstacles:
@@ -203,7 +238,8 @@ class IndoorNavigator:
# Logging # Logging
if self.frame_counter % self.log_interval == 0: if self.frame_counter % self.log_interval == 0:
walkable_area = int(walkable_mask.sum()) if walkable_mask is not None else 0 # Day 28: Fix the area calculation: use count_nonzero instead of sum (mask values are 0 or 255)
walkable_area = int(np.count_nonzero(walkable_mask)) if walkable_mask is not None else 0
logger.info(f"[INDOOR] Frame={self.frame_counter} | 可行走面积={walkable_area} | " logger.info(f"[INDOOR] Frame={self.frame_counter} | 可行走面积={walkable_area} | "
f"障碍物={len(obstacles)} | 兴趣点={len(pois)}") f"障碍物={len(obstacles)} | 兴趣点={len(pois)}")
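The area fix in this hunk is easy to see with a tiny mask: when pixels are stored as 0 or 255, summing them overstates the pixel count by a factor of 255. The sketch below uses a plain `bytes` object as a stand-in for the NumPy mask so that it runs without third-party packages; `area_by_sum` and `area_by_count` are hypothetical names, with `area_by_count` playing the role of `np.count_nonzero`.

```python
def area_by_sum(mask):
    """Old behaviour: adds up pixel values, so each 255 counts 255 times."""
    return sum(mask)


def area_by_count(mask):
    """Fixed behaviour: counts non-zero pixels, like np.count_nonzero."""
    return sum(1 for value in mask if value)


mask = bytes([0, 255, 255, 0, 255])  # three foreground pixels
```

Here `area_by_sum(mask)` gives 765 while `area_by_count(mask)` gives 3, which is the number that should be compared against a pixel-area threshold such as `WALKABLE_MIN_AREA`.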
@@ -215,11 +251,12 @@ class IndoorNavigator:
'pois_count': len(pois), 'pois_count': len(pois),
} }
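# Update the grayscale image # Day 28: Removed the unused grayscale conversion
self.prev_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # self.prev_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Day 28: Avoid copying the image every frame; pass the original image through (downstream code copies it if visualization is needed)
return IndoorResult( return IndoorResult(
annotated_image=image.copy(), annotated_image=image, # No longer copied, to save memory/CPU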
guidance_text=guidance_text, guidance_text=guidance_text,
state_info=state_info, state_info=state_info,
visualizations=frame_visualizations visualizations=frame_visualizations
@@ -253,17 +290,41 @@ class IndoorNavigator:
cls_id = int(cls_id.item()) cls_id = int(cls_id.item())
conf_val = float(conf.item()) conf_val = float(conf.item())
# Filter out item classes (excluded from navigation logic by default, to avoid flooding the output)
if cls_id in ITEM_CLASSES:
continue
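# Day 28: mixed threshold strategy
# Ground classes (WALKABLE) use the global low threshold (0.05) to improve recall
# Obstacles (OBSTACLE/POI/BOUNDARY) use a higher threshold (0.25) to reject false positives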
filter_threshold = 0.25
if cls_id in WALKABLE_CLASSES:
filter_threshold = 0.05
if conf_val < filter_threshold:
continue
# Resize mask # Resize mask
mask_resized = cv2.resize(mask, (w, h), interpolation=cv2.INTER_NEAREST) mask_resized = cv2.resize(mask, (w, h), interpolation=cv2.INTER_NEAREST)
mask_bin = (mask_resized > 0.5).astype(np.uint8) mask_bin = (mask_resized > 0.5).astype(np.uint8)
area = int(mask_bin.sum()) area = int(mask_bin.sum())
if area < 100: # filter out small fragments
# Day 28: debug logging, to inspect detected classes (ALL detections)
if area > 10: # log almost every detection
cls_name = CLASS_NAMES.get(cls_id, f'unknown_{cls_id}')
logger.info(f"[INDOOR DEBUG] 检测到 {cls_name}(id={cls_id}) conf={conf_val:.2f} area={area}")
if area < 50: # only filter out extremely small masks
continue continue
# Walkable area # Walkable area
if cls_id in WALKABLE_CLASSES and area > WALKABLE_MIN_AREA: if cls_id in WALKABLE_CLASSES and area > WALKABLE_MIN_AREA:
walkable_mask = cv2.bitwise_or(walkable_mask, mask_bin * 255) # Day 28: ensure matching dtypes so bitwise_or does not fail
mask_add = (mask_bin * 255).astype(np.uint8)
walkable_mask = cv2.bitwise_or(walkable_mask, mask_add)
if area > 10000: # debug: log large-area additions
logger.info(f"[INDOOR DEBUG] 添加可行走区域: class={cls_id} area={area} current_total={np.count_nonzero(walkable_mask)}")
# Obstacles # Obstacles
elif cls_id in OBSTACLE_CLASSES or cls_id == CLASS_PERSON: elif cls_id in OBSTACLE_CLASSES or cls_id == CLASS_PERSON:
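Review note: the per-class confidence filter added in this hunk can be read as a small pure function. A minimal sketch, assuming hypothetical class ids (the real `WALKABLE_CLASSES` and `ITEM_CLASSES` sets are defined elsewhere in the module and are not shown in this diff); the two threshold values are the ones given in the hunk:

```python
# Hypothetical class ids, for illustration only.
WALKABLE_CLASSES = {0, 1}
ITEM_CLASSES = {50, 51}

WALKABLE_CONF = 0.05  # low threshold for ground classes, to favor recall
DEFAULT_CONF = 0.25   # higher threshold for everything else, to reject false positives

def keep_detection(cls_id: int, conf: float) -> bool:
    """Return True if a detection should take part in navigation logic."""
    if cls_id in ITEM_CLASSES:
        return False  # item classes are skipped regardless of confidence
    threshold = WALKABLE_CONF if cls_id in WALKABLE_CLASSES else DEFAULT_CONF
    return conf >= threshold

print(keep_detection(0, 0.10), keep_detection(7, 0.10))
```

Keeping the rule in one function would also make it straightforward to unit-test the thresholds separately from the mask handling.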
@@ -323,8 +384,15 @@ class IndoorNavigator:
self.last_obstacle_time = now self.last_obstacle_time = now
self.last_guidance_text = guidance_text self.last_guidance_text = guidance_text
elif direction_guidance: elif direction_guidance:
# Day 28: announce "未检测到可行走区域" (no walkable area detected) less often
if direction_guidance == "未检测到可行走区域":
# First detection (last_no_walkable_time == 0), or more than 8 seconds since the last announcement
if self.last_no_walkable_time == 0 or (now - self.last_no_walkable_time) > NO_WALKABLE_INTERVAL:
guidance_text = direction_guidance
self.last_no_walkable_time = now
# Direction guidance throttling # Direction guidance throttling
if direction_guidance != self.last_direction_text: elif direction_guidance != self.last_direction_text:
if (now - self.last_direction_time) > DIRECTION_INTERVAL: if (now - self.last_direction_time) > DIRECTION_INTERVAL:
guidance_text = direction_guidance guidance_text = direction_guidance
self.last_direction_time = now self.last_direction_time = now
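Review note: the throttle added for the "no walkable area" message is a plain time-based rate limit. A minimal sketch, assuming a hypothetical `NoWalkableThrottle` class and an injected `now` value so the behavior can be checked without sleeping; the 8-second interval comes from the comment in the hunk:

```python
NO_WALKABLE_INTERVAL = 8.0  # seconds, as stated in the hunk's comment

class NoWalkableThrottle:
    """Hypothetical helper that mirrors the last_no_walkable_time logic."""

    def __init__(self) -> None:
        self.last_time = 0.0  # 0 means the message has never been announced

    def should_announce(self, now: float) -> bool:
        # Announce on first detection, or once the interval has fully elapsed.
        if self.last_time == 0 or (now - self.last_time) > NO_WALKABLE_INTERVAL:
            self.last_time = now
            return True
        return False

throttle = NoWalkableThrottle()
print([throttle.should_announce(t) for t in (100.0, 105.0, 108.5)])
```

Passing the clock in, rather than calling `time.time()` inside, is a design choice for testability and is not something the diff itself does.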
@@ -341,13 +409,25 @@ class IndoorNavigator:
def _compute_direction_guidance(self, walkable_mask, h, w): def _compute_direction_guidance(self, walkable_mask, h, w):
"""Compute direction guidance.""" """Compute direction guidance."""
if walkable_mask is None or walkable_mask.sum() < WALKABLE_MIN_AREA: # Day 28: use count_nonzero instead of sum (mask values are 0 or 255)
walkable_area = np.count_nonzero(walkable_mask) if walkable_mask is not None else 0
now = time.time()
if walkable_area < WALKABLE_MIN_AREA:
# Grace period: if a walkable path was seen recently, do not report an error immediately
if (now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec:
return None # stay silent, or return "保持直行" (keep going straight); silence is the safer choice
return "未检测到可行走区域" return "未检测到可行走区域"
# A walkable area was detected, so update the timestamp
self.last_walkable_detected_time = now
# Analyze the lower half (the nearer region) # Analyze the lower half (the nearer region)
lower_half = walkable_mask[int(h * 0.5):, :] lower_half = walkable_mask[int(h * 0.5):, :]
if lower_half.sum() < 1000: if np.count_nonzero(lower_half) < 1000:
if (now - self.last_walkable_detected_time) < self.no_walkable_persistence_sec:
return None
return "前方可行走区域较小,请小心" return "前方可行走区域较小,请小心"
# Compute the left/center/right distribution
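Review note: as far as this hunk shows, `last_walkable_detected_time` is set to `now` just before the lower-half check, so `(now - self.last_walkable_detected_time)` is 0 at that point and the grace-period branch would always return `None` for any positive `no_walkable_persistence_sec`. A minimal sketch of one way to order the checks, assuming hypothetical constants and English placeholder return codes instead of the spoken strings:

```python
from typing import Optional

WALKABLE_MIN_AREA = 5000    # hypothetical value, not shown in the diff
LOWER_HALF_MIN_AREA = 1000  # value used in the hunk
PERSISTENCE_SEC = 1.5       # hypothetical stand-in for no_walkable_persistence_sec

class WalkableDebouncer:
    """Hypothetical helper: suppress warnings briefly after a good detection."""

    def __init__(self) -> None:
        self.last_seen = 0.0

    def classify(self, total_area: int, lower_area: int, now: float) -> Optional[str]:
        # Evaluate the grace window against the previous timestamp first.
        recently_seen = (now - self.last_seen) < PERSISTENCE_SEC
        if total_area < WALKABLE_MIN_AREA:
            return None if recently_seen else "no_walkable"
        if lower_area < LOWER_HALF_MIN_AREA:
            return None if recently_seen else "narrow"
        # Refresh the timestamp only after both checks pass.
        self.last_seen = now
        return "ok"

d = WalkableDebouncer()
print(d.classify(20000, 5000, 10.0), d.classify(0, 0, 10.5), d.classify(0, 0, 12.0))
```

Whether the timestamp should also be refreshed when only the total-area check passes depends on the intended behavior, which the diff does not state.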

View File

@@ -15,7 +15,9 @@ except Exception:
from ultralytics import YOLO as _MODEL from ultralytics import YOLO as _MODEL
# Day 20: prefer the TensorRT engine # Day 20: prefer the TensorRT engine
DEFAULT_MODEL_PATH = get_best_model_path(os.getenv("YOLOE_MODEL_PATH", "model/yoloe-11l-seg.pt")) # Day 28: use an absolute path based on the current file's location
_DEFAULT_YOLOE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "model", "yoloe-11l-seg.pt")
DEFAULT_MODEL_PATH = get_best_model_path(os.getenv("YOLOE_MODEL_PATH", _DEFAULT_YOLOE_PATH))
TRACKER_CFG = os.getenv("YOLO_TRACKER_YAML", "bytetrack.yaml") TRACKER_CFG = os.getenv("YOLO_TRACKER_YAML", "bytetrack.yaml")
class YoloEBackend: class YoloEBackend: