138 lines
3.6 KiB
Python
138 lines
3.6 KiB
Python
"""
|
||
安全工具模块:JWT Token 和密码处理
|
||
"""
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Optional, Any
|
||
from jose import jwt, JWTError
|
||
from passlib.context import CryptContext
|
||
from pydantic import BaseModel
|
||
from fastapi import Response
|
||
from app.core.config import settings
|
||
import uuid
|
||
|
||
# Shared passlib context used for hashing and verifying passwords (bcrypt).
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
|
||
|
||
|
||
class TokenData(BaseModel):
    """Data carried by a decoded JWT access token."""

    # Identifier of the user the token was issued for.
    user_id: str

    # Session token embedded in the JWT alongside the user id.
    session_token: str

    # Moment at which the token expires.
    exp: datetime
|
||
|
||
|
||
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: Password as supplied by the user.
        hashed_password: Previously stored password hash.

    Returns:
        The result of ``pwd_context.verify`` for the two values.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
|
||
|
||
|
||
def get_password_hash(password: str) -> str:
    """Hash a plaintext password with the module-level password context.

    Args:
        password: Plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
|
||
|
||
|
||
def create_access_token(user_id: str, session_token: str) -> str:
    """Create a signed JWT access token.

    Args:
        user_id: User ID, stored in the ``sub`` claim.
        session_token: Session token (used for single-device login
            verification), stored in the ``session_token`` claim.

    Returns:
        The encoded JWT, signed with ``settings.JWT_SECRET_KEY`` using
        ``settings.JWT_ALGORITHM``.
    """
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(hours=settings.JWT_EXPIRE_HOURS)

    # Claim order is kept stable: sub, session_token, exp.
    claims = {
        "sub": user_id,
        "session_token": session_token,
        "exp": issued_at + lifetime,
    }

    encoded = jwt.encode(
        claims,
        settings.JWT_SECRET_KEY,
        algorithm=settings.JWT_ALGORITHM,
    )
    return encoded
|
||
|
||
|
||
def decode_access_token(token: str) -> Optional[TokenData]:
    """Decode and verify a JWT access token.

    Args:
        token: Encoded JWT string.

    Returns:
        A ``TokenData`` built from the ``sub``, ``session_token`` and ``exp``
        claims, or ``None`` if verification fails, a required claim is
        missing/empty, or the claims cannot be turned into a ``TokenData``.
    """
    # Keep the try body minimal: only decoding can raise JWTError.
    try:
        payload = jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except JWTError:
        return None

    user_id = payload.get("sub")
    session_token = payload.get("session_token")
    exp = payload.get("exp")

    if not user_id or not session_token:
        return None

    # A correctly signed token may still lack "exp" or carry a value that
    # datetime.fromtimestamp cannot handle (TypeError / ValueError /
    # OverflowError / OSError), and TokenData construction may reject a claim
    # (pydantic's ValidationError derives from ValueError). Treat all of these
    # as "invalid token" instead of letting the exception escape.
    try:
        return TokenData(
            user_id=user_id,
            session_token=session_token,
            exp=datetime.fromtimestamp(exp, tz=timezone.utc),
        )
    except (TypeError, ValueError, OverflowError, OSError):
        return None
|
||
|
||
|
||
def generate_session_token() -> str:
    """Generate a new session token.

    Returns:
        The string form of a freshly generated random (version 4) UUID.
    """
    fresh_id = uuid.uuid4()
    return str(fresh_id)
|
||
|
||
|
||
def set_auth_cookie(response: Response, token: str) -> None:
    """Attach the JWT to the response as an HttpOnly cookie.

    Args:
        response: FastAPI Response object to set the cookie on.
        token: JWT token stored as the cookie value.
    """
    # Dev/test environments (DEBUG=True) are allowed to use non-HTTPS.
    https_only = not settings.DEBUG
    # Cookie lifetime in seconds, derived from the JWT lifetime in hours.
    lifetime_seconds = settings.JWT_EXPIRE_HOURS * 3600

    response.set_cookie(
        key="access_token",
        value=token,
        max_age=lifetime_seconds,
        secure=https_only,
        httponly=True,
        samesite="lax",
    )
|
||
|
||
|
||
def clear_auth_cookie(response: Response) -> None:
    """Remove the authentication cookie from the client.

    Args:
        response: FastAPI Response object on which the cookie is deleted.
    """
    cookie_name = "access_token"
    response.delete_cookie(key=cookie_name)
|
||
|
||
|
||
def create_payment_token(user_id: str, *, expire_minutes: int = 30) -> str:
    """Create a short-lived JWT dedicated to the payment flow.

    Args:
        user_id: User ID, stored in the ``sub`` claim.
        expire_minutes: Token lifetime in minutes. Defaults to 30, the
            lifetime that was previously hard-coded.

    Returns:
        The encoded JWT carrying ``purpose="payment"``, signed with
        ``settings.JWT_SECRET_KEY`` using ``settings.JWT_ALGORITHM``.
    """
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=expire_minutes)
    payload = {
        "sub": user_id,
        # Marks the token as payment-only; checked when decoding.
        "purpose": "payment",
        "exp": expires_at,
    }
    return jwt.encode(
        payload,
        settings.JWT_SECRET_KEY,
        algorithm=settings.JWT_ALGORITHM,
    )
|
||
|
||
|
||
def decode_payment_token(token: str) -> str | None:
    """Parse a payment token and return its user id.

    Args:
        token: Encoded JWT string.

    Returns:
        The ``sub`` claim when the token verifies and its ``purpose`` claim
        equals ``"payment"``; otherwise ``None``.
    """
    try:
        claims = jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except JWTError:
        return None

    # Only tokens explicitly issued for payment are accepted.
    is_payment = claims.get("purpose") == "payment"
    return claims.get("sub") if is_payment else None
|