"""
Security utilities: JWT token and password handling.
"""
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional, Any

from fastapi import Response
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel

from app.core.config import settings

# Password hashing context (bcrypt).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class TokenData(BaseModel):
    """Decoded contents of a JWT access token."""

    # Value of the token's "sub" claim.
    user_id: str
    # Value of the token's "session_token" claim.
    session_token: str
    # Expiry time taken from the "exp" claim, as a UTC-aware datetime.
    exp: datetime


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True if ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by ``pwd_context``."""
    return pwd_context.hash(password)


def create_access_token(user_id: str, session_token: str) -> str:
    """
    Create a signed JWT access token.

    The token expires ``settings.JWT_EXPIRE_HOURS`` hours from now (UTC).

    Args:
        user_id: User ID, stored in the "sub" claim.
        session_token: Session token (used for single-device login
            verification), stored in the "session_token" claim.

    Returns:
        The encoded JWT string.
    """
    expire = datetime.now(timezone.utc) + timedelta(hours=settings.JWT_EXPIRE_HOURS)
    to_encode = {
        "sub": user_id,
        "session_token": session_token,
        "exp": expire,
    }
    return jwt.encode(
        to_encode,
        settings.JWT_SECRET_KEY,
        algorithm=settings.JWT_ALGORITHM,
    )


def decode_access_token(token: str) -> Optional[TokenData]:
    """
    Decode and validate a JWT access token.

    Args:
        token: The encoded JWT string.

    Returns:
        A ``TokenData`` instance, or ``None`` if the token fails
        verification, lacks any of the "sub", "session_token" or "exp"
        claims, or carries claim values that cannot be converted into a
        ``TokenData``.
    """
    try:
        payload = jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except JWTError:
        return None

    user_id = payload.get("sub")
    session_token = payload.get("session_token")
    exp = payload.get("exp")

    # A token without "exp" never expires and cannot be turned into a
    # datetime below, so treat it as invalid like the other missing claims.
    if not user_id or not session_token or exp is None:
        return None

    try:
        return TokenData(
            user_id=user_id,
            session_token=session_token,
            exp=datetime.fromtimestamp(exp, tz=timezone.utc),
        )
    except (TypeError, ValueError, OverflowError, OSError):
        # Non-numeric or out-of-range "exp", or claim values rejected by the
        # model: honour the "TokenData or None" contract instead of raising.
        return None


def generate_session_token() -> str:
    """Generate a new session token (a random UUID4 rendered as a string)."""
    return str(uuid.uuid4())


def set_auth_cookie(response: Response, token: str) -> None:
    """
    Store the JWT in an HttpOnly cookie named "access_token".

    Args:
        response: FastAPI Response object to set the cookie on.
        token: Encoded JWT token.
    """
    response.set_cookie(
        key="access_token",
        value=token,
        httponly=True,
        # Allow non-HTTPS in development/test environments (DEBUG=True).
        secure=not settings.DEBUG,
        samesite="lax",
        # Max-Age must be a whole number of seconds; coerce in case the
        # configured lifetime is fractional.
        max_age=int(settings.JWT_EXPIRE_HOURS * 3600),
    )


def clear_auth_cookie(response: Response) -> None:
    """Remove the "access_token" authentication cookie."""
    response.delete_cookie(key="access_token")