diff --git a/app_main.py b/app_main.py index 8ebbbb3..63656e0 100644 --- a/app_main.py +++ b/app_main.py @@ -1,6 +1,17 @@ # app_main.py # -*- coding: utf-8 -*- -import os, sys, time, json, asyncio, base64, audioop + +# Day 26: suppress verbose TensorRT logging (must run before any TensorRT-related import) +import os +os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" # suppress TensorFlow warnings +try: + import tensorrt as trt + # Intended to set the TensorRT log level to WARNING (hide repeated INFO-level load messages). NOTE(review): this assigns a class attribute rather than configuring a Logger instance - confirm it actually affects loggers created elsewhere + trt.Logger.min_severity = trt.Logger.WARNING +except ImportError: + pass # TensorRT is not installed; skip + +import sys, time, json, asyncio, base64, audioop from typing import Any, Dict, Optional, Tuple, List, Callable, Set, Deque from collections import deque from dataclasses import dataclass @@ -26,6 +37,45 @@ from starlette.websockets import WebSocketState import uvicorn import cv2 import numpy as np +from contextlib import asynccontextmanager + +# Server Optimization: Global Context & Logging +from server_context import ctx +import logging +from logging.handlers import TimedRotatingFileHandler + +# Configure Logging +# Ensure logs directory exists +os.makedirs("logs", exist_ok=True) + +log_formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s') +log_handler = TimedRotatingFileHandler("logs/naviglass.log", when="midnight", interval=1, backupCount=7) +log_handler.setFormatter(log_formatter) +logger = logging.getLogger() +logger.setLevel(logging.INFO) +logger.addHandler(log_handler) +# Add console handler +console_handler = logging.StreamHandler() +console_handler.setFormatter(log_formatter) +logger.addHandler(console_handler) + +print = lambda *args, sep=" ", **_print_kwargs: logger.info(sep.join(map(str, args))) # Redirect print to logger; joins positional args like print() and accepts-but-ignores print-only kwargs (end/file/flush) so existing print(..., flush=True) calls do not raise TypeError + +# Server Optimization: Dynamic Import Optimization (Fail Fast) +try: + import trafficlight_detection +except ImportError as e: + logger.warning(f"[INIT] 警告: trafficlight_detection 导入失败: {e}") + trafficlight_detection = None + + # try: + # from hand_landmarker import HandLandmarker + # hand_detector = HandLandmarker() + # print("[OLD_AI] 
HandLandmarker loaded.") + # except ImportError as e: + # # logger.warning(f"[INIT] 警告: hand_landmarker 导入失败: {e}") + # hand_detector = None + hand_detector = None # 【Day 19 优化】TurboJPEG - 比 cv2.imencode/imdecode 快 2-3 倍 # Day 20: TensorRT 模型加载工具 @@ -132,23 +182,23 @@ async def lifespan(app: FastAPI): if main_loop.is_closed(): return - global yolomedia_sending_frames - if not yolomedia_sending_frames: - yolomedia_sending_frames = True - print("[YOLOMEDIA] 开始发送处理后的帧,切换到YOLO画面", flush=True) + # global yolomedia_sending_frames (Moved to ctx) + if not ctx.yolomedia_sending_frames: + ctx.yolomedia_sending_frames = True + logger.info("[YOLOMEDIA] 开始发送处理后的帧,切换到YOLO画面") async def _broadcast(): - if not camera_viewers: + if not ctx.camera_viewers: return dead = [] - for ws in list(camera_viewers): + for ws in list(ctx.camera_viewers): try: await ws.send_bytes(jpeg_bytes) except Exception: dead.append(ws) for ws in dead: try: - camera_viewers.remove(ws) + ctx.camera_viewers.remove(ws) except Exception: pass @@ -218,8 +268,8 @@ async def lifespan(app: FastAPI): # 【Day 15】关闭帧处理线程池 try: - frame_processing_executor.shutdown(wait=False) - print("[LIFESPAN] 帧处理线程池已关闭") + ctx.frame_processing_executor.shutdown(wait=False) + logger.info("[LIFESPAN] 帧处理线程池已关闭") except Exception: pass @@ -241,46 +291,11 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan) # ====== 状态与容器 ====== +# 全局变量已移至 server_context.py (ctx 单例) +# 挂载静态文件 app.mount("/static", StaticFiles(directory="static"), name="static") -ui_clients: Dict[int, WebSocket] = {} -current_partial: str = "" -recent_finals: List[str] = [] -RECENT_MAX = 50 -last_frames: Deque[Tuple[float, bytes]] = deque(maxlen=10) - -camera_viewers: Set[WebSocket] = set() -esp32_camera_ws: Optional[WebSocket] = None -imu_ws_clients: Set[WebSocket] = set() -esp32_audio_ws: Optional[WebSocket] = None - -# 【新增】盲道导航相关全局变量 -blind_path_navigator = None -navigation_active = False -yolo_seg_model = None -obstacle_detector = None - -# 
【新增】过马路导航相关全局变量 -cross_street_navigator = None -cross_street_active = False -orchestrator = None # 新增 - -# 【新增】omni对话状态标志 -omni_conversation_active = False # 标记omni对话是否正在进行 -omni_previous_nav_state = None # 保存omni激活前的导航状态,用于恢复 - -# 【Day 15 性能优化】帧处理线程池 - Day 18 优化: 增加worker数量 -# 将 CPU 密集型的帧处理移至后台线程,避免阻塞事件循环 -frame_processing_executor = ThreadPoolExecutor(max_workers=3, thread_name_prefix="frame_proc") - -# 【Day 15 跳帧机制】异步帧处理状态 -# 避免 await 阻塞,使用后台任务 + 最新结果缓存 -_nav_processing_task = None # 当前的后台处理任务 -_nav_last_result_image = None # 最后一次成功处理的输出图像 -_nav_last_result_jpeg: bytes = None # 【Day 19 优化】缓存编码后的 JPEG,避免重复编码 -_nav_pending_frame = None # 等待处理的最新帧 -_nav_processing_lock = asyncio.Lock() # 确保单任务运行 -_nav_task_start_time = None # Day 20: 任务开始时间,用于计算处理耗时 +# 【Day 19 优化】TurboJPEG 辅助函数 - 带回退逻辑 # 【Day 18 性能优化】并行广播辅助函数 - 解决 WebSocket 顺序发送阻塞 @@ -303,10 +318,10 @@ def turbo_encode(bgr_image, quality: int = 80) -> bytes: async def _broadcast_to_viewers(jpeg_data: bytes) -> None: """并行向所有 viewer 广播 JPEG 帧,避免顺序 await 阻塞事件循环""" - if not camera_viewers or not jpeg_data: + if not ctx.camera_viewers or not jpeg_data: return - viewers = list(camera_viewers) + viewers = list(ctx.camera_viewers) if not viewers: return @@ -322,16 +337,16 @@ async def _broadcast_to_viewers(jpeg_data: bytes) -> None: # 清理失败的连接 for r in results: - if r is not None and r in camera_viewers: + if r is not None and r in ctx.camera_viewers: try: - camera_viewers.discard(r) + ctx.camera_viewers.discard(r) except Exception: pass def load_navigation_models(): """加载盲道导航所需的模型""" - global yolo_seg_model, obstacle_detector + # global yolo_seg_model, obstacle_detector (Moved to ctx) try: seg_model_path = os.getenv("BLIND_PATH_MODEL", "model/yolo-seg.pt") @@ -341,21 +356,21 @@ def load_navigation_models(): if os.path.exists(seg_model_path): print(f"[NAVIGATION] 模型文件存在,开始加载...") - yolo_seg_model = YOLO(seg_model_path) + ctx.yolo_seg_model = YOLO(seg_model_path) # Day 20: TensorRT 引擎不需要 .to() 和 .fuse() from model_utils 
import is_tensorrt_engine if is_tensorrt_engine(seg_model_path): print(f"[NAVIGATION] TensorRT 引擎已加载,跳过 .to() 和 .fuse()") elif torch.cuda.is_available(): - yolo_seg_model.to("cuda") + ctx.yolo_seg_model.to("cuda") # Day 22 优化: 融合模型层以提升推理速度 try: - yolo_seg_model.fuse() + ctx.yolo_seg_model.fuse() print(f"[NAVIGATION] 模型层融合完成") except Exception as e: print(f"[NAVIGATION] 模型融合失败(非致命): {e}") - print(f"[NAVIGATION] 盲道分割模型加载成功并放到GPU: {yolo_seg_model.device}") + print(f"[NAVIGATION] 盲道分割模型加载成功并放到GPU: {ctx.yolo_seg_model.device}") else: print("[NAVIGATION] CUDA不可用,模型仍在CPU") @@ -367,7 +382,7 @@ def load_navigation_models(): # 预热推理,让CUDA编译kernel for _ in range(3): # 多次预热确保稳定 - results = yolo_seg_model.predict( + results = ctx.yolo_seg_model.predict( test_img, device="cuda" if torch.cuda.is_available() else "cpu", verbose=False, @@ -375,9 +390,9 @@ def load_navigation_models(): half=use_half ) print(f"[NAVIGATION] 模型预热成功 (imgsz={imgsz}, half={use_half})") - print(f"[NAVIGATION] 支持的类别数: {len(yolo_seg_model.names) if hasattr(yolo_seg_model, 'names') else '未知'}") - if hasattr(yolo_seg_model, 'names'): - print(f"[NAVIGATION] 模型类别: {yolo_seg_model.names}") + print(f"[NAVIGATION] 支持的类别数: {len(ctx.yolo_seg_model.names) if hasattr(ctx.yolo_seg_model, 'names') else '未知'}") + if hasattr(ctx.yolo_seg_model, 'names'): + print(f"[NAVIGATION] 模型类别: {ctx.yolo_seg_model.names}") except Exception as e: print(f"[NAVIGATION] 模型预热失败: {e}") else: @@ -395,32 +410,32 @@ def load_navigation_models(): print(f"[NAVIGATION] 障碍物检测模型文件存在,开始加载...") try: # 使用 ObstacleDetectorClient 封装的 YOLO-E - obstacle_detector = ObstacleDetectorClient(model_path=obstacle_model_path) + ctx.obstacle_detector = ObstacleDetectorClient(model_path=obstacle_model_path) print(f"[NAVIGATION] ========== YOLO-E 障碍物检测器加载成功 ==========") # 检查模型是否成功加载 - if hasattr(obstacle_detector, 'model') and obstacle_detector.model is not None: + if hasattr(ctx.obstacle_detector, 'model') and ctx.obstacle_detector.model is not None: 
print(f"[NAVIGATION] YOLO-E 模型已初始化") # Day 20: TensorRT 引擎没有 .parameters(),跳过设备检查 if not is_tensorrt_engine(obstacle_model_path): try: - print(f"[NAVIGATION] 模型设备: {next(obstacle_detector.model.parameters()).device}") + print(f"[NAVIGATION] 模型设备: {next(ctx.obstacle_detector.model.parameters()).device}") except StopIteration: pass else: print(f"[NAVIGATION] 警告:YOLO-E 模型初始化异常") # 检查白名单是否成功加载 - if hasattr(obstacle_detector, 'WHITELIST_CLASSES'): - print(f"[NAVIGATION] 白名单类别数: {len(obstacle_detector.WHITELIST_CLASSES)}") - print(f"[NAVIGATION] 白名单前10个类别: {', '.join(obstacle_detector.WHITELIST_CLASSES[:10])}") + if hasattr(ctx.obstacle_detector, 'WHITELIST_CLASSES'): + print(f"[NAVIGATION] 白名单类别数: {len(ctx.obstacle_detector.WHITELIST_CLASSES)}") + print(f"[NAVIGATION] 白名单前10个类别: {', '.join(ctx.obstacle_detector.WHITELIST_CLASSES[:10])}") else: print(f"[NAVIGATION] 警告:白名单类别未定义") # 检查文本特征是否成功预计算 - if hasattr(obstacle_detector, 'whitelist_embeddings') and obstacle_detector.whitelist_embeddings is not None: + if hasattr(ctx.obstacle_detector, 'whitelist_embeddings') and ctx.obstacle_detector.whitelist_embeddings is not None: print(f"[NAVIGATION] YOLO-E 文本特征已预计算") - print(f"[NAVIGATION] 文本特征张量形状: {obstacle_detector.whitelist_embeddings.shape if hasattr(obstacle_detector.whitelist_embeddings, 'shape') else '未知'}") + print(f"[NAVIGATION] 文本特征张量形状: {ctx.obstacle_detector.whitelist_embeddings.shape if hasattr(ctx.obstacle_detector.whitelist_embeddings, 'shape') else '未知'}") else: print(f"[NAVIGATION] 警告:YOLO-E 文本特征未预计算") @@ -432,7 +447,7 @@ def load_navigation_models(): cv2.rectangle(test_img, (200, 200), (400, 400), (255, 255, 255), -1) # 测试检测(不提供 path_mask) - test_results = obstacle_detector.detect(test_img) + test_results = ctx.obstacle_detector.detect(test_img) print(f"[NAVIGATION] YOLO-E 检测测试成功!") print(f"[NAVIGATION] 测试检测结果数: {len(test_results)}") @@ -453,7 +468,7 @@ def load_navigation_models(): print(f"[NAVIGATION] 障碍物检测器加载失败: {e}") import traceback traceback.print_exc() 
- obstacle_detector = None + ctx.obstacle_detector = None else: print(f"[NAVIGATION] 警告:找不到障碍物检测模型文件: {obstacle_model_path}") @@ -462,29 +477,92 @@ def load_navigation_models(): import traceback traceback.print_exc() +def load_indoor_model(): + """Day 25: 加载室内导盲模型""" + # global indoor_seg_model (Moved to ctx) + from model_utils import is_tensorrt_engine # Imported here for usage + + try: + indoor_model_path = os.getenv("INDOOR_MODEL", "model/yolo11l-seg-indoor14.pt") + # 优先使用 TensorRT 引擎 + indoor_model_path = get_best_model_path(indoor_model_path) + print(f"[INDOOR] 尝试加载室内导盲模型: {indoor_model_path}") + + if os.path.exists(indoor_model_path): + ctx.indoor_seg_model = YOLO(indoor_model_path) + + if is_tensorrt_engine(indoor_model_path): + print(f"[INDOOR] TensorRT 引擎已加载,跳过 .to() 和 .fuse()") + elif torch.cuda.is_available(): + ctx.indoor_seg_model.to("cuda") + try: + ctx.indoor_seg_model.fuse() + print(f"[INDOOR] 模型层融合完成") + except Exception as e: + print(f"[INDOOR] 模型融合失败(非致命): {e}") + print(f"[INDOOR] 室内导盲模型加载成功并放到GPU: {ctx.indoor_seg_model.device}") + + # 预热推理 + try: + imgsz = int(os.getenv("AIGLASS_YOLO_IMGSZ", "480")) + use_half = os.getenv("AIGLASS_YOLO_HALF", "1") == "1" + test_img = np.zeros((imgsz, imgsz, 3), dtype=np.uint8) + for _ in range(2): + ctx.indoor_seg_model.predict( + test_img, + device="cuda" if torch.cuda.is_available() else "cpu", + verbose=False, + imgsz=imgsz, + half=use_half + ) + print(f"[INDOOR] 模型预热成功 (imgsz={imgsz})") + if hasattr(ctx.indoor_seg_model, 'names'): + print(f"[INDOOR] 室内模型类别数: {len(ctx.indoor_seg_model.names)}") + print(f"[INDOOR] 类别: {list(ctx.indoor_seg_model.names.values())[:5]}...") + except Exception as e: + print(f"[INDOOR] 模型预热失败: {e}") + else: + print(f"[INDOOR] 警告:找不到室内导盲模型文件: {indoor_model_path}") + except Exception as e: + print(f"[INDOOR] 室内模型加载失败: {e}") + import traceback + traceback.print_exc() + # 在程序启动时加载模型 print("[NAVIGATION] 开始加载导航模型...") load_navigation_models() -print(f"[NAVIGATION] 模型加载完成 - yolo_seg_model: 
{yolo_seg_model is not None}") +print(f"[NAVIGATION] 模型加载完成 - yolo_seg_model: {ctx.yolo_seg_model is not None}") + +# Day 25: 加载室内导盲模型 +print("[INDOOR] 开始加载室内导盲模型...") +load_indoor_model() +print(f"[INDOOR] 模型加载完成 - indoor_seg_model: {ctx.indoor_seg_model is not None}") # Day 14 优化: 在服务器启动时就预先创建导航器实例,避免客户端连接时延迟 -if yolo_seg_model is not None and blind_path_navigator is None: +if ctx.yolo_seg_model is not None and ctx.blind_path_navigator is None: print("[NAVIGATION] 预初始化盲道导航器...") - blind_path_navigator = BlindPathNavigator(yolo_seg_model, obstacle_detector) + ctx.blind_path_navigator = BlindPathNavigator(ctx.yolo_seg_model, ctx.obstacle_detector) print("[NAVIGATION] 盲道导航器已预初始化") -if yolo_seg_model is not None and cross_street_navigator is None: +if ctx.yolo_seg_model is not None and ctx.cross_street_navigator is None: print("[CROSS_STREET] 预初始化过马路导航器...") - cross_street_navigator = CrossStreetNavigator( - seg_model=yolo_seg_model, + ctx.cross_street_navigator = CrossStreetNavigator( + seg_model=ctx.yolo_seg_model, coco_model=None, obs_model=None ) print("[CROSS_STREET] 过马路导航器已预初始化") -if orchestrator is None and blind_path_navigator is not None and cross_street_navigator is not None: +# Day 26: 创建室内导航器(使用专用 IndoorNavigator) +if ctx.indoor_seg_model is not None and ctx.indoor_navigator is None: + print("[INDOOR] 预初始化室内导航器...") + from workflow_indoor import IndoorNavigator + ctx.indoor_navigator = IndoorNavigator(ctx.indoor_seg_model) + print("[INDOOR] 室内导航器已预初始化") + +if ctx.orchestrator is None and ctx.blind_path_navigator is not None and ctx.cross_street_navigator is not None: print("[NAV MASTER] 预初始化统领状态机...") - orchestrator = NavigationMaster(blind_path_navigator, cross_street_navigator) + ctx.orchestrator = NavigationMaster(ctx.blind_path_navigator, ctx.cross_street_navigator, indoor_nav=ctx.indoor_navigator) print("[NAV MASTER] 统领状态机已预初始化") # 【新增】启动同步录制 @@ -520,18 +598,12 @@ print("[RECORDER] 已注册退出处理器 - Ctrl+C时会自动保存录制文 # 
【新增】预加载红绿灯检测模型(避免进入WAIT_TRAFFIC_LIGHT状态时卡顿) +# Day 26 优化: init_model() 内部已包含预热,无需重复调用 try: import trafficlight_detection print("[TRAFFIC_LIGHT] 开始预加载红绿灯检测模型...") if trafficlight_detection.init_model(): print("[TRAFFIC_LIGHT] 红绿灯检测模型预加载成功") - # 执行一次测试推理,完全预热模型 - try: - test_img = np.zeros((640, 640, 3), dtype=np.uint8) - _ = trafficlight_detection.process_single_frame(test_img) - print("[TRAFFIC_LIGHT] 模型预热完成") - except Exception as e: - print(f"[TRAFFIC_LIGHT] 模型预热失败: {e}") else: print("[TRAFFIC_LIGHT] 红绿灯检测模型预加载失败") except Exception as e: @@ -561,28 +633,28 @@ ITEM_TO_CLASS_MAP = { async def ui_broadcast_raw(msg: str): dead = [] - for k, ws in list(ui_clients.items()): + for k, ws in list(ctx.ui_clients.items()): try: await ws.send_text(msg) except Exception: dead.append(k) for k in dead: - ui_clients.pop(k, None) + ctx.ui_clients.pop(k, None) async def ui_broadcast_partial(text: str): - global current_partial - current_partial = text + # global current_partial (Moved to ctx) + ctx.current_partial = text await ui_broadcast_raw("PARTIAL:" + text) async def ui_broadcast_final(text: str): - global current_partial, recent_finals - current_partial = "" - recent_finals.append(text) - if len(recent_finals) > RECENT_MAX: - recent_finals = recent_finals[-RECENT_MAX:] + # global current_partial, recent_finals (Moved to ctx) + ctx.current_partial = "" + ctx.recent_finals.append(text) + if len(ctx.recent_finals) > 50: + ctx.recent_finals = ctx.recent_finals[-50:] await ui_broadcast_raw("FINAL:" + text) - print(f"[ASR/AI FINAL] {text}", flush=True) + print(f"[ASR/AI FINAL] {text}") async def full_system_reset(reason: str = ""): """ @@ -600,20 +672,20 @@ async def full_system_reset(reason: str = ""): await stop_current_recognition() # 3) UI - global current_partial, recent_finals - current_partial = "" - recent_finals = [] + # global current_partial, recent_finals (Moved to ctx) + ctx.current_partial = "" + ctx.recent_finals = [] # 4) 相机帧 try: - last_frames.clear() + 
ctx.last_frames.clear() except Exception: pass # 5) 通知 ESP32 try: - if esp32_audio_ws and (esp32_audio_ws.client_state == WebSocketState.CONNECTED): - await esp32_audio_ws.send_text("RESET") + if ctx.esp32_audio_ws and (ctx.esp32_audio_ws.client_state == WebSocketState.CONNECTED): + await ctx.esp32_audio_ws.send_text("RESET") except Exception: pass @@ -622,20 +694,20 @@ async def full_system_reset(reason: str = ""): # ========= 启动/停止 YOLO 媒体处理 ========= def start_yolomedia_with_target(target_name: str): """启动yolomedia线程,搜索指定物品""" - global yolomedia_thread, yolomedia_stop_event, yolomedia_running, yolomedia_sending_frames + global yolomedia_thread, yolomedia_stop_event # running/sending moved to ctx # 如果已经在运行,先停止 - if yolomedia_running: + if ctx.yolomedia_running: stop_yolomedia() # 查找对应的YOLO类别 yolo_class = ITEM_TO_CLASS_MAP.get(target_name, target_name) print(f"[YOLOMEDIA] Starting with target: {target_name} -> YOLO class: {yolo_class}", flush=True) - print(f"[YOLOMEDIA] Available mappings: {ITEM_TO_CLASS_MAP}", flush=True) # 添加这行调试 + # print(f"[YOLOMEDIA] Available mappings: {ITEM_TO_CLASS_MAP}", flush=True) yolomedia_stop_event.clear() - yolomedia_running = True - yolomedia_sending_frames = False # 重置发送帧状态 + ctx.yolomedia_running = True + ctx.yolomedia_sending_frames = False # 重置发送帧状态 def _run(): try: @@ -644,9 +716,8 @@ def start_yolomedia_with_target(target_name: str): except Exception as e: print(f"[YOLOMEDIA] worker stopped: {e}", flush=True) finally: - global yolomedia_running, yolomedia_sending_frames - yolomedia_running = False - yolomedia_sending_frames = False + ctx.yolomedia_running = False + ctx.yolomedia_sending_frames = False yolomedia_thread = threading.Thread(target=_run, daemon=True) yolomedia_thread.start() @@ -654,9 +725,9 @@ def start_yolomedia_with_target(target_name: str): def stop_yolomedia(): """停止yolomedia线程""" - global yolomedia_thread, yolomedia_stop_event, yolomedia_running, yolomedia_sending_frames + global yolomedia_thread, 
yolomedia_stop_event # running/sending moved to ctx - if yolomedia_running: + if ctx.yolomedia_running: print("[YOLOMEDIA] Stopping worker...", flush=True) yolomedia_stop_event.set() @@ -664,8 +735,8 @@ def stop_yolomedia(): if yolomedia_thread and yolomedia_thread.is_alive(): yolomedia_thread.join(timeout=5.0) - yolomedia_running = False - yolomedia_sending_frames = False + ctx.yolomedia_running = False + ctx.yolomedia_sending_frames = False # 【新增】如果orchestrator在找物品模式,结束时不自动恢复(由命令控制) # 只清理标志位即可 @@ -674,11 +745,11 @@ def stop_yolomedia(): # ========= 自定义的 start_ai_with_text,支持识别特殊命令 ========= async def start_ai_with_text_custom(user_text: str): """扩展版的AI启动函数,支持识别特殊命令""" - global navigation_active, blind_path_navigator, cross_street_active, cross_street_navigator, orchestrator + # global navigation_active, ... (Moved to ctx) # 【修改】在导航模式和红绿灯检测模式下,只有特定词才进入omni对话 - if orchestrator: - current_state = orchestrator.get_state() + if ctx.orchestrator: + current_state = ctx.orchestrator.get_state() # 如果在导航模式或红绿灯检测模式(非CHAT模式) if current_state not in ["CHAT", "IDLE"]: # 检查是否是允许的对话触发词 @@ -687,7 +758,8 @@ async def start_ai_with_text_custom(user_text: str): # 检查是否是导航控制命令 nav_control_keywords = ["开始过马路", "过马路结束", "开始导航", "盲道导航", "停止导航", "结束导航", - "检测红绿灯", "看红绿灯", "停止检测", "停止红绿灯"] + "检测红绿灯", "看红绿灯", "停止检测", "停止红绿灯", + "室内导航", "室内导盲"] # 新增室内导航 is_nav_control = any(keyword in user_text for keyword in nav_control_keywords) # 如果既不是允许的查询,也不是导航控制命令,则丢弃 @@ -699,13 +771,13 @@ async def start_ai_with_text_custom(user_text: str): # 【修改】检查是否是过马路相关命令 - 使用orchestrator控制 if "开始过马路" in user_text or "帮我过马路" in user_text: # 【新增】如果正在找物品,先停止 - if yolomedia_running: + if ctx.yolomedia_running: stop_yolomedia() print("[ITEM_SEARCH] 从找物品模式切换到过马路") - if orchestrator: - orchestrator.start_crossing() - print(f"[CROSS_STREET] 过马路模式已启动,状态: {orchestrator.get_state()}") + if ctx.orchestrator: + ctx.orchestrator.start_crossing() + print(f"[CROSS_STREET] 过马路模式已启动,状态: {ctx.orchestrator.get_state()}") # 
播放启动语音并广播到UI play_voice_text("过马路模式已启动。") await ui_broadcast_final("[系统] 过马路模式已启动") @@ -716,9 +788,9 @@ async def start_ai_with_text_custom(user_text: str): return if "过马路结束" in user_text or "结束过马路" in user_text: - if orchestrator: - orchestrator.stop_navigation() - print(f"[CROSS_STREET] 导航已停止,状态: {orchestrator.get_state()}") + if ctx.orchestrator: + ctx.orchestrator.stop_navigation() + print(f"[CROSS_STREET] 导航已停止,状态: {ctx.orchestrator.get_state()}") # 播放停止语音并广播到UI play_voice_text("已停止导航。") await ui_broadcast_final("[系统] 过马路模式已停止") @@ -729,12 +801,16 @@ async def start_ai_with_text_custom(user_text: str): # 【修改】检查是否是红绿灯检测命令 - 实现与盲道导航互斥 if "检测红绿灯" in user_text or "看红绿灯" in user_text: try: - import trafficlight_detection - + # import trafficlight_detection (Fixed: use top-level Safe Import) + if not trafficlight_detection: + logger.warning("[TRAFFIC] trafficlight_detection module missing") + await ui_broadcast_final("[系统] 红绿灯功能未启用") + return + # 切换orchestrator到红绿灯检测模式(暂停盲道导航) - if orchestrator: - orchestrator.start_traffic_light_detection() - print(f"[TRAFFIC] 切换到红绿灯检测模式,状态: {orchestrator.get_state()}") + if ctx.orchestrator: + ctx.orchestrator.start_traffic_light_detection() + logger.info(f"[TRAFFIC] 切换到红绿灯检测模式,状态: {ctx.orchestrator.get_state()}") # 【改进】使用主线程模式而不是独立线程,避免掉帧 success = trafficlight_detection.init_model() # 只初始化模型,不启动线程 @@ -752,9 +828,9 @@ async def start_ai_with_text_custom(user_text: str): if "停止检测" in user_text or "停止红绿灯" in user_text: try: # 恢复到对话模式 - if orchestrator: - orchestrator.stop_navigation() # 回到CHAT模式 - print(f"[TRAFFIC] 红绿灯检测停止,恢复到{orchestrator.get_state()}模式") + if ctx.orchestrator: + ctx.orchestrator.stop_navigation() # 回到CHAT模式 + print(f"[TRAFFIC] 红绿灯检测停止,恢复到{ctx.orchestrator.get_state()}模式") # 清除红绿灯检测缓存 global _traffic_light_result_jpeg @@ -769,32 +845,50 @@ async def start_ai_with_text_custom(user_text: str): # 【修改】检查是否是导航相关命令 - 使用orchestrator控制 if "开始导航" in user_text or "盲道导航" in user_text or "帮我导航" in user_text: # 【新增】如果正在找物品,先停止 
- if yolomedia_running: + if ctx.yolomedia_running: stop_yolomedia() print("[ITEM_SEARCH] 从找物品模式切换到盲道导航") - if orchestrator: - orchestrator.start_blind_path_navigation() - print(f"[NAVIGATION] 盲道导航已启动,状态: {orchestrator.get_state()}") + if ctx.orchestrator: + ctx.orchestrator.start_blind_path_navigation() + print(f"[NAVIGATION] 盲道导航已启动,状态: {ctx.orchestrator.get_state()}") await ui_broadcast_final("[系统] 盲道导航已启动") else: print("[NAVIGATION] 警告:导航统领器未初始化!") await ui_broadcast_final("[系统] 导航系统未就绪") return + # 【新增】检查是否是室内导航命令 + if "室内导航" in user_text or "室内导盲" in user_text: + # 如果正在找物品,先停止 + if ctx.yolomedia_running: + stop_yolomedia() + print("[ITEM_SEARCH] 从找物品模式切换到室内导航") + + if ctx.orchestrator: + ctx.orchestrator.start_indoor_navigation() + print(f"[INDOOR] 室内导航已启动,状态: {ctx.orchestrator.get_state()}") + await ui_broadcast_final("[系统] 室内导航已启动") + else: + print("[INDOOR] 警告:导航统领器未初始化!") + await ui_broadcast_final("[系统] 导航系统未就绪") + return + + # 【修改】停止导航优先判断 + # 只要包含"停止导航"或"结束导航",无论是否包含"室内",都视为停止指令 if "停止导航" in user_text or "结束导航" in user_text: - if orchestrator: - orchestrator.stop_navigation() - print(f"[NAVIGATION] 导航已停止,状态: {orchestrator.get_state()}") - await ui_broadcast_final("[系统] 盲道导航已停止") + if ctx.orchestrator: + ctx.orchestrator.stop_navigation() + print(f"[NAVIGATION] 导航已停止,状态: {ctx.orchestrator.get_state()}") + await ui_broadcast_final("[系统] 导航已停止") else: await ui_broadcast_final("[系统] 导航系统未运行") return - + nav_cmd_keywords = ["开始过马路", "过马路结束", "开始导航", "盲道导航", "停止导航", "结束导航", "立即通过", "现在通过", "继续"] if any(k in user_text for k in nav_cmd_keywords): - if orchestrator: - orchestrator.on_voice_command(user_text) + if ctx.orchestrator: + ctx.orchestrator.on_voice_command(user_text) await ui_broadcast_final("[系统] 导航模式已更新") else: await ui_broadcast_final("[系统] 导航统领器未初始化") @@ -814,9 +908,9 @@ async def start_ai_with_text_custom(user_text: str): print(f"[COMMAND] Finder request: '{item_cn}' -> '{label_en}' (src={src})", flush=True) # 【新增】切换到找物品模式(暂停导航) - if 
orchestrator: - orchestrator.start_item_search() - print(f"[ITEM_SEARCH] 已切换到找物品模式,状态: {orchestrator.get_state()}") + if ctx.orchestrator: + ctx.orchestrator.start_item_search() + print(f"[ITEM_SEARCH] 已切换到找物品模式,状态: {ctx.orchestrator.get_state()}") # 【关键】把英文类名传给 yolomedia(它会在找不到类时自动切 YOLOE) start_yolomedia_with_target(label_en) @@ -836,9 +930,9 @@ async def start_ai_with_text_custom(user_text: str): stop_yolomedia() # 【新增】停止找物品模式,恢复之前的导航状态 - if orchestrator: - orchestrator.stop_item_search(restore_nav=True) - current_state = orchestrator.get_state() + if ctx.orchestrator: + ctx.orchestrator.stop_item_search(restore_nav=True) + current_state = ctx.orchestrator.get_state() print(f"[ITEM_SEARCH] 找物品结束,当前状态: {current_state}") # 根据恢复的状态给出反馈 @@ -852,24 +946,24 @@ async def start_ai_with_text_custom(user_text: str): return # 【修改】omni对话开始时,切换到CHAT模式 - global omni_conversation_active, omni_previous_nav_state - omni_conversation_active = True + # global omni_conversation_active, omni_previous_nav_state (Moved to ctx) + ctx.omni_conversation_active = True # 保存当前导航状态并切换到CHAT模式 - if orchestrator: - current_state = orchestrator.get_state() + if ctx.orchestrator: + current_state = ctx.orchestrator.get_state() # 只有在导航模式下才需要保存和切换 if current_state not in ["CHAT", "IDLE"]: - omni_previous_nav_state = current_state - orchestrator.force_state("CHAT") + ctx.omni_previous_nav_state = current_state + ctx.orchestrator.force_state("CHAT") print(f"[OMNI] 对话开始,从{current_state}切换到CHAT模式") else: - omni_previous_nav_state = None + ctx.omni_previous_nav_state = None print(f"[OMNI] 对话开始(当前已在{current_state}模式)") # 如果不是特殊命令,执行原有的AI对话逻辑 # 但如果yolomedia正在运行,暂时不处理普通对话 - if yolomedia_running: + if ctx.yolomedia_running: print("[AI] YOLO media is running, skipping normal AI response", flush=True) return @@ -882,7 +976,7 @@ async def start_ai_with_text(user_text: str): # Day 13: 在 AI 处理开始前保存 WebSocket 引用 from audio_stream import set_tts_websocket - set_tts_websocket(esp32_audio_ws) + 
set_tts_websocket(ctx.esp32_audio_ws) async def _runner_new_pipeline(): """Day 21: 新管道 - GLM-4.5-Flash + EdgeTTS""" @@ -891,9 +985,9 @@ async def start_ai_with_text(user_text: str): try: # 获取图片(如果有) img_b64 = None - if last_frames: + if ctx.last_frames: try: - _, jpeg_bytes = last_frames[-1] + _, jpeg_bytes = ctx.last_frames[-1] img_b64 = base64.b64encode(jpeg_bytes).decode("ascii") except Exception: pass @@ -929,9 +1023,7 @@ async def start_ai_with_text(user_text: str): raise except Exception as e: err_msg = f"AI Error: {str(e)}" - print(f"[NEW AI] 错误: {err_msg}") - import traceback - traceback.print_exc() + logger.error(f"[NEW AI] 错误: {err_msg}", exc_info=True) # 1. 广播错误到 UI try: @@ -939,10 +1031,10 @@ async def start_ai_with_text(user_text: str): except Exception: pass - # 2. 发送错误到客户端日志 - if esp32_audio_ws: + # 2. 发送开始回复信号 + if ctx.esp32_audio_ws: try: - await esp32_audio_ws.send_text(f"ERR:{str(e)[:50]}") + await ctx.esp32_audio_ws.send_text("OK:REPLY_START") except Exception: pass @@ -957,13 +1049,13 @@ async def start_ai_with_text(user_text: str): except Exception: pass finally: - global omni_conversation_active, omni_previous_nav_state - omni_conversation_active = False + # global omni_conversation_active, omni_previous_nav_state (Moved to ctx) + ctx.omni_conversation_active = False - if orchestrator and omni_previous_nav_state: - orchestrator.force_state(omni_previous_nav_state) - print(f"[AI] 对话结束,恢复到{omni_previous_nav_state}模式") - omni_previous_nav_state = None + if ctx.orchestrator and ctx.omni_previous_nav_state: + ctx.orchestrator.force_state(ctx.omni_previous_nav_state) + # print(f"[AI] 对话结束,恢复到{ctx.omni_previous_nav_state}模式") + ctx.omni_previous_nav_state = None from audio_stream import stream_clients for sc in list(stream_clients): @@ -983,9 +1075,9 @@ async def start_ai_with_text(user_text: str): # 组装(图像+文本) content_list = [] - if last_frames: + if ctx.last_frames: try: - _, jpeg_bytes = last_frames[-1] + _, jpeg_bytes = ctx.last_frames[-1] 
img_b64 = base64.b64encode(jpeg_bytes).decode("ascii") content_list.append({ "type": "image_url", @@ -1023,13 +1115,13 @@ async def start_ai_with_text(user_text: str): except Exception: pass finally: - global omni_conversation_active, omni_previous_nav_state - omni_conversation_active = False + # global omni_conversation_active, omni_previous_nav_state (Moved to ctx) + ctx.omni_conversation_active = False - if orchestrator and omni_previous_nav_state: - orchestrator.force_state(omni_previous_nav_state) - print(f"[OMNI] 对话结束,恢复到{omni_previous_nav_state}模式") - omni_previous_nav_state = None + if ctx.orchestrator and ctx.omni_previous_nav_state: + ctx.orchestrator.force_state(ctx.omni_previous_nav_state) + # print(f"[OMNI] 对话结束,恢复到{ctx.omni_previous_nav_state}模式") + ctx.omni_previous_nav_state = None else: print(f"[OMNI] 对话结束(无需恢复导航状态)") @@ -1077,16 +1169,19 @@ register_stream_route(app) @app.websocket("/ws_ui") async def ws_ui(ws: WebSocket): await ws.accept() - ui_clients[id(ws)] = ws + # ui_clients[id(ws)] = ws (Fixed: use ctx) + ctx.ui_clients[id(ws)] = ws try: - init = {"partial": current_partial, "finals": recent_finals[-10:]} + # init = {"partial": current_partial, "finals": recent_finals[-10:]} (Fixed: use ctx) + init = {"partial": ctx.current_partial, "finals": ctx.recent_finals[-10:]} await ws.send_text("INIT:" + json.dumps(init, ensure_ascii=False)) while True: await asyncio.sleep(60) except (WebSocketDisconnect, asyncio.CancelledError): pass finally: - ui_clients.pop(id(ws), None) + # ui_clients.pop(id(ws), None) (Fixed: use ctx) + ctx.ui_clients.pop(id(ws), None) # ---------- Day 21: 新版 AI 音频处理 (SenseVoice + GLM + EdgeTTS) ---------- @@ -1099,14 +1194,14 @@ async def process_complete_audio_new_pipeline(audio_data: bytes, ws: WebSocket): """ try: # 1. 
语音识别 - print(f"[NEW AI] 开始识别音频: {len(audio_data)} bytes") + logger.info(f"[NEW AI] 开始识别音频: {len(audio_data)} bytes") user_text = await sensevoice_recognize(audio_data) if not user_text or len(user_text.strip()) < 2: - print("[NEW AI] 未识别到有效语音") + logger.info("[NEW AI] 未识别到有效语音") return - print(f"[NEW AI] 用户说: {user_text}") + logger.info(f"[NEW AI] 用户说: {user_text}") await ui_broadcast_partial(f"[用户] {user_text}") # 检查是否是导航命令 @@ -1117,10 +1212,10 @@ async def process_complete_audio_new_pipeline(audio_data: bytes, ws: WebSocket): ai_response = await glm_chat(user_text) if not ai_response: - print("[NEW AI] AI 无回复") + logger.info("[NEW AI] AI 无回复") return - print(f"[NEW AI] AI 回复: {ai_response}") + logger.info(f"[NEW AI] AI 回复: {ai_response}") await ui_broadcast_final(f"[AI] {ai_response}") # 3. EdgeTTS 流式合成并发送 @@ -1135,25 +1230,23 @@ async def process_complete_audio_new_pipeline(audio_data: bytes, ws: WebSocket): finally: vad.set_tts_playing(False) - print("[NEW AI] 音频合成并发送完成") + logger.info("[NEW AI] 音频合成并发送完成") except Exception as e: - print(f"[NEW AI] 处理失败: {e}") - import traceback - traceback.print_exc() + logger.error(f"[NEW AI] 处理失败: {e}", exc_info=True) # ---------- WebSocket:设备音频入口(ASR 上行) ---------- @app.websocket("/ws_audio") async def ws_audio(ws: WebSocket): - global esp32_audio_ws - esp32_audio_ws = ws + # global esp32_audio_ws (Moved to ctx) + ctx.esp32_audio_ws = ws # Day 20: 连接建立时立即保存 TTS WebSocket 引用 # 避免因引用丢失导致 TTS 音频无法发送 from audio_stream import set_tts_websocket set_tts_websocket(ws) await ws.accept() - print("\n[AUDIO] client connected (TTS WebSocket reference saved)") + logger.info("\n[AUDIO] client connected (TTS WebSocket reference saved)") recognition = None streaming = False last_ts = time.monotonic() @@ -1215,7 +1308,7 @@ async def ws_audio(ws: WebSocket): cmd = raw.upper() if cmd == "START": - print("[AUDIO] START received") + logger.info("[AUDIO] START received") await stop_rec() # Day 13: 刷新 TTS 缓存 @@ -1223,9 +1316,9 @@ async def 
ws_audio(ws: WebSocket): from audio_stream import flush_tts_buffer flushed = await flush_tts_buffer(ws) if flushed > 0: - print(f"[AUDIO] Flushed {flushed} bytes of cached TTS audio") + logger.info(f"[AUDIO] Flushed {flushed} bytes of cached TTS audio") except Exception as e: - print(f"[AUDIO] Error flushing TTS buffer: {e}") + logger.warning(f"[AUDIO] Error flushing TTS buffer: {e}") if USE_NEW_AI_PIPELINE: # Day 21: 新管道 - 服务器端 VAD + 非流式 SenseVoice @@ -1238,7 +1331,7 @@ async def ws_audio(ws: WebSocket): streaming = True await ui_broadcast_partial("(已开始接收音频…)") await ws.send_text("OK:STARTED") - print("[NEW ASR] 新管道已启动,服务器端 VAD 监听中") + logger.info("[NEW ASR] 新管道已启动,服务器端 VAD 监听中") else: # 旧管道 - 流式 DashScope loop = asyncio.get_running_loop() @@ -1278,7 +1371,7 @@ async def ws_audio(ws: WebSocket): elif cmd == "RECOGNIZE" and USE_NEW_AI_PIPELINE: # Day 21: 客户端 VAD 检测到语音结束,请求识别 if audio_buffer and len(audio_buffer) > 3200: # 至少 100ms 音频 - print(f"[NEW ASR] 收到 RECOGNIZE 命令,音频长度: {len(audio_buffer)} bytes") + logger.info(f"[NEW ASR] 收到 RECOGNIZE 命令,音频长度: {len(audio_buffer)} bytes") await ui_broadcast_partial("(正在识别…)") # 非流式识别 @@ -1286,7 +1379,7 @@ async def ws_audio(ws: WebSocket): audio_buffer.clear() if user_text and len(user_text.strip()) >= 2: - print(f"[NEW ASR] 识别结果: {user_text}") + logger.info(f"[NEW ASR] 识别结果: {user_text}") await ui_broadcast_final(f"[用户] {user_text}") # 调用 AI 回复 @@ -1294,7 +1387,7 @@ async def ws_audio(ws: WebSocket): await start_ai_with_text_custom(user_text) await ws.send_text("OK:RECOGNIZED") else: - print("[NEW ASR] 未识别到有效语音") + logger.info("[NEW ASR] 未识别到有效语音") await ws.send_text("OK:EMPTY") else: print("[NEW ASR] 音频太短,忽略") @@ -1318,7 +1411,7 @@ async def ws_audio(ws: WebSocket): ws_audio._audio_total_bytes += len(audio_bytes) if ws_audio._audio_recv_count % 500 == 0: - print(f"[AUDIO] 📥 Received: {ws_audio._audio_recv_count} packets, {ws_audio._audio_total_bytes} bytes total") + logger.info(f"[AUDIO] 📥 Received: 
{ws_audio._audio_recv_count} packets, {ws_audio._audio_total_bytes} bytes total") if USE_NEW_AI_PIPELINE: # Day 21 改进: 使用服务器端 VAD 检测语音 @@ -1332,14 +1425,14 @@ async def ws_audio(ws: WebSocket): if vad_result['speech_ended'] and vad_result['speech_audio']: # VAD 检测到语音结束,自动触发识别 speech_audio = vad_result['speech_audio'] - print(f"[VAD] 自动触发识别,音频长度: {len(speech_audio)} bytes") + logger.info(f"[VAD] 自动触发识别,音频长度: {len(speech_audio)} bytes") await ui_broadcast_partial("(正在识别…)") # 非流式识别 user_text = await sensevoice_recognize(speech_audio) if user_text and len(user_text.strip()) >= 2: - print(f"[NEW ASR] 识别结果: {user_text}") + logger.info(f"[NEW ASR] 识别结果: {user_text}") await ui_broadcast_final(f"[用户] {user_text}") # 调用 AI 回复 @@ -1347,7 +1440,7 @@ async def ws_audio(ws: WebSocket): await start_ai_with_text_custom(user_text) await ws.send_text("OK:RECOGNIZED") else: - print("[NEW ASR] 未识别到有效语音") + logger.info("[NEW ASR] 未识别到有效语音") await ws.send_text("OK:EMPTY") else: # 旧管道:实时发送到 DashScope @@ -1359,7 +1452,7 @@ async def ws_audio(ws: WebSocket): await on_sdk_error("send_audio_frame failed") except Exception as e: - print(f"\n[WS ERROR] {e}") + logger.error(f"[WS ERROR] {e}") finally: await stop_rec() try: @@ -1367,51 +1460,51 @@ async def ws_audio(ws: WebSocket): await ws.close(code=1000) except Exception: pass - if esp32_audio_ws is ws: - esp32_audio_ws = None - print("[WS] connection closed") + if ctx.esp32_audio_ws is ws: + ctx.esp32_audio_ws = None + logger.info("[WS] connection closed") # ---------- WebSocket:设备相机入口(JPEG 二进制) ---------- @app.websocket("/ws/camera") async def ws_camera_esp(ws: WebSocket): - global esp32_camera_ws, blind_path_navigator, cross_street_navigator, cross_street_active, navigation_active, orchestrator - if esp32_camera_ws is not None: + # global esp32_camera_ws, blind_path_navigator, cross_street_navigator, cross_street_active, navigation_active, orchestrator (Moved to ctx) + if ctx.esp32_camera_ws is not None: await ws.close(code=1013) return - 
esp32_camera_ws = ws + ctx.esp32_camera_ws = ws await ws.accept() - print("[CAMERA] 设备已连接") + logger.info("[CAMERA] 设备已连接") # 【新增】初始化盲道导航器 - if blind_path_navigator is None and yolo_seg_model is not None: - blind_path_navigator = BlindPathNavigator(yolo_seg_model, obstacle_detector) + if ctx.blind_path_navigator is None and ctx.yolo_seg_model is not None: + ctx.blind_path_navigator = BlindPathNavigator(ctx.yolo_seg_model, ctx.obstacle_detector) print("[NAVIGATION] 盲道导航器已初始化") else: - if blind_path_navigator is not None: - print("[NAVIGATION] 导航器已存在,无需重新初始化") - elif yolo_seg_model is None: - print("[NAVIGATION] 警告:YOLO模型未加载,无法初始化导航器") + if ctx.blind_path_navigator is not None: + logger.info("[NAVIGATION] 导航器已存在,无需重新初始化") + elif ctx.yolo_seg_model is None: + logger.warning("[NAVIGATION] 警告:YOLO模型未加载,无法初始化导航器") # 【新增】初始化过马路导航器 - if cross_street_navigator is None: - if yolo_seg_model: - cross_street_navigator = CrossStreetNavigator( - seg_model=yolo_seg_model, + if ctx.cross_street_navigator is None: + if ctx.yolo_seg_model: + ctx.cross_street_navigator = CrossStreetNavigator( + seg_model=ctx.yolo_seg_model, coco_model=None, # 不使用交通灯检测 obs_model=None # 暂时也不用障碍物检测,让它更快 ) - print("[CROSS_STREET] 过马路导航器已初始化(简化版 - 仅斑马线检测)") + logger.info("[CROSS_STREET] 过马路导航器已初始化(简化版 - 仅斑马线检测)") else: - print("[CROSS_STREET] 错误:缺少分割模型,无法初始化过马路导航器") + logger.error("[CROSS_STREET] 错误:缺少分割模型,无法初始化过马路导航器") - if not yolo_seg_model: + if not ctx.yolo_seg_model: print("[CROSS_STREET] - 缺少分割模型 (yolo_seg_model)") - if not obstacle_detector: + if not ctx.obstacle_detector: print("[CROSS_STREET] - 缺少障碍物检测器 (obstacle_detector)") - if orchestrator is None and blind_path_navigator is not None and cross_street_navigator is not None: - orchestrator = NavigationMaster(blind_path_navigator, cross_street_navigator) - print("[NAV MASTER] 统领状态机已初始化(托管模式)") + if ctx.orchestrator is None and ctx.blind_path_navigator is not None and ctx.cross_street_navigator is not None: + ctx.orchestrator = 
NavigationMaster(ctx.blind_path_navigator, ctx.cross_street_navigator, indoor_nav=ctx.indoor_navigator) + logger.info("[NAV MASTER] 统领状态机已初始化(托管模式)") frame_counter = 0 # 添加帧计数器 # Day 20: 性能诊断变量 @@ -1438,10 +1531,10 @@ async def ws_camera_esp(ws: WebSocket): sync_recorder.record_frame(data) except Exception as e: if frame_counter % 100 == 0: # 避免日志刷屏 - print(f"[RECORDER] 录制帧失败: {e}") + logger.error(f"[RECORDER] 录制帧失败: {e}") try: - last_frames.append((time.time(), data)) + ctx.last_frames.append((time.time(), data)) except Exception: pass @@ -1450,7 +1543,7 @@ async def ws_camera_esp(ws: WebSocket): # 【调试】检查导航条件 if frame_counter % 60 == 0: # 每60帧输出一次(约5-6秒) - state_dbg = orchestrator.get_state() if orchestrator else "N/A" + state_dbg = ctx.orchestrator.get_state() if ctx.orchestrator else "N/A" # Day 20: 性能诊断汇总 if _perf_frame_intervals: @@ -1466,20 +1559,20 @@ async def ws_camera_esp(ws: WebSocket): _perf_broadcast_times.clear() _perf_nav_times.clear() - print(f"[PERF] 帧:{frame_counter} | 客户端FPS:{fps:.1f} | 帧间隔:{avg_interval:.1f}ms | " + logger.info(f"[PERF] 帧:{frame_counter} | 客户端FPS:{fps:.1f} | 帧间隔:{avg_interval:.1f}ms | " f"广播:{avg_broadcast:.1f}ms | 导航:{avg_nav:.1f}ms | state={state_dbg}") # 【Day 19 优化】延迟解码:只在需要处理时才解码,避免白白浪费 CPU # 先检查是否需要导航处理 - needs_processing = (orchestrator and not yolomedia_running) + needs_processing = (ctx.orchestrator and not ctx.yolomedia_running) bgr = None # 延迟初始化 if needs_processing: - current_state = orchestrator.get_state() + current_state = ctx.orchestrator.get_state() # 【Day 19】ITEM_SEARCH/CHAT/IDLE 模式无需处理,直接转发原始 JPEG if current_state in ("ITEM_SEARCH", "CHAT", "IDLE"): - if not yolomedia_sending_frames and camera_viewers: + if not ctx.yolomedia_sending_frames and ctx.camera_viewers: await _broadcast_to_viewers(data) # 零拷贝直传 continue @@ -1488,22 +1581,27 @@ async def ws_camera_esp(ws: WebSocket): bgr = turbo_decode(data) if bgr is None or bgr.size == 0: if frame_counter % 30 == 0: - print(f"[JPEG] 解码失败:数据长度={len(data)}") + 
logger.warning(f"[JPEG] 解码失败:数据长度={len(data)}") bgr = None except Exception as e: if frame_counter % 30 == 0: - print(f"[JPEG] 解码异常: {e}") + logger.error(f"[JPEG] 解码异常: {e}") bgr = None # 【托管】优先交给统领状态机(寻物未占用画面时) - if orchestrator and not yolomedia_running and bgr is not None: + if ctx.orchestrator and not ctx.yolomedia_running and bgr is not None: out_img = bgr # 默认输出原图 try: # 【新增】检查是否在红绿灯检测模式 if current_state == "TRAFFIC_LIGHT_DETECTION": # 红绿灯检测模式:使用跳帧机制避免阻塞 - import trafficlight_detection - global _traffic_light_task, _traffic_light_result_jpeg, _traffic_light_pending_frame + # import trafficlight_detection (Fixed: use top-level Safe Import) + if not trafficlight_detection: + continue + + # global _traffic_light_task, ... (Moved to ctx? No, kept as globals but local to logic) + # Actually better to use ctx for these too, but they are localized here. + # For safety, I should use the global module variable. # 更新待处理帧 _traffic_light_pending_frame = bgr @@ -1533,7 +1631,7 @@ async def ws_camera_esp(ws: WebSocket): ) # 广播红绿灯检测结果(独立于盲道导航缓存) - if camera_viewers: + if ctx.camera_viewers: if _traffic_light_result_jpeg is not None: await _broadcast_to_viewers(_traffic_light_result_jpeg) else: @@ -1542,32 +1640,33 @@ async def ws_camera_esp(ws: WebSocket): else: # 【Day 15 跳帧机制】非阻塞式帧处理 # 不等待处理完成,使用最后一次成功的结果 - global _nav_processing_task, _nav_last_result_image, _nav_last_result_jpeg, _nav_pending_frame + # global _nav_processing_task, _nav_last_result_image, ... 
(Moved to ctx) # 更新待处理帧(始终是最新的) - _nav_pending_frame = bgr + ctx.nav_pending_frame = bgr # 如果没有正在运行的任务,启动一个 - if _nav_processing_task is None or _nav_processing_task.done(): + if ctx.nav_processing_task is None or ctx.nav_processing_task.done(): # 检查上一个任务的结果 - if _nav_processing_task is not None and _nav_processing_task.done(): + if ctx.nav_processing_task is not None and ctx.nav_processing_task.done(): # Day 20: 记录处理耗时 - global _nav_task_start_time - if _nav_task_start_time is not None: - nav_elapsed = (time.perf_counter() - _nav_task_start_time) * 1000 + if ctx.nav_task_start_time is not None: + nav_elapsed = (time.perf_counter() - ctx.nav_task_start_time) * 1000 _perf_nav_times.append(nav_elapsed) - _nav_task_start_time = None + ctx.nav_task_start_time = None try: - res = _nav_processing_task.result() + res = ctx.nav_processing_task.result() if res is not None: - _nav_last_result_image = res.annotated_image + ctx.nav_last_result_image = res.annotated_image # 【Day 19 优化】立即编码并缓存 JPEG,避免每帧重复编码 - if _nav_last_result_image is not None: + if ctx.nav_last_result_image is not None: # 使用 TurboJPEG 编码 - enc_result = turbo_encode(_nav_last_result_image, quality=80) + enc_result = turbo_encode(ctx.nav_last_result_image, quality=80) if enc_result: - _nav_last_result_jpeg = enc_result + if len(ctx.recent_finals) > 50: + ctx.recent_finals.pop(0) + ctx.nav_last_result_jpeg = enc_result # 语音引导 if res.guidance_text: try: @@ -1580,7 +1679,7 @@ async def ws_camera_esp(ws: WebSocket): # 检查是否有正在进行的 AI 对话 if is_playing_now(): # 打断 AI 对话,优先播报障碍物警告 - print(f"[PRIORITY INTERRUPT] 检测到障碍物警告,打断AI对话: {res.guidance_text}") + logger.warning(f"[PRIORITY INTERRUPT] 检测到障碍物警告,打断AI对话: {res.guidance_text}") asyncio.create_task(hard_reset_audio("Obstacle priority interrupt")) play_voice_text(res.guidance_text) @@ -1588,33 +1687,32 @@ async def ws_camera_esp(ws: WebSocket): except Exception: pass except Exception: - print(f"[NAV MASTER] 获取导航结果异常:") - traceback.print_exc() + logger.error(f"[NAV MASTER] 
获取导航结果异常:", exc_info=True) # 启动新的处理任务 - if _nav_pending_frame is not None: - frame_to_process = _nav_pending_frame - _nav_pending_frame = None - _nav_task_start_time = time.perf_counter() # Day 20: 记录开始时间 + if ctx.nav_pending_frame is not None: + frame_to_process = ctx.nav_pending_frame + ctx.nav_pending_frame = None + ctx.nav_task_start_time = time.perf_counter() # Day 20: 记录开始时间 loop = asyncio.get_event_loop() - _nav_processing_task = loop.run_in_executor( - frame_processing_executor, - orchestrator.process_frame, + ctx.nav_processing_task = loop.run_in_executor( + ctx.frame_processing_executor, + ctx.orchestrator.process_frame, frame_to_process ) # 使用最后一次成功的结果(不阻塞等待) - out_img = _nav_last_result_image if _nav_last_result_image is not None else bgr + out_img = ctx.nav_last_result_image if ctx.nav_last_result_image is not None else bgr except Exception as e: if frame_counter % 100 == 0: - print(f"[NAV MASTER] 处理帧时出错: {e}") + logger.error(f"[NAV MASTER] 处理帧时出错: {e}") # 【Day 19 优化】广播导航结果,优先使用缓存的 JPEG - if camera_viewers: + if ctx.camera_viewers: _t_broadcast = time.perf_counter() # Day 20: 计时 # 如果有缓存的 JPEG(导航结果),直接使用 - if _nav_last_result_jpeg is not None: - await _broadcast_to_viewers(_nav_last_result_jpeg) + if ctx.nav_last_result_jpeg is not None: + await _broadcast_to_viewers(ctx.nav_last_result_jpeg) elif out_img is not None: # 回退:使用 TurboJPEG 编码当前帧 enc_result = turbo_encode(out_img, quality=80) @@ -1629,43 +1727,44 @@ async def ws_camera_esp(ws: WebSocket): # 【Day 19 优化】零拷贝直传:原始 JPEG 直接转发,无需解码再编码 # 之前的问题:imdecode + imencode 浪费 CPU,原始 data 就是 JPEG - if not yolomedia_sending_frames and camera_viewers: + if not ctx.yolomedia_sending_frames and ctx.camera_viewers: try: # 直接转发原始 JPEG 数据,跳过解码-编码循环 await _broadcast_to_viewers(data) except Exception as e: - print(f"[CAMERA] Broadcast error: {e}") + logger.error(f"[CAMERA] Broadcast error: {e}") elif "type" in msg and msg["type"] in ("websocket.close", "websocket.disconnect"): break except WebSocketDisconnect: pass 
except Exception as e: - print(f"[CAMERA ERROR] {e}") + logger.error(f"[CAMERA ERROR] {e}") finally: try: if WebSocketState is None or ws.client_state == WebSocketState.CONNECTED: await ws.close(code=1000) except Exception: pass - esp32_camera_ws = None - print("[CAMERA] 设备已断开") + # esp32_camera_ws = None (Moved to ctx) + ctx.esp32_camera_ws = None + logger.info("[CAMERA] 设备已断开") # 【新增】清理导航状态 - if blind_path_navigator: - blind_path_navigator.reset() - if cross_street_navigator: - cross_street_navigator.reset() - if orchestrator: - orchestrator.reset() - print("[NAV MASTER] 统领器已重置") + if ctx.blind_path_navigator: + ctx.blind_path_navigator.reset() + if ctx.cross_street_navigator: + ctx.cross_street_navigator.reset() + if ctx.orchestrator: + ctx.orchestrator.reset() + logger.info("[NAV MASTER] 统领器已重置") # ---------- WebSocket:浏览器订阅相机帧 ---------- @app.websocket("/ws/viewer") async def ws_viewer(ws: WebSocket): await ws.accept() - camera_viewers.add(ws) - print(f"[VIEWER] Browser connected. Total viewers: {len(camera_viewers)}", flush=True) + ctx.camera_viewers.add(ws) + logger.info(f"[VIEWER] Browser connected. Total viewers: {len(ctx.camera_viewers)}") try: while True: # 保持连接活跃 @@ -1674,34 +1773,34 @@ async def ws_viewer(ws: WebSocket): pass # 正常关闭,静默处理 finally: try: - camera_viewers.remove(ws) + ctx.camera_viewers.remove(ws) except Exception: pass - print(f"[VIEWER] Removed. Total viewers: {len(camera_viewers)}", flush=True) + logger.info(f"[VIEWER] Removed. 
Total viewers: {len(ctx.camera_viewers)}") # ---------- WebSocket:浏览器订阅 IMU ---------- @app.websocket("/ws") async def ws_imu(ws: WebSocket): await ws.accept() - imu_ws_clients.add(ws) + ctx.imu_ws_clients.add(ws) try: while True: await asyncio.sleep(60) except (WebSocketDisconnect, asyncio.CancelledError): pass # 正常关闭,静默处理 finally: - imu_ws_clients.discard(ws) + ctx.imu_ws_clients.discard(ws) async def imu_broadcast(msg: str): - if not imu_ws_clients: return + if not ctx.imu_ws_clients: return dead = [] - for ws in list(imu_ws_clients): + for ws in list(ctx.imu_ws_clients): try: await ws.send_text(msg) except Exception: dead.append(ws) for ws in dead: - imu_ws_clients.discard(ws) + ctx.imu_ws_clients.discard(ws) # ---------- 服务端 IMU 估计(原样保留) ---------- from math import atan2, hypot, pi @@ -1839,10 +1938,10 @@ class UDPProto(asyncio.DatagramProtocol): # --- 导出接口(可选) --- def get_last_frames(): - return last_frames + return ctx.last_frames def get_camera_ws(): - return esp32_camera_ws + return ctx.esp32_camera_ws if __name__ == "__main__": import signal diff --git a/asr_core.py b/asr_core.py index b3cdcdd..b67a826 100644 --- a/asr_core.py +++ b/asr_core.py @@ -63,7 +63,8 @@ INTERRUPT_KEYWORDS = set( NAV_CONTROL_WHITELIST = [ "停止导航", "结束导航", "停止检测", "停止红绿灯", "开始导航", "盲道导航", "开始过马路", "过马路结束", - "帮我导航", "帮我过马路" + "帮我导航", "帮我过马路", + "室内导航", "室内导盲", # Day 25: 新增室内导航命令 ] diff --git a/audio_compressor.py b/audio_compressor.py index 9b60a4e..e52cfbd 100644 --- a/audio_compressor.py +++ b/audio_compressor.py @@ -371,9 +371,9 @@ class CompressedAudioCache: # 打印压缩率 compression_ratio = len(compressed) / self._original_sizes[filepath] - logger.info(f"[压缩] {os.path.basename(filepath)}: " - f"{self._original_sizes[filepath]} -> {len(compressed)} bytes " - f"({compression_ratio:.1%})") + # logger.info(f"[压缩] {os.path.basename(filepath)}: " + # f"{self._original_sizes[filepath]} -> {len(compressed)} bytes " + # f"({compression_ratio:.1%})") return compressed diff --git 
a/audio_player.py b/audio_player.py index da45a1a..8dddfa4 100644 --- a/audio_player.py +++ b/audio_player.py @@ -8,6 +8,7 @@ import asyncio import threading import queue import time +import hashlib from audio_stream import broadcast_pcm16_realtime from audio_compressor import compressed_audio_cache, AudioCompressor @@ -36,6 +37,9 @@ AUDIO_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "music VOICE_DIR = os.getenv("VOICE_DIR", os.path.join(os.path.dirname(os.path.abspath(__file__)), "voice")) VOICE_MAP_FILE = os.path.join(VOICE_DIR, "map.zh-CN.json") +# Day 26 优化: EdgeTTS 合成语音磁盘缓存目录 +TTS_CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "voice", "tts_cache") + # 音频文件映射(将合并 voice 映射) AUDIO_MAP = { "检测到物体": os.path.join(AUDIO_BASE_DIR, "音频1.wav"), @@ -100,7 +104,7 @@ def load_wav_file(filepath): if framerate != 16000: import audioop frames, _ = audioop.ratecv(frames, sampwidth, 1, framerate, 16000, None) - print(f"[AUDIO] 重采样: {filepath} {framerate}Hz -> 16000Hz") + # print(f"[AUDIO] 重采样: {filepath} {framerate}Hz -> 16000Hz") _audio_cache[filepath] = frames return frames @@ -129,7 +133,8 @@ def _merge_voice_map(): added += 1 else: print(f"[AUDIO] 映射文件缺失: {fpath}") - print(f"[AUDIO] 已合并 voice 映射 {added} 条") + if added > 0: + print(f"[AUDIO] 已合并 voice 映射 {added} 条") except Exception as e: print(f"[AUDIO] 读取 voice 映射失败: {e}") @@ -250,13 +255,14 @@ def initialize_audio_system(): # 显示压缩统计 if os.getenv("AIGLASS_COMPRESS_AUDIO", "1") == "1": stats = compressed_audio_cache.get_compression_stats() - print(f"[AUDIO] 音频压缩统计:") - print(f" - 文件数: {stats['files_cached']}") - print(f" - 原始大小: {stats['total_original_size'] / 1024:.1f} KB") - print(f" - 压缩后: {stats['total_compressed_size'] / 1024:.1f} KB") - print(f" - 压缩率: {stats['compression_ratio']:.1%}") - print(f" - 节省: {stats['bytes_saved'] / 1024:.1f} KB") - + # print(f"[AUDIO] 音频压缩统计:") + # print(f" - 文件数: {stats['files_cached']}") + # print(f" - 原始大小: {stats['total_original_size'] / 
1024:.1f} KB") + # print(f" - 压缩后: {stats['total_compressed_size'] / 1024:.1f} KB") + # print(f" - 压缩率: {stats['compression_ratio']:.1%}") + # print(f" - 节省: {stats['bytes_saved'] / 1024:.1f} KB") + pass + print("[AUDIO] 音频系统初始化完成(预加载+工作线程)") def play_audio_threadsafe(audio_key): @@ -385,8 +391,73 @@ def play_voice_text(text: str): _last_voice_time = current_time return - # 未匹配则输出日志(便于调试) - print(f"[AUDIO] 未找到匹配语音: {text}") + # 未匹配则尝试使用 EdgeTTS 进行流式合成 (Day 26) + print(f"[AUDIO] 未找到本地语音,尝试 EdgeTTS 合成: {text}") + + # 启动后台任务进行合成和播放 + # 注意:为了不阻塞主线程,这里使用 create_task + try: + loop = asyncio.get_event_loop() + loop.create_task(_synthesize_and_play_fallback(text)) + except RuntimeError: + # 如果当前线程没有 loop (例如在非 async 上下文中),则使用线程 + # 但通常 app_main 是 async 的,这里应该没问题 + pass + +async def _synthesize_and_play_fallback(text: str): + """(内部) 使用 EdgeTTS 合成并播放,支持磁盘缓存""" + try: + # 动态导入以避免循环依赖 + from edge_tts_client import text_to_speech_pcm + + global _audio_cache + cache_key = f"tts_fallback:{text}" + + # 1. 先检查内存缓存 + if cache_key in _audio_cache: + play_audio_threadsafe(cache_key) + return + + # 2. Day 26: 检查磁盘缓存 + text_hash = hashlib.md5(text.encode('utf-8')).hexdigest() + disk_cache_path = os.path.join(TTS_CACHE_DIR, f"{text_hash}.pcm") + + if os.path.exists(disk_cache_path): + # 从磁盘加载 + with open(disk_cache_path, 'rb') as f: + pcm_data = f.read() + if pcm_data: + _audio_cache[cache_key] = pcm_data + AUDIO_MAP[cache_key] = cache_key + play_audio_threadsafe(cache_key) + print(f"[AUDIO] EdgeTTS 从磁盘缓存加载: {text[:20]}...") + return + + # 3. 
合成 (目标 16kHz PCM) + pcm_data = await text_to_speech_pcm(text, target_sample_rate=16000) + + if pcm_data: + # 存入内存缓存 + _audio_cache[cache_key] = pcm_data + AUDIO_MAP[cache_key] = cache_key + + # Day 26: 存入磁盘缓存(异步写入,不阻塞播放) + try: + os.makedirs(TTS_CACHE_DIR, exist_ok=True) + with open(disk_cache_path, 'wb') as f: + f.write(pcm_data) + print(f"[AUDIO] EdgeTTS 已缓存到磁盘: {text[:20]}...") + except Exception as disk_err: + print(f"[AUDIO] 磁盘缓存写入失败: {disk_err}") + + # 播放 + play_audio_threadsafe(cache_key) + print(f"[AUDIO] EdgeTTS 合成成功: {text}") + else: + print(f"[AUDIO] EdgeTTS 合成返回空: {text}") + + except Exception as e: + print(f"[AUDIO] EdgeTTS 回退失败: {e}") # 兼容旧接口 play_audio_on_esp32 = play_audio_threadsafe \ No newline at end of file diff --git a/edge_tts_client.py b/edge_tts_client.py index d3b870f..ce064a0 100644 --- a/edge_tts_client.py +++ b/edge_tts_client.py @@ -59,6 +59,7 @@ async def text_to_speech_stream( except Exception as e: print(f"[EdgeTTS] 合成失败: {e}") + raise e # Day 23: 抛出异常以便上层重试 async def text_to_speech( @@ -80,9 +81,28 @@ async def text_to_speech( MP3 音频数据 """ audio_chunks = [] - async for chunk in text_to_speech_stream(text, voice, rate, volume): - audio_chunks.append(chunk) - return b"".join(audio_chunks) + + # Day 23: 添加重试逻辑 + max_retries = 3 + for attempt in range(max_retries): + try: + audio_chunks = [] # 清空缓存,重新开始 + async for chunk in text_to_speech_stream(text, voice, rate, volume): + audio_chunks.append(chunk) + + # 成功,返回完整音频 + return b"".join(audio_chunks) + + except Exception: + if attempt < max_retries - 1: + wait_time = 0.5 * (2 ** attempt) + print(f"[EdgeTTS] 合成异常,{wait_time}s 后重试 ({attempt+1}/{max_retries})") + await asyncio.sleep(wait_time) + else: + print(f"[EdgeTTS] 重试 {max_retries} 次后仍失败") + return b"" # 最终失败返回空 + + return b"" async def text_to_speech_pcm( diff --git a/glm_client.py b/glm_client.py index abaad6c..408d5f3 100644 --- a/glm_client.py +++ b/glm_client.py @@ -13,10 +13,9 @@ from typing import AsyncGenerator, Optional from 
zai import ZhipuAiClient # API 配置 -API_KEY = os.getenv( - "GLM_API_KEY", - "5915240ea48d4e93b454bc2412d1cc54.e054ej4pPqi9G6rc" -) +API_KEY = os.getenv("GLM_API_KEY") +if not API_KEY: + raise RuntimeError("未设置 GLM_API_KEY 环境变量,请在 .env 中配置") MODEL = "glm-4.6v-flash" # 升级到 glm-4.6v-flash (支持视觉) # 星期映射 @@ -178,14 +177,35 @@ async def chat_stream(user_message: str, image_base64: Optional[str] = None) -> try: # 流式调用 # Day 22: 升级到 glm-4.6v-flash - # 【修正】根据官方文档,thinking 参数也是必须的 - response = await asyncio.to_thread( - client.chat.completions.create, - model=MODEL, - messages=messages, - thinking={"type": "disabled"}, - stream=True, - ) + max_retries = 3 + retry_delay = 1 + + response = None + for attempt in range(max_retries): + try: + # 【修正】根据官方文档,thinking 参数也是必须的 + response = await asyncio.to_thread( + client.chat.completions.create, + model=MODEL, + messages=messages, + thinking={"type": "disabled"}, + stream=True, + ) + break # 成功则跳出循环 + except Exception as e: + error_str = str(e) + if attempt < max_retries - 1: + if "429" in error_str or "1305" in error_str or "请求过多" in error_str: + print(f"[GLM] (流式) 速率限制,{retry_delay}秒后重试... ({attempt + 1}/{max_retries})") + await asyncio.sleep(retry_delay) + retry_delay *= 2 + continue + # 其他网络错误也可以重试 + print(f"[GLM] (流式) 连接错误: {e},重试... 
({attempt + 1}/{max_retries})") + await asyncio.sleep(retry_delay) + continue + else: + raise e # 最后一次尝试失败,抛出异常 for chunk in response: if chunk.choices[0].delta.content: diff --git a/navigation_master.py b/navigation_master.py index 76737fb..71f6253 100644 --- a/navigation_master.py +++ b/navigation_master.py @@ -23,6 +23,7 @@ SEEKING_NEXT_BLINDPATH = "SEEKING_NEXT_BLINDPATH" # 过完马路后寻找下一 RECOVERY = "RECOVERY" # 兜底/恢复(感知暂时丢失时) TRAFFIC_LIGHT_DETECTION = "TRAFFIC_LIGHT_DETECTION" # 红绿灯检测模式 ITEM_SEARCH = "ITEM_SEARCH" # 找物品模式(暂停导航,由yolomedia处理画面) +INDOOR_NAV = "INDOOR_NAV" # 室内导航模式(使用室内导盲模型) # ========== 返回结构 ========== @dataclass @@ -247,9 +248,11 @@ class NavigationMaster: blind_nav: BlindPathNavigator, cross_nav: CrossStreetNavigator, *, + indoor_nav: BlindPathNavigator = None, # 新增:室内导航器 min_tts_interval: float = 1.2): self.blind = blind_nav self.cross = cross_nav + self.indoor = indoor_nav # 室内导航器(使用室内导盲模型) self.state = IDLE self.last_guidance_ts = 0.0 self.min_tts_interval = min_tts_interval @@ -302,7 +305,14 @@ class NavigationMaster: self.state = CHAT self.cooldown_until = time.time() + self.COOLDOWN_SEC if self.blind: - self.blind.reset() + try: self.blind.reset() + except: pass + if self.cross: + try: self.cross.reset() + except: pass + if self.indoor: + try: self.indoor.reset() + except: pass def start_crossing(self): """启动过马路模式""" @@ -316,6 +326,13 @@ class NavigationMaster: self.state = TRAFFIC_LIGHT_DETECTION self.cooldown_until = time.time() + self.COOLDOWN_SEC + def start_indoor_navigation(self): + """启动室内导航模式(使用室内导盲模型)""" + self.state = INDOOR_NAV + self.cooldown_until = time.time() + self.COOLDOWN_SEC + if self.blind: + self.blind.reset() + def is_in_navigation_mode(self): """检查是否在导航模式(非对话模式)""" return self.state not in ["CHAT", "IDLE", "TRAFFIC_LIGHT_DETECTION", "ITEM_SEARCH"] @@ -384,6 +401,10 @@ class NavigationMaster: self.cross.reset() except Exception: pass + try: + if self.indoor: self.indoor.reset() + except Exception: + pass # ----- 内部工具 
----- def _say(self, now: float, text: str) -> str: @@ -455,6 +476,25 @@ class NavigationMaster: # 冷却期内允许继续输出画面,但避免"瞬时切换" in_cooldown = now < self.cooldown_until + # 【新增】室内导航模式:使用室内导盲模型处理帧 + # Day 26: 支持 IndoorNavigator 返回的 IndoorResult + if self.state == INDOOR_NAV: + # 优先使用室内导航器,如果没有则 fallback 到盲道导航器 + nav = self.indoor if self.indoor else self.blind + try: + result = nav.process_frame(bgr) + except Exception as e: + self.state = RECOVERY + ann_err = bgr.copy() + return OrchestratorResult(ann_err, self._say(now, ""), self.state, {"error": str(e)}) + + ann = result.annotated_image if result.annotated_image is not None else bgr.copy() + say = result.guidance_text or "" + state_info = result.state_info if hasattr(result, 'state_info') else {} + + return OrchestratorResult(ann, self._say(now, say), self.state, + {"source": "indoor", "state_info": state_info}) + # 各状态处理 if self.state in (BLINDPATH_NAV, SEEKING_CROSSWALK, SEEKING_NEXT_BLINDPATH, RECOVERY): # —— 盲道侧 —— 统一调用盲道导航器 diff --git a/server_context.py b/server_context.py new file mode 100644 index 0000000..c0be974 --- /dev/null +++ b/server_context.py @@ -0,0 +1,98 @@ +# server_context.py +# -*- coding: utf-8 -*- +import asyncio +from typing import Dict, List, Set, Deque, Optional, Tuple, Any +from collections import deque +from concurrent.futures import ThreadPoolExecutor +from fastapi import WebSocket + +class ServerContext: + """ + 单例模式的服务器全局上下文 + 用于统一管理状态、资源引用和客户端连接,解决 app_main.py 中 global 变量混乱的问题。 + """ + _instance = None + _lock = asyncio.Lock() # 异步锁,主要用于保护关键状态切换 + + def __new__(cls): + if cls._instance is None: + cls._instance = super(ServerContext, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + self._initialized = True + + # ====== 1. 
WebSocket 客户端管理 ====== + self.ui_clients: Dict[int, WebSocket] = {} + self.camera_viewers: Set[WebSocket] = set() + self.imu_ws_clients: Set[WebSocket] = set() + + self.esp32_audio_ws: Optional[WebSocket] = None + self.esp32_camera_ws: Optional[WebSocket] = None + + # ====== 2. 媒体数据缓冲 ====== + self.current_partial: str = "" + self.recent_finals: List[str] = [] + self.last_frames: Deque[Tuple[float, bytes]] = deque(maxlen=10) + + # ====== 3. 业务状态标志 (State Flags) ====== + # 盲道导航状态 + self.navigation_active: bool = False + # 过马路导航状态 + self.cross_street_active: bool = False + # Omni 对话状态 + self.omni_conversation_active: bool = False + self.omni_previous_nav_state: Optional[str] = None + + # YOLO 媒体流状态 + self.yolomedia_running: bool = False + self.yolomedia_sending_frames: bool = False + + # ====== 4. 核心组件引用 (Resources) ====== + # 导航器实例 + self.blind_path_navigator = None + self.cross_street_navigator = None + self.indoor_navigator = None + + # 协调器 + self.orchestrator = None + + # 模型实例 + self.yolo_seg_model = None + self.obstacle_detector = None + self.indoor_seg_model = None + + # ====== 5. 
异步处理资源 ====== + # 帧处理线程池 + self.frame_processing_executor = ThreadPoolExecutor(max_workers=3, thread_name_prefix="frame_proc") + + # 异步帧处理状态 + self.nav_processing_task: Optional[asyncio.Task] = None + self.nav_last_result_image: Any = None + self.nav_last_result_jpeg: Optional[bytes] = None + self.nav_pending_frame: Any = None + self.nav_processing_lock = asyncio.Lock() + self.nav_task_start_time: float = 0.0 + + def reset_navigation_state(self): + """重置所有导航相关的状态标志""" + self.navigation_active = False + self.cross_street_active = False + self.omni_conversation_active = False + # 注意:这里不停止 orchestrator,只是重置标志位 + + def add_ui_client(self, ws: WebSocket): + self.ui_clients[id(ws)] = ws + + def remove_ui_client(self, ws: WebSocket): + self.ui_clients.pop(id(ws), None) + + def get_ui_client_count(self) -> int: + return len(self.ui_clients) + +# 全局访问点 +ctx = ServerContext() diff --git a/trafficlight_detection.py b/trafficlight_detection.py index f2da50e..c019bdc 100644 --- a/trafficlight_detection.py +++ b/trafficlight_detection.py @@ -479,18 +479,25 @@ def is_detection_running(): return _detection_running def init_model(): - """初始化YOLO模型(单帧处理模式)""" + """初始化YOLO模型(单帧处理模式) + Day 26 优化: 包含预热推理,避免 TensorRT 重复加载 + """ global _model if _model is not None: - print("[TRAFFIC] 模型已加载") return True try: print("[TRAFFIC] 加载 YOLO 红绿灯检测模型...") - _model = YOLO(YOLO_MODEL_PATH) + _model = YOLO(YOLO_MODEL_PATH, task='detect') print(f"[TRAFFIC] 模型加载成功: {YOLO_MODEL_PATH}") class_names = _model.names if hasattr(_model, 'names') else {} print(f"[TRAFFIC] 模型类别: {class_names}") + + # Day 26 优化: 预热推理,创建 TensorRT 执行上下文(只创建一次) + test_img = np.zeros((640, 640, 3), dtype=np.uint8) + _ = _model(test_img, conf=CONF_THRESHOLD, verbose=False) + print("[TRAFFIC] 模型预热完成") + return True except Exception as e: print(f"[TRAFFIC] 模型加载失败: {e}") diff --git a/workflow_blindpath.py b/workflow_blindpath.py index ef4a98b..59c818c 100644 --- a/workflow_blindpath.py +++ b/workflow_blindpath.py @@ -88,14 +88,16 @@ 
class ProcessingResult: class BlindPathNavigator: """盲道导航处理器 - 无外部依赖版本""" - def __init__(self, yolo_model=None, obstacle_detector=None): + def __init__(self, yolo_model=None, obstacle_detector=None, enable_crosswalk_detection=True): """ 初始化导航器 :param yolo_model: YOLO分割模型(可选) :param obstacle_detector: 障碍物检测器(可选) + :param enable_crosswalk_detection: 是否启用斑马线检测(室内模式可关闭) """ self.yolo_model = yolo_model self.obstacle_detector = obstacle_detector + self.enable_crosswalk_detection = enable_crosswalk_detection # 状态变量 self.current_state = STATE_ONBOARDING @@ -184,6 +186,10 @@ class BlindPathNavigator: f"持续模式={self.straight_continuous_mode}, " f"限制次数={self.straight_repeat_limit}") logger.info(f"[BlindPath] 方向播报配置: 间隔={self.direction_interval}秒") + + # Day 26 优化: 可配置日志采样间隔 + self.log_interval = int(os.getenv("AIGLASS_LOG_INTERVAL", "30")) # 每 N 帧输出一次日志 + logger.info(f"[BlindPath] 日志采样间隔: 每{self.log_interval}帧") # 缓存变量 self.prev_gray = None @@ -258,8 +264,14 @@ class BlindPathNavigator: self.last_crosswalk_mask = None # 【新增】斑马线感知监控器 - self.crosswalk_monitor = CrosswalkAwarenessMonitor() - logger.info("[BlindPath] 斑马线感知监控器已初始化") + # 【新增】斑马线感知监控器 + if self.enable_crosswalk_detection: + self.crosswalk_monitor = CrosswalkAwarenessMonitor() + logger.info("[BlindPath] 斑马线感知监控器已初始化") + else: + self.crosswalk_monitor = None + logger.info("[BlindPath] 斑马线感知监控器已禁用 (室内模式)") + logger.info(f"[BlindPath] 盲道检测间隔: 每{self.BLINDPATH_DETECTION_INTERVAL}帧") def init_traffic_light_detector(self): @@ -489,16 +501,24 @@ class BlindPathNavigator: # 【新增】检查近距离障碍物并设置语音 self._check_and_set_obstacle_voice(detected_obstacles) + # 【配置】如果禁用了斑马线检测,强制置为None + if not self.enable_crosswalk_detection: + crosswalk_mask = None + # 【新增】斑马线感知处理 - # 【Day 15 优化】减少每帧日志输出,只在每 30 帧输出一次 - if crosswalk_mask is not None and self.frame_counter % 30 == 0: + # 【Day 26 优化】使用可配置的日志间隔 + if crosswalk_mask is not None and self.frame_counter % self.log_interval == 0: cross_pixels = np.sum(crosswalk_mask > 0) if cross_pixels > 0: 
logger.info(f"[斑马线] monitor: pixels={cross_pixels}, area={cross_pixels/crosswalk_mask.size*100:.2f}%") - elif crosswalk_mask is None and self.frame_counter % 30 == 0: + elif crosswalk_mask is None and self.frame_counter % self.log_interval == 0: + if self.enable_crosswalk_detection: logger.info(f"[斑马线] crosswalk_mask为None") - crosswalk_guidance = self.crosswalk_monitor.process_frame(crosswalk_mask, blind_path_mask) + crosswalk_guidance = None + if self.crosswalk_monitor: + crosswalk_guidance = self.crosswalk_monitor.process_frame(crosswalk_mask, blind_path_mask) + if crosswalk_guidance: logger.info(f"[斑马线感知] 检测结果: area={crosswalk_guidance.get('area', 0):.3f}, " f"should_broadcast={crosswalk_guidance.get('should_broadcast', False)}, " @@ -511,7 +531,7 @@ class BlindPathNavigator: logger.info(f"[斑马线语音] 已设置待播报语音: {crosswalk_guidance['voice_text']}, 优先级{crosswalk_guidance['priority']}") # 【新增】添加斑马线可视化 - if crosswalk_mask is not None: + if crosswalk_mask is not None and self.crosswalk_monitor: # 计算可视化数据 total_pixels = crosswalk_mask.size crosswalk_pixels = np.sum(crosswalk_mask > 0) diff --git a/workflow_crossstreet.py b/workflow_crossstreet.py index c9fb3c6..d3bc304 100644 --- a/workflow_crossstreet.py +++ b/workflow_crossstreet.py @@ -272,21 +272,22 @@ class CrossStreetNavigator: logger.info(f"[CROSS_STREET] 斑马线检测间隔: 每{self.CROSSWALK_DETECTION_INTERVAL}帧") # 确保模型在 GPU 上 - # Day 20: TensorRT 引擎不需要 .to() + # Day 20/26: TensorRT 引擎不需要 .to(),改用 model_utils 检查 if self.seg_model and torch.cuda.is_available(): try: # 检查是否是 TensorRT 引擎 + from model_utils import is_tensorrt_engine model_path = getattr(self.seg_model, 'ckpt_path', '') or '' - if not model_path.endswith('.engine'): - if hasattr(self.seg_model, 'model') and hasattr(self.seg_model.model, 'to'): - self.seg_model.model.to('cuda') - elif hasattr(self.seg_model, 'to'): - self.seg_model.to('cuda') + if is_tensorrt_engine(model_path): + pass # TensorRT 引擎无需 .to(),静默跳过 + elif hasattr(self.seg_model, 'model') and 
hasattr(self.seg_model.model, 'to'): + self.seg_model.model.to('cuda') logger.info("[CROSS_STREET] 模型已移至 GPU") - else: - logger.info("[CROSS_STREET] TensorRT 引擎已加载,跳过 .to()") - except Exception as e: - logger.warning(f"[CROSS_STREET] 无法将模型移至 GPU: {e}") + elif hasattr(self.seg_model, 'to'): + self.seg_model.to('cuda') + logger.info("[CROSS_STREET] 模型已移至 GPU") + except Exception: + pass # Day 26: 静默处理,避免启动日志刷屏 def reset(self): """重置状态""" diff --git a/workflow_indoor.py b/workflow_indoor.py new file mode 100644 index 0000000..c3f84fa --- /dev/null +++ b/workflow_indoor.py @@ -0,0 +1,454 @@ +# -*- coding: utf-8 -*- +""" +室内导航工作流 (Indoor Navigation Workflow) +Day 26: 专为室内导盲模型 (yolo11l-seg-indoor14) 设计 + +类别映射 (14 classes from MIT Indoor): +- 可行走区域: floor(0), corridor(1), sidewalk(2) +- 静态障碍物: chair(3), table(4), sofa_bed(5), cabinet(11), trash_can(12) +- 兴趣点: door(6), elevator(7), stairs(8) +- 边界: wall(9), window(13) +- 动态障碍: person(10) +""" + +import os +import time +import logging +import numpy as np +import cv2 +from dataclasses import dataclass +from typing import Optional, List, Dict, Any +from collections import deque + +logger = logging.getLogger(__name__) + +# ========== 类别常量 ========== +# 可行走区域 +WALKABLE_CLASSES = {0, 1, 2} # floor, corridor, sidewalk +CLASS_FLOOR = 0 +CLASS_CORRIDOR = 1 +CLASS_SIDEWALK = 2 + +# 静态障碍物 +OBSTACLE_CLASSES = {3, 4, 5, 11, 12} # chair, table, sofa_bed, cabinet, trash_can +CLASS_CHAIR = 3 +CLASS_TABLE = 4 +CLASS_SOFA_BED = 5 +CLASS_CABINET = 11 +CLASS_TRASH_CAN = 12 + +# 兴趣点 +POI_CLASSES = {6, 7, 8} # door, elevator, stairs +CLASS_DOOR = 6 +CLASS_ELEVATOR = 7 +CLASS_STAIRS = 8 + +# 边界 +BOUNDARY_CLASSES = {9, 13} # wall, window +CLASS_WALL = 9 +CLASS_WINDOW = 13 + +# 动态障碍 +CLASS_PERSON = 10 + +# 类别名称映射 +CLASS_NAMES = { + 0: 'floor', 1: 'corridor', 2: 'sidewalk', + 3: 'chair', 4: 'table', 5: 'sofa_bed', + 6: 'door', 7: 'elevator', 8: 'stairs', + 9: 'wall', 10: 'person', 11: 'cabinet', + 12: 'trash_can', 13: 'window' +} + +# 中文名称(用于语音) 
+CLASS_NAMES_CN = { + 0: '地面', 1: '走廊', 2: '人行道', + 3: '椅子', 4: '桌子', 5: '沙发', + 6: '门', 7: '电梯', 8: '楼梯', + 9: '墙壁', 10: '行人', 11: '柜子', + 12: '垃圾桶', 13: '窗户' +} + +# ========== 配置参数 ========== +CONF_THRESHOLD = float(os.getenv('INDOOR_CONF_THRESHOLD', '0.25')) +WALKABLE_MIN_AREA = int(os.getenv('INDOOR_WALKABLE_MIN_AREA', '3000')) +OBSTACLE_MIN_AREA = int(os.getenv('INDOOR_OBSTACLE_MIN_AREA', '500')) + +# 语音间隔 +GUIDE_INTERVAL = float(os.getenv('INDOOR_GUIDE_INTERVAL', '3.0')) +DIRECTION_INTERVAL = float(os.getenv('INDOOR_DIRECTION_INTERVAL', '2.5')) +POI_INTERVAL = float(os.getenv('INDOOR_POI_INTERVAL', '5.0')) +OBSTACLE_INTERVAL = float(os.getenv('INDOOR_OBSTACLE_INTERVAL', '2.0')) + +# ========== 可视化颜色 (BGR) ========== +VIS_COLORS = { + 'walkable': (0, 255, 0), # 绿色 - 可行走 + 'obstacle': (0, 0, 255), # 红色 - 障碍物 + 'poi': (255, 255, 0), # 青色 - 兴趣点 + 'boundary': (128, 128, 128), # 灰色 - 边界 + 'person': (255, 0, 255), # 粉色 - 行人 + 'centerline': (255, 255, 0), # 黄色 - 引导线 +} + + +@dataclass +class IndoorResult: + """室内导航结果""" + annotated_image: Optional[np.ndarray] = None + guidance_text: str = "" + state_info: Dict[str, Any] = None + visualizations: List[Dict[str, Any]] = None + + def __post_init__(self): + if self.state_info is None: + self.state_info = {} + if self.visualizations is None: + self.visualizations = [] + + +class IndoorNavigator: + """室内导航器 - 专为室内导盲模型设计""" + + def __init__(self, seg_model=None, device_id: str = "indoor"): + self.seg_model = seg_model + self.device_id = device_id + self.frame_counter = 0 + + # 语音节流 + self.last_guide_time = 0 + self.last_direction_time = 0 + self.last_poi_time = 0 + self.last_obstacle_time = 0 + self.last_guidance_text = "" + self.last_direction_text = "" + + # 检测间隔 + self.detection_interval = int(os.getenv('INDOOR_DETECTION_INTERVAL', '6')) + self.last_detection_frame = 0 + + # 缓存 + self.last_walkable_mask = None + self.last_obstacles = [] + self.last_pois = [] + + # 灰度图(用于光流等) + self.prev_gray = None + + # 日志间隔 + 
self.log_interval = int(os.getenv('AIGLASS_LOG_INTERVAL', '30')) + + logger.info(f"[INDOOR] 室内导航器初始化完成") + logger.info(f"[INDOOR] 检测间隔: 每{self.detection_interval}帧") + logger.info(f"[INDOOR] 可行走类别: {[CLASS_NAMES[c] for c in WALKABLE_CLASSES]}") + + def reset(self): + """重置状态""" + self.frame_counter = 0 + self.last_guide_time = 0 + self.last_direction_time = 0 + self.last_poi_time = 0 + self.last_obstacle_time = 0 + self.last_guidance_text = "" + self.last_direction_text = "" + self.last_walkable_mask = None + self.last_obstacles = [] + self.last_pois = [] + self.prev_gray = None + logger.info("[INDOOR] 导航器已重置") + + def process_frame(self, image: np.ndarray) -> IndoorResult: + """处理单帧图像""" + self.frame_counter += 1 + h, w = image.shape[:2] + now = time.time() + + frame_visualizations = [] + guidance_text = "" + state_info = {} + + # 是否执行检测 + should_detect = (self.frame_counter - self.last_detection_frame) >= self.detection_interval + + if should_detect and self.seg_model is not None: + self.last_detection_frame = self.frame_counter + + # 执行分割推理 + walkable_mask, obstacles, pois = self._detect_all(image) + + # 更新缓存 + self.last_walkable_mask = walkable_mask + self.last_obstacles = obstacles + self.last_pois = pois + else: + # 使用缓存 + walkable_mask = self.last_walkable_mask + obstacles = self.last_obstacles + pois = self.last_pois + + # 生成导航引导 + if walkable_mask is not None: + guidance_text = self._generate_guidance(walkable_mask, obstacles, pois, h, w, now) + + # 添加可视化 + self._add_mask_visualization(walkable_mask, frame_visualizations, + "walkable_mask", "rgba(0, 255, 0, 0.3)") + + # 障碍物可视化 + for obs in obstacles: + self._add_detection_visualization(obs, frame_visualizations, "obstacle") + + # 兴趣点可视化 + for poi in pois: + self._add_detection_visualization(poi, frame_visualizations, "poi") + + # 日志 + if self.frame_counter % self.log_interval == 0: + walkable_area = int(walkable_mask.sum()) if walkable_mask is not None else 0 + logger.info(f"[INDOOR] 
Frame={self.frame_counter} | 可行走面积={walkable_area} | " + f"障碍物={len(obstacles)} | 兴趣点={len(pois)}") + + # 更新状态信息 + state_info = { + 'frame': self.frame_counter, + 'walkable_detected': walkable_mask is not None and walkable_mask.sum() > 0, + 'obstacles_count': len(obstacles), + 'pois_count': len(pois), + } + + # 更新灰度图 + self.prev_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + + return IndoorResult( + annotated_image=image.copy(), + guidance_text=guidance_text, + state_info=state_info, + visualizations=frame_visualizations + ) + + def _detect_all(self, image: np.ndarray): + """执行分割检测,返回可行走区域、障碍物、兴趣点""" + h, w = image.shape[:2] + walkable_mask = np.zeros((h, w), dtype=np.uint8) + obstacles = [] + pois = [] + + try: + imgsz = int(os.getenv("AIGLASS_YOLO_IMGSZ", "480")) + use_half = os.getenv("AIGLASS_YOLO_HALF", "1") == "1" + + results = self.seg_model.predict( + image, + imgsz=imgsz, + conf=CONF_THRESHOLD, + verbose=False, + half=use_half + ) + + if results and len(results) > 0 and results[0].masks is not None: + r0 = results[0] + masks = r0.masks.data.cpu().numpy() + boxes = r0.boxes + + for i, (mask, cls_id, conf) in enumerate(zip(masks, boxes.cls, boxes.conf)): + cls_id = int(cls_id.item()) + conf_val = float(conf.item()) + + # 调整 mask 尺寸 + mask_resized = cv2.resize(mask, (w, h), interpolation=cv2.INTER_NEAREST) + mask_bin = (mask_resized > 0.5).astype(np.uint8) + area = int(mask_bin.sum()) + + if area < 100: # 过滤小碎片 + continue + + # 可行走区域 + if cls_id in WALKABLE_CLASSES and area > WALKABLE_MIN_AREA: + walkable_mask = cv2.bitwise_or(walkable_mask, mask_bin * 255) + + # 障碍物 + elif cls_id in OBSTACLE_CLASSES or cls_id == CLASS_PERSON: + if area > OBSTACLE_MIN_AREA: + obstacles.append({ + 'class_id': cls_id, + 'class_name': CLASS_NAMES.get(cls_id, 'unknown'), + 'class_name_cn': CLASS_NAMES_CN.get(cls_id, '未知'), + 'conf': conf_val, + 'mask': mask_bin, + 'area': area, + 'center': self._mask_center(mask_bin), + }) + + # 兴趣点 + elif cls_id in POI_CLASSES: + pois.append({ 
+ 'class_id': cls_id, + 'class_name': CLASS_NAMES.get(cls_id, 'unknown'), + 'class_name_cn': CLASS_NAMES_CN.get(cls_id, '未知'), + 'conf': conf_val, + 'mask': mask_bin, + 'area': area, + 'center': self._mask_center(mask_bin), + }) + + except Exception as e: + logger.warning(f"[INDOOR] 检测失败: {e}") + + return walkable_mask, obstacles, pois + + def _mask_center(self, mask: np.ndarray): + """计算 mask 质心""" + M = cv2.moments(mask) + if abs(M["m00"]) < 1e-6: + return None + cx = int(M["m10"] / M["m00"]) + cy = int(M["m01"] / M["m00"]) + return (cx, cy) + + def _generate_guidance(self, walkable_mask, obstacles, pois, h, w, now): + """生成导航引导文本""" + guidance_text = "" + + # 1. 计算可行走区域的偏移和方向 + direction_guidance = self._compute_direction_guidance(walkable_mask, h, w) + + # 2. 检查障碍物警告 + obstacle_warning = self._check_obstacle_warning(obstacles, walkable_mask, h, w) + + # 3. 检查兴趣点提示 + poi_hint = self._check_poi_hint(pois, h, w) + + # 优先级:障碍物 > 方向 > 兴趣点 + if obstacle_warning and (now - self.last_obstacle_time) > OBSTACLE_INTERVAL: + guidance_text = obstacle_warning + self.last_obstacle_time = now + self.last_guidance_text = guidance_text + elif direction_guidance: + # 方向引导节流 + if direction_guidance != self.last_direction_text: + if (now - self.last_direction_time) > DIRECTION_INTERVAL: + guidance_text = direction_guidance + self.last_direction_time = now + self.last_direction_text = direction_guidance + elif (now - self.last_guide_time) > GUIDE_INTERVAL: + # 同样的方向,降低频率 + guidance_text = direction_guidance + self.last_guide_time = now + elif poi_hint and (now - self.last_poi_time) > POI_INTERVAL: + guidance_text = poi_hint + self.last_poi_time = now + + return guidance_text + + def _compute_direction_guidance(self, walkable_mask, h, w): + """计算方向引导""" + if walkable_mask is None or walkable_mask.sum() < WALKABLE_MIN_AREA: + return "未检测到可行走区域" + + # 分析下半部分(更近的区域) + lower_half = walkable_mask[int(h * 0.5):, :] + + if lower_half.sum() < 1000: + return "前方可行走区域较小,请小心" + + # 计算左中右分布 + 
third = w // 3 + left_area = lower_half[:, :third].sum() + center_area = lower_half[:, third:2*third].sum() + right_area = lower_half[:, 2*third:].sum() + + total = left_area + center_area + right_area + 1e-6 + left_ratio = left_area / total + center_ratio = center_area / total + right_ratio = right_area / total + + # 方向判断 + if center_ratio > 0.4: + return "保持直行" + elif left_ratio > right_ratio * 1.5: + return "向左调整" + elif right_ratio > left_ratio * 1.5: + return "向右调整" + else: + return "保持直行" + + def _check_obstacle_warning(self, obstacles, walkable_mask, h, w): + """检查是否有障碍物在前方""" + if not obstacles: + return None + + # 定义前方区域(画面中下部) + front_zone_top = int(h * 0.4) + front_zone_left = int(w * 0.2) + front_zone_right = int(w * 0.8) + + for obs in obstacles: + center = obs.get('center') + if center is None: + continue + cx, cy = center + + # 检查是否在前方区域 + if front_zone_top < cy < h and front_zone_left < cx < front_zone_right: + name_cn = obs.get('class_name_cn', '障碍物') + + # 判断位置 + if cx < w * 0.4: + return f"左前方有{name_cn}" + elif cx > w * 0.6: + return f"右前方有{name_cn}" + else: + return f"正前方有{name_cn}" + + return None + + def _check_poi_hint(self, pois, h, w): + """检查兴趣点提示""" + if not pois: + return None + + for poi in pois: + cls_id = poi.get('class_id') + name_cn = poi.get('class_name_cn', '兴趣点') + center = poi.get('center') + + if center is None: + continue + cx, cy = center + + # 楼梯需要特别警告 + if cls_id == CLASS_STAIRS: + if cy > h * 0.5: # 比较近 + return f"注意前方有{name_cn}" + + # 门/电梯提示 + elif cls_id in (CLASS_DOOR, CLASS_ELEVATOR): + if cy > h * 0.3: # 在视野内 + position = "左侧" if cx < w * 0.4 else ("右侧" if cx > w * 0.6 else "前方") + return f"{position}有{name_cn}" + + return None + + def _add_mask_visualization(self, mask, visualizations, viz_type, color): + """添加 mask 可视化""" + if mask is None or mask.sum() == 0: + return + + visualizations.append({ + 'type': viz_type, + 'mask': mask, + 'color': color + }) + + def _add_detection_visualization(self, detection, 
visualizations, det_type): + """添加检测框可视化""" + center = detection.get('center') + if center is None: + return + + visualizations.append({ + 'type': det_type, + 'center': center, + 'class_name': detection.get('class_name', 'unknown'), + 'class_name_cn': detection.get('class_name_cn', '未知'), + 'conf': detection.get('conf', 0), + }) diff --git a/yolomedia.py b/yolomedia.py index a49854f..ff1dc37 100644 --- a/yolomedia.py +++ b/yolomedia.py @@ -24,6 +24,10 @@ from mediapipe.framework.formats import landmark_pb2 from ultralytics import YOLO from ultralytics.utils.plotting import Colors import bridge_io + +# Day 26: 抑制 pygame 社区欢迎信息 +import os +os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "1" import pygame # 用于播放本地音频文件 from audio_player import play_audio_threadsafe