# -*- coding: utf-8 -*- """ 斑马线感知监控器 基于面积变化的斑马线检测和语音提示 不涉及状态切换,只提供语音引导 """ import time import numpy as np from collections import deque from typing import Optional, Dict, Any import logging logger = logging.getLogger(__name__) class CrosswalkAwarenessMonitor: """斑马线感知监控器 - 纯语音提示模块""" def __init__(self): # 面积阈值(固定锚点) self.THRESHOLDS = { 'discover': 0.01, # 1% - 发现 'approaching': 0.08, # 8% - 靠近 'near': 0.18, # 18% - 很近 'arrival': 0.25, # 25% - 到达(可以过马路) } # 已播报的阈值(避免重复) self.broadcasted_thresholds = set() # 面积历史记录 self.area_history = deque(maxlen=30) # 保存最近30帧 # 时间记录 self.last_broadcast_time = 0 self.arrival_first_broadcast_time = 0 # 状态标志 self.in_arrival_state = False # 是否在"可以过马路"状态 self.last_position_zone = None # 上次播报的方位 # 播报间隔配置(可调整参数 - 数值越小播报越频繁) # 【参数调整】将所有间隔除以1.5,提高播报频率1.5倍 self.REPEAT_INTERVALS = { 'approaching': 6.7, # 靠近阶段:每6.7秒重复(原10秒÷1.5) 'near': 3.3, # 很近阶段:每3.3秒重复(原5秒÷1.5) 'arrival': 5.3, # 到达阶段:每5.3秒重复(原8秒÷1.5) } # 提示:如需调整频率,修改这些数值即可 # - 数值越小 = 播报越频繁 # - 数值越大 = 播报越稀疏 # 无遮挡判断阈值 self.OCCLUSION_THRESHOLD = 0.30 # 重叠>30%认为有遮挡 def process_frame(self, crosswalk_mask, blind_path_mask=None) -> Optional[Dict[str, Any]]: """ 处理每帧的斑马线检测 返回: { 'voice_text': 语音文本, 'priority': 优先级, 'should_broadcast': 是否应该播报, 'area': 当前面积, 'position': 方位描述, 'visualization': 可视化信息(用于外部绘制) } 或 None(无需播报) """ # 如果没有斑马线,重置状态 if crosswalk_mask is None: self._reset_if_needed() return None # 1. 计算面积 total_pixels = crosswalk_mask.size crosswalk_pixels = np.sum(crosswalk_mask > 0) area_ratio = crosswalk_pixels / total_pixels # 2. 计算中心位置 y_coords, x_coords = np.where(crosswalk_mask > 0) if len(y_coords) == 0: return None center_x_ratio = np.mean(x_coords) / crosswalk_mask.shape[1] center_y_ratio = np.mean(y_coords) / crosswalk_mask.shape[0] # 3. 记录历史 current_time = time.time() self.area_history.append({ 'area': area_ratio, 'center_x': center_x_ratio, 'center_y': center_y_ratio, 'time': current_time }) # 4. 检查遮挡 has_occlusion = self._check_occlusion(crosswalk_mask, blind_path_mask) # 5. 判断当前阶段和生成语音 return self._generate_guidance(area_ratio, center_x_ratio, center_y_ratio, has_occlusion, current_time) def _check_occlusion(self, crosswalk_mask, blind_path_mask) -> bool: """检查斑马线是否被盲道遮挡""" if blind_path_mask is None: return False crosswalk_area = crosswalk_mask > 0 blind_path_area = blind_path_mask > 0 # 计算重叠 overlap = np.logical_and(crosswalk_area, blind_path_area) overlap_ratio = np.sum(overlap) / max(np.sum(crosswalk_area), 1) # 重叠超过阈值认为有遮挡 return overlap_ratio > self.OCCLUSION_THRESHOLD def _get_position_description(self, center_x_ratio) -> str: """获取方位描述(3分法)""" if center_x_ratio < 0.40: return "在画面左侧" elif center_x_ratio < 0.60: return "在画面中间" else: return "在画面右侧" def _generate_guidance(self, area_ratio, center_x_ratio, center_y_ratio, has_occlusion, current_time) -> Optional[Dict[str, Any]]: """生成引导语音""" # 检查面积是否稳定(避免抖动) if not self._is_area_stable(area_ratio): return None position_desc = self._get_position_description(center_x_ratio) # 阶段1:发现阶段(0.01-0.08) if area_ratio >= self.THRESHOLDS['discover'] and area_ratio < self.THRESHOLDS['approaching']: if self.THRESHOLDS['discover'] not in self.broadcasted_thresholds: self.broadcasted_thresholds.add(self.THRESHOLDS['discover']) return { 'voice_text': f"远处发现斑马线,{position_desc}", 'priority': 55, # 提高到55,超过盲道方向指令(50) 'should_broadcast': True, 'area': area_ratio, 'position': position_desc } # 阶段2:靠近阶段(0.08-0.18) elif area_ratio >= self.THRESHOLDS['approaching'] and area_ratio < self.THRESHOLDS['near']: # 首次播报 if self.THRESHOLDS['approaching'] not in self.broadcasted_thresholds: self.broadcasted_thresholds.add(self.THRESHOLDS['approaching']) self.last_broadcast_time = current_time self.last_position_zone = position_desc return { 'voice_text': f"正在靠近斑马线,{position_desc}", 'priority': 55, # 提高到55 'should_broadcast': True, 'area': area_ratio, 'position': position_desc } # 重复播报(每10秒或方位变化) elif (current_time - self.last_broadcast_time >= self.REPEAT_INTERVALS['approaching'] or position_desc != self.last_position_zone): self.last_broadcast_time = current_time self.last_position_zone = position_desc return { 'voice_text': f"正在靠近斑马线,{position_desc}", 'priority': 55, # 提高到55 'should_broadcast': True, 'area': area_ratio, 'position': position_desc } # 阶段3:很近阶段(0.18-0.25) elif area_ratio >= self.THRESHOLDS['near'] and area_ratio < self.THRESHOLDS['arrival']: # 首次播报 if self.THRESHOLDS['near'] not in self.broadcasted_thresholds: self.broadcasted_thresholds.add(self.THRESHOLDS['near']) self.last_broadcast_time = current_time self.last_position_zone = position_desc return { 'voice_text': f"接近斑马线,{position_desc}", 'priority': 60, 'should_broadcast': True, 'area': area_ratio, 'position': position_desc } # 重复播报(每5秒或方位变化) elif (current_time - self.last_broadcast_time >= self.REPEAT_INTERVALS['near'] or position_desc != self.last_position_zone): self.last_broadcast_time = current_time self.last_position_zone = position_desc return { 'voice_text': f"接近斑马线,{position_desc}", 'priority': 60, 'should_broadcast': True, 'area': area_ratio, 'position': position_desc } # 阶段4:到达阶段(area ≥ 0.25,无遮挡) elif area_ratio >= self.THRESHOLDS['arrival']: # 必须无遮挡才能提示过马路 if has_occlusion: # 有遮挡,暂不提示过马路,停留在阶段3 logger.info(f"[斑马线] 面积达到{area_ratio:.2f}但被遮挡,暂不提示过马路") return None # 首次到达 if not self.in_arrival_state: self.in_arrival_state = True self.arrival_first_broadcast_time = current_time self.last_broadcast_time = current_time logger.info(f"[斑马线] 到达状态:area={area_ratio:.2f}, 无遮挡") return { 'voice_text': "斑马线到了可以过马路", 'priority': 80, 'should_broadcast': True, 'area': area_ratio, 'position': '到达' } # 重复播报(每8秒) elif current_time - self.last_broadcast_time >= self.REPEAT_INTERVALS['arrival']: self.last_broadcast_time = current_time return { 'voice_text': "斑马线到了可以过马路", 'priority': 80, 'should_broadcast': True, 'area': area_ratio, 'position': '到达' } # 超时处理(30秒后自动退出到达状态) elif current_time - self.arrival_first_broadcast_time > 30.0: logger.info("[斑马线] 到达状态超时30秒,自动退出") self.in_arrival_state = False return None # 降级处理:如果从到达状态面积减小 if self.in_arrival_state and area_ratio < 0.20: logger.info(f"[斑马线] 面积降至{area_ratio:.2f},退出到达状态") self.in_arrival_state = False # 清除部分已播报标记,允许重新播报 self.broadcasted_thresholds.discard(self.THRESHOLDS['arrival']) return None def _is_area_stable(self, area_ratio, stability_frames=5) -> bool: """检查面积是否稳定(避免抖动触发)""" if len(self.area_history) < stability_frames: return True # 初始阶段,认为稳定 recent_areas = [h['area'] for h in list(self.area_history)[-stability_frames:]] # 检查最近N帧是否都在当前面积附近(±20%) for recent_area in recent_areas: if abs(recent_area - area_ratio) / max(area_ratio, 0.001) > 0.20: return False return True def _reset_if_needed(self): """重置状态(斑马线消失时)""" if len(self.area_history) > 0: logger.info("[斑马线] 斑马线消失,重置状态") self.broadcasted_thresholds.clear() self.area_history.clear() self.in_arrival_state = False self.last_position_zone = None def reset(self): """完全重置""" self.broadcasted_thresholds.clear() self.area_history.clear() self.in_arrival_state = False self.last_broadcast_time = 0 self.arrival_first_broadcast_time = 0 self.last_position_zone = None logger.info("[斑马线] 感知监控器已重置") def is_in_arrival_state(self) -> bool: """是否在到达状态(用于外部判断是否暂停盲道语音)""" return self.in_arrival_state def get_current_area(self) -> float: """获取当前面积""" if len(self.area_history) > 0: return self.area_history[-1]['area'] return 0.0 def get_visualization_data(self, crosswalk_mask, area_ratio, center_x_ratio, center_y_ratio, has_occlusion) -> Dict[str, Any]: """ 获取可视化数据 返回包含所有可视化元素的字典 """ if crosswalk_mask is None: return {} # 确定当前阶段(统一使用橙色) if area_ratio >= self.THRESHOLDS['arrival']: stage = "到达" stage_color = "rgba(255, 165, 0, 0.5)" # 橙色 elif area_ratio >= self.THRESHOLDS['near']: stage = "接近" stage_color = "rgba(255, 165, 0, 0.45)" # 橙色 elif area_ratio >= self.THRESHOLDS['approaching']: stage = "靠近" stage_color = "rgba(255, 165, 0, 0.40)" # 橙色 else: stage = "发现" stage_color = "rgba(255, 165, 0, 0.35)" # 橙色 # 方位描述 position = self._get_position_description(center_x_ratio) return { 'area_ratio': area_ratio, 'stage': stage, 'stage_color': stage_color, 'position': position.replace("在画面", ""), # 去掉"在画面"前缀 'center_x_ratio': center_x_ratio, 'center_y_ratio': center_y_ratio, 'has_occlusion': has_occlusion, 'in_arrival': self.in_arrival_state } # 辅助函数 def split_combined_voice(combined_text: str) -> list: """ 将组合语音拆分为多个独立语音 例如:"远处发现斑马线,在画面左侧" → ["远处发现斑马线", "在画面左侧"] """ if ',' in combined_text: parts = combined_text.split(',') return [p.strip() for p in parts if p.strip()] return [combined_text]