Files
NaviGlassServer/utils.py
2025-12-31 15:42:30 +08:00

308 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# utils.py
# -*- coding: utf-8 -*-
import cv2
import numpy as np
import logging
logger = logging.getLogger(__name__)
# 物品名称映射
ITEM_TO_CLASS_MAP = {
"红牛": "Red_Bull",
"AD钙奶": "AD_milk",
"ad钙奶": "AD_milk",
"钙奶": "AD_milk",
}
# 英文类别名到中文的映射
_OBSTACLE_NAME_CN = {
'person': '',
'bicycle': '自行车',
'car': '',
'motorcycle': '摩托车',
'bus': '公交车',
'truck': '卡车',
'animal': '动物',
'scooter': '电瓶车',
'stroller': '婴儿车',
'dog': '',
}
# 动态类别名称列表
DYNAMIC_CLASS_NAMES = {'person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck', 'animal', 'dog'}
def extract_english_label(item_cn: str) -> tuple:
"""
提取中文物品名称对应的英文标签
:param item_cn: 中文物品名称
:return: (英文标签, 来源)
"""
# 先查找本地映射
if item_cn in ITEM_TO_CLASS_MAP:
return ITEM_TO_CLASS_MAP[item_cn], "local"
# 如果没有找到,返回原始名称
return item_cn, "direct"
def _to_cn_obstacle(name: str) -> str:
"""
将英文障碍物名称转换为中文
:param name: 英文名称
:return: 中文名称
"""
try:
key = (name or '').strip().lower()
return _OBSTACLE_NAME_CN.get(key, '障碍物')
except Exception:
return '障碍物'
def estimate_global_affine(prev_gray, curr_gray, mask=None):
"""
估计两帧之间的全局仿射变换
:param prev_gray: 前一帧灰度图
:param curr_gray: 当前帧灰度图
:param mask: 可选的掩码,只在掩码区域内计算
:return: (仿射矩阵, 内点数)
"""
try:
# 提取特征点
detector = cv2.ORB_create(nfeatures=500)
kp1, des1 = detector.detectAndCompute(prev_gray, mask)
kp2, des2 = detector.detectAndCompute(curr_gray, mask)
if des1 is None or des2 is None or len(kp1) < 10 or len(kp2) < 10:
return np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32), 0
# 匹配特征点
matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
matches = matcher.match(des1, des2)
if len(matches) < 4:
return np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32), 0
# 提取匹配的点对
src_pts = np.float32([kp1[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
dst_pts = np.float32([kp2[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)
# 使用RANSAC估计仿射变换
M, inliers = cv2.estimateAffinePartial2D(src_pts, dst_pts, method=cv2.RANSAC,
ransacReprojThreshold=3.0)
if M is None:
return np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32), 0
inlier_count = np.sum(inliers) if inliers is not None else 0
return M, inlier_count
except Exception as e:
logger.warning(f"estimate_global_affine failed: {e}")
return np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32), 0
def warp_mask(mask, M, output_shape):
"""
使用仿射变换对掩码进行变换
:param mask: 输入掩码
:param M: 2x3的仿射变换矩阵
:param output_shape: 输出形状 (width, height)
:return: 变换后的掩码
"""
try:
if mask is None or M is None:
return None
W, H = output_shape
warped = cv2.warpAffine(mask, M, (W, H),
flags=cv2.INTER_NEAREST,
borderMode=cv2.BORDER_CONSTANT,
borderValue=0)
return warped
except Exception as e:
logger.warning(f"warp_mask failed: {e}")
return None
def estimate_translation_flow(prev_gray, curr_gray, mask=None):
"""
估计两帧之间的平移光流
:param prev_gray: 前一帧灰度图
:param curr_gray: 当前帧灰度图
:param mask: 可选的掩码
:return: (中位光流幅度, 平移矩阵)
"""
try:
# 计算稀疏光流
corners = cv2.goodFeaturesToTrack(prev_gray, maxCorners=100,
qualityLevel=0.3, minDistance=7,
mask=mask)
if corners is None or len(corners) < 10:
return 0.0, np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32)
# 计算光流
next_pts, status, _ = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray,
corners, None)
# 筛选有效点
valid_old = corners[status == 1]
valid_new = next_pts[status == 1]
if len(valid_old) < 5:
return 0.0, np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32)
# 计算位移
flow_vectors = valid_new - valid_old
flow_magnitudes = np.linalg.norm(flow_vectors, axis=1)
median_flow = np.median(flow_magnitudes)
# 估计平均平移
mean_translation = np.mean(flow_vectors, axis=0)
M = np.array([[1, 0, mean_translation[0]],
[0, 1, mean_translation[1]]], dtype=np.float32)
return median_flow, M
except Exception as e:
logger.warning(f"estimate_translation_flow failed: {e}")
return 0.0, np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32)
def is_stationary_frame(prev_gray, curr_gray, mask=None, threshold=0.35):
"""
判断用户是否静止
:param prev_gray: 前一帧灰度图
:param curr_gray: 当前帧灰度图
:param mask: 可选的掩码
:param threshold: 静止判定阈值
:return: True表示静止False表示运动
"""
try:
median_flow, _ = estimate_translation_flow(prev_gray, curr_gray, mask)
return median_flow < threshold
except:
return False
def compute_approach_metrics(prev_obstacles, curr_obstacles, M, H, W):
"""
计算障碍物的接近度量
:param prev_obstacles: 前一帧障碍物列表
:param curr_obstacles: 当前帧障碍物列表
:param M: 仿射变换矩阵
:param H: 图像高度
:param W: 图像宽度
:return: 接近度量列表
"""
metrics = []
for curr_obs in curr_obstacles:
# 寻找最佳匹配的前一帧障碍物
best_match = None
best_iou = 0.0
curr_mask = curr_obs.get('mask')
if curr_mask is None:
metrics.append(None)
continue
for prev_obs in prev_obstacles:
prev_mask = prev_obs.get('mask')
if prev_mask is None:
continue
# 将前一帧掩码变换到当前帧
warped_prev = warp_mask(prev_mask, M, (W, H))
if warped_prev is None:
continue
# 计算IoU
intersection = np.logical_and(curr_mask > 0, warped_prev > 0).sum()
union = np.logical_or(curr_mask > 0, warped_prev > 0).sum()
iou = intersection / union if union > 0 else 0.0
if iou > best_iou:
best_iou = iou
best_match = prev_obs
if best_match is None:
metrics.append(None)
continue
# 计算度量
curr_area = curr_obs.get('area', 0)
prev_area = best_match.get('area', 0)
area_growth = (curr_area - prev_area) / prev_area if prev_area > 0 else 0.0
curr_bottom_y = curr_obs.get('bottom_y_ratio', 0)
prev_bottom_y = best_match.get('bottom_y_ratio', 0)
v_forward = curr_bottom_y - prev_bottom_y
metrics.append({
'area_growth': area_growth,
'v_forward': v_forward,
'iou': best_iou
})
return metrics
def compute_risk_scores(obstacles, prev_obstacles, M, path_mask, image_shape,
stop_th=0.6, avoid_th=0.56):
"""
计算障碍物的风险评分
:param obstacles: 当前障碍物列表
:param prev_obstacles: 前一帧障碍物列表
:param M: 仿射变换矩阵
:param path_mask: 路径掩码
:param image_shape: 图像形状
:param stop_th: 停止阈值
:param avoid_th: 避让阈值
:return: (评分后的障碍物列表, 是否需要停止, 是否需要避让, 可视化元素)
"""
H, W = image_shape[:2]
has_stop = False
has_avoid = False
risk_vis = []
# 计算接近度量
metrics = compute_approach_metrics(prev_obstacles, obstacles, M, H, W)
for obs, met in zip(obstacles, metrics):
risk_score = 0.0
if met is not None:
# 基于接近速度和面积增长计算风险
if met['v_forward'] > 0.004: # 向下移动
risk_score += 0.3
if met['area_growth'] > 0.01: # 面积增长
risk_score += 0.3
# 基于距离的风险
bottom_y = obs.get('bottom_y_ratio', 0)
area_ratio = obs.get('area_ratio', 0)
if bottom_y > 0.8 or area_ratio > 0.15:
risk_score += 0.3
# 动态物体额外风险
name_lower = str(obs.get('name', '')).lower()
if name_lower in DYNAMIC_CLASS_NAMES:
risk_score *= 1.2
obs['risk_score'] = risk_score
# 更新标志
if risk_score >= stop_th:
has_stop = True
elif risk_score >= avoid_th:
has_avoid = True
# 添加风险可视化
if risk_score > 0.3:
risk_color = "rgba(255, 0, 0, 0.3)" if risk_score >= stop_th else "rgba(255, 165, 0, 0.3)"
risk_vis.append({
"type": "risk_indicator",
"score": risk_score,
"color": risk_color,
"position": [int(obs.get('center_x', W/2)), int(obs.get('center_y', H/2))]
})
return obstacles, has_stop, has_avoid, risk_vis