Files
NaviGlassServer/crosswalk_awareness.py
2025-12-31 15:42:30 +08:00

343 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
斑马线感知监控器
基于面积变化的斑马线检测和语音提示
不涉及状态切换,只提供语音引导
"""
import time
import numpy as np
from collections import deque
from typing import Optional, Dict, Any
import logging
logger = logging.getLogger(__name__)
class CrosswalkAwarenessMonitor:
"""斑马线感知监控器 - 纯语音提示模块"""
def __init__(self):
# 面积阈值(固定锚点)
self.THRESHOLDS = {
'discover': 0.01, # 1% - 发现
'approaching': 0.08, # 8% - 靠近
'near': 0.18, # 18% - 很近
'arrival': 0.25, # 25% - 到达(可以过马路)
}
# 已播报的阈值(避免重复)
self.broadcasted_thresholds = set()
# 面积历史记录
self.area_history = deque(maxlen=30) # 保存最近30帧
# 时间记录
self.last_broadcast_time = 0
self.arrival_first_broadcast_time = 0
# 状态标志
self.in_arrival_state = False # 是否在"可以过马路"状态
self.last_position_zone = None # 上次播报的方位
# 播报间隔配置(可调整参数 - 数值越小播报越频繁)
# 【参数调整】将所有间隔除以1.5提高播报频率1.5倍
self.REPEAT_INTERVALS = {
'approaching': 6.7, # 靠近阶段每6.7秒重复原10秒÷1.5
'near': 3.3, # 很近阶段每3.3秒重复原5秒÷1.5
'arrival': 5.3, # 到达阶段每5.3秒重复原8秒÷1.5
}
# 提示:如需调整频率,修改这些数值即可
# - 数值越小 = 播报越频繁
# - 数值越大 = 播报越稀疏
# 无遮挡判断阈值
self.OCCLUSION_THRESHOLD = 0.30 # 重叠>30%认为有遮挡
def process_frame(self, crosswalk_mask, blind_path_mask=None) -> Optional[Dict[str, Any]]:
"""
处理每帧的斑马线检测
返回:
{
'voice_text': 语音文本,
'priority': 优先级,
'should_broadcast': 是否应该播报,
'area': 当前面积,
'position': 方位描述,
'visualization': 可视化信息(用于外部绘制)
}
或 None无需播报
"""
# 如果没有斑马线,重置状态
if crosswalk_mask is None:
self._reset_if_needed()
return None
# 1. 计算面积
total_pixels = crosswalk_mask.size
crosswalk_pixels = np.sum(crosswalk_mask > 0)
area_ratio = crosswalk_pixels / total_pixels
# 2. 计算中心位置
y_coords, x_coords = np.where(crosswalk_mask > 0)
if len(y_coords) == 0:
return None
center_x_ratio = np.mean(x_coords) / crosswalk_mask.shape[1]
center_y_ratio = np.mean(y_coords) / crosswalk_mask.shape[0]
# 3. 记录历史
current_time = time.time()
self.area_history.append({
'area': area_ratio,
'center_x': center_x_ratio,
'center_y': center_y_ratio,
'time': current_time
})
# 4. 检查遮挡
has_occlusion = self._check_occlusion(crosswalk_mask, blind_path_mask)
# 5. 判断当前阶段和生成语音
return self._generate_guidance(area_ratio, center_x_ratio, center_y_ratio,
has_occlusion, current_time)
def _check_occlusion(self, crosswalk_mask, blind_path_mask) -> bool:
"""检查斑马线是否被盲道遮挡"""
if blind_path_mask is None:
return False
crosswalk_area = crosswalk_mask > 0
blind_path_area = blind_path_mask > 0
# 计算重叠
overlap = np.logical_and(crosswalk_area, blind_path_area)
overlap_ratio = np.sum(overlap) / max(np.sum(crosswalk_area), 1)
# 重叠超过阈值认为有遮挡
return overlap_ratio > self.OCCLUSION_THRESHOLD
def _get_position_description(self, center_x_ratio) -> str:
"""获取方位描述3分法"""
if center_x_ratio < 0.40:
return "在画面左侧"
elif center_x_ratio < 0.60:
return "在画面中间"
else:
return "在画面右侧"
def _generate_guidance(self, area_ratio, center_x_ratio, center_y_ratio,
has_occlusion, current_time) -> Optional[Dict[str, Any]]:
"""生成引导语音"""
# 检查面积是否稳定(避免抖动)
if not self._is_area_stable(area_ratio):
return None
position_desc = self._get_position_description(center_x_ratio)
# 阶段1发现阶段0.01-0.08
if area_ratio >= self.THRESHOLDS['discover'] and area_ratio < self.THRESHOLDS['approaching']:
if self.THRESHOLDS['discover'] not in self.broadcasted_thresholds:
self.broadcasted_thresholds.add(self.THRESHOLDS['discover'])
return {
'voice_text': f"远处发现斑马线,{position_desc}",
'priority': 55, # 提高到55超过盲道方向指令(50)
'should_broadcast': True,
'area': area_ratio,
'position': position_desc
}
# 阶段2靠近阶段0.08-0.18
elif area_ratio >= self.THRESHOLDS['approaching'] and area_ratio < self.THRESHOLDS['near']:
# 首次播报
if self.THRESHOLDS['approaching'] not in self.broadcasted_thresholds:
self.broadcasted_thresholds.add(self.THRESHOLDS['approaching'])
self.last_broadcast_time = current_time
self.last_position_zone = position_desc
return {
'voice_text': f"正在靠近斑马线,{position_desc}",
'priority': 55, # 提高到55
'should_broadcast': True,
'area': area_ratio,
'position': position_desc
}
# 重复播报每10秒或方位变化
elif (current_time - self.last_broadcast_time >= self.REPEAT_INTERVALS['approaching'] or
position_desc != self.last_position_zone):
self.last_broadcast_time = current_time
self.last_position_zone = position_desc
return {
'voice_text': f"正在靠近斑马线,{position_desc}",
'priority': 55, # 提高到55
'should_broadcast': True,
'area': area_ratio,
'position': position_desc
}
# 阶段3很近阶段0.18-0.25
elif area_ratio >= self.THRESHOLDS['near'] and area_ratio < self.THRESHOLDS['arrival']:
# 首次播报
if self.THRESHOLDS['near'] not in self.broadcasted_thresholds:
self.broadcasted_thresholds.add(self.THRESHOLDS['near'])
self.last_broadcast_time = current_time
self.last_position_zone = position_desc
return {
'voice_text': f"接近斑马线,{position_desc}",
'priority': 60,
'should_broadcast': True,
'area': area_ratio,
'position': position_desc
}
# 重复播报每5秒或方位变化
elif (current_time - self.last_broadcast_time >= self.REPEAT_INTERVALS['near'] or
position_desc != self.last_position_zone):
self.last_broadcast_time = current_time
self.last_position_zone = position_desc
return {
'voice_text': f"接近斑马线,{position_desc}",
'priority': 60,
'should_broadcast': True,
'area': area_ratio,
'position': position_desc
}
# 阶段4到达阶段area ≥ 0.25,无遮挡)
elif area_ratio >= self.THRESHOLDS['arrival']:
# 必须无遮挡才能提示过马路
if has_occlusion:
# 有遮挡暂不提示过马路停留在阶段3
logger.info(f"[斑马线] 面积达到{area_ratio:.2f}但被遮挡,暂不提示过马路")
return None
# 首次到达
if not self.in_arrival_state:
self.in_arrival_state = True
self.arrival_first_broadcast_time = current_time
self.last_broadcast_time = current_time
logger.info(f"[斑马线] 到达状态area={area_ratio:.2f}, 无遮挡")
return {
'voice_text': "斑马线到了可以过马路",
'priority': 80,
'should_broadcast': True,
'area': area_ratio,
'position': '到达'
}
# 重复播报每8秒
elif current_time - self.last_broadcast_time >= self.REPEAT_INTERVALS['arrival']:
self.last_broadcast_time = current_time
return {
'voice_text': "斑马线到了可以过马路",
'priority': 80,
'should_broadcast': True,
'area': area_ratio,
'position': '到达'
}
# 超时处理30秒后自动退出到达状态
elif current_time - self.arrival_first_broadcast_time > 30.0:
logger.info("[斑马线] 到达状态超时30秒自动退出")
self.in_arrival_state = False
return None
# 降级处理:如果从到达状态面积减小
if self.in_arrival_state and area_ratio < 0.20:
logger.info(f"[斑马线] 面积降至{area_ratio:.2f},退出到达状态")
self.in_arrival_state = False
# 清除部分已播报标记,允许重新播报
self.broadcasted_thresholds.discard(self.THRESHOLDS['arrival'])
return None
def _is_area_stable(self, area_ratio, stability_frames=5) -> bool:
"""检查面积是否稳定(避免抖动触发)"""
if len(self.area_history) < stability_frames:
return True # 初始阶段,认为稳定
recent_areas = [h['area'] for h in list(self.area_history)[-stability_frames:]]
# 检查最近N帧是否都在当前面积附近±20%
for recent_area in recent_areas:
if abs(recent_area - area_ratio) / max(area_ratio, 0.001) > 0.20:
return False
return True
def _reset_if_needed(self):
"""重置状态(斑马线消失时)"""
if len(self.area_history) > 0:
logger.info("[斑马线] 斑马线消失,重置状态")
self.broadcasted_thresholds.clear()
self.area_history.clear()
self.in_arrival_state = False
self.last_position_zone = None
def reset(self):
"""完全重置"""
self.broadcasted_thresholds.clear()
self.area_history.clear()
self.in_arrival_state = False
self.last_broadcast_time = 0
self.arrival_first_broadcast_time = 0
self.last_position_zone = None
logger.info("[斑马线] 感知监控器已重置")
def is_in_arrival_state(self) -> bool:
"""是否在到达状态(用于外部判断是否暂停盲道语音)"""
return self.in_arrival_state
def get_current_area(self) -> float:
"""获取当前面积"""
if len(self.area_history) > 0:
return self.area_history[-1]['area']
return 0.0
def get_visualization_data(self, crosswalk_mask, area_ratio, center_x_ratio, center_y_ratio, has_occlusion) -> Dict[str, Any]:
"""
获取可视化数据
返回包含所有可视化元素的字典
"""
if crosswalk_mask is None:
return {}
# 确定当前阶段(统一使用橙色)
if area_ratio >= self.THRESHOLDS['arrival']:
stage = "到达"
stage_color = "rgba(255, 165, 0, 0.5)" # 橙色
elif area_ratio >= self.THRESHOLDS['near']:
stage = "接近"
stage_color = "rgba(255, 165, 0, 0.45)" # 橙色
elif area_ratio >= self.THRESHOLDS['approaching']:
stage = "靠近"
stage_color = "rgba(255, 165, 0, 0.40)" # 橙色
else:
stage = "发现"
stage_color = "rgba(255, 165, 0, 0.35)" # 橙色
# 方位描述
position = self._get_position_description(center_x_ratio)
return {
'area_ratio': area_ratio,
'stage': stage,
'stage_color': stage_color,
'position': position.replace("在画面", ""), # 去掉"在画面"前缀
'center_x_ratio': center_x_ratio,
'center_y_ratio': center_y_ratio,
'has_occlusion': has_occlusion,
'in_arrival': self.in_arrival_state
}
# 辅助函数
def split_combined_voice(combined_text: str) -> list:
"""
将组合语音拆分为多个独立语音
例如:"远处发现斑马线,在画面左侧" → ["远处发现斑马线", "在画面左侧"]
"""
if ',' in combined_text:
parts = combined_text.split(',')
return [p.strip() for p in parts if p.strip()]
return [combined_text]