Files
ViGent2/backend/app/api/auth.py
2026-02-02 10:51:27 +08:00

339 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
认证 API注册、登录、登出、修改密码
"""
from fastapi import APIRouter, HTTPException, Response, status, Request
from pydantic import BaseModel, field_validator
from app.core.supabase import get_supabase
from app.core.security import (
get_password_hash,
verify_password,
create_access_token,
generate_session_token,
set_auth_cookie,
clear_auth_cookie,
decode_access_token
)
from loguru import logger
from typing import Optional
import re
router = APIRouter(prefix="/api/auth", tags=["认证"])
class RegisterRequest(BaseModel):
phone: str
password: str
username: Optional[str] = None
@field_validator('phone')
@classmethod
def validate_phone(cls, v):
if not re.match(r'^\d{11}$', v):
raise ValueError('手机号必须是11位数字')
return v
class LoginRequest(BaseModel):
phone: str
password: str
@field_validator('phone')
@classmethod
def validate_phone(cls, v):
if not re.match(r'^\d{11}$', v):
raise ValueError('手机号必须是11位数字')
return v
class ChangePasswordRequest(BaseModel):
old_password: str
new_password: str
@field_validator('new_password')
@classmethod
def validate_new_password(cls, v):
if len(v) < 6:
raise ValueError('新密码长度至少6位')
return v
class UserResponse(BaseModel):
id: str
phone: str
username: Optional[str]
role: str
is_active: bool
expires_at: Optional[str] = None
@router.post("/register")
async def register(request: RegisterRequest):
"""
用户注册
注册后状态为 pending需要管理员激活
"""
try:
supabase = get_supabase()
# 检查手机号是否已存在
existing = supabase.table("users").select("id").eq(
"phone", request.phone
).execute()
if existing.data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="该手机号已注册"
)
# 创建用户
password_hash = get_password_hash(request.password)
result = supabase.table("users").insert({
"phone": request.phone,
"password_hash": password_hash,
"username": request.username or f"用户{request.phone[-4:]}",
"role": "pending",
"is_active": False
}).execute()
logger.info(f"新用户注册: {request.phone}")
return {
"success": True,
"message": "注册成功,请等待管理员审核激活"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"注册失败: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="注册失败,请稍后重试"
)
@router.post("/login")
async def login(request: LoginRequest, response: Response):
"""
用户登录
- 验证密码
- 检查是否激活
- 实现"后踢前"单设备登录
"""
try:
supabase = get_supabase()
# 查找用户
user_result = supabase.table("users").select("*").eq(
"phone", request.phone
).single().execute()
user = user_result.data
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="手机号或密码错误"
)
# 验证密码
if not verify_password(request.password, user["password_hash"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="手机号或密码错误"
)
# 检查是否激活
if not user["is_active"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="账号未激活,请等待管理员审核"
)
# 检查授权是否过期
if user.get("expires_at"):
from datetime import datetime, timezone
expires_at = datetime.fromisoformat(user["expires_at"].replace("Z", "+00:00"))
if datetime.now(timezone.utc) > expires_at:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="授权已过期,请联系管理员续期"
)
# 生成新的 session_token (后踢前)
session_token = generate_session_token()
# 删除旧 session插入新 session
supabase.table("user_sessions").delete().eq(
"user_id", user["id"]
).execute()
supabase.table("user_sessions").insert({
"user_id": user["id"],
"session_token": session_token,
"device_info": None # 可以从 request headers 获取
}).execute()
# 生成 JWT Token
token = create_access_token(user["id"], session_token)
# 设置 HttpOnly Cookie
set_auth_cookie(response, token)
logger.info(f"用户登录: {request.phone}")
return {
"success": True,
"message": "登录成功",
"user": UserResponse(
id=user["id"],
phone=user["phone"],
username=user.get("username"),
role=user["role"],
is_active=user["is_active"],
expires_at=user.get("expires_at")
)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"登录失败: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="登录失败,请稍后重试"
)
@router.post("/logout")
async def logout(response: Response):
"""用户登出"""
clear_auth_cookie(response)
return {"success": True, "message": "已登出"}
@router.post("/change-password")
async def change_password(request: ChangePasswordRequest, req: Request, response: Response):
"""
修改密码
- 验证当前密码
- 设置新密码
- 重新生成 session token
"""
# 从 Cookie 获取用户
token = req.cookies.get("access_token")
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="未登录"
)
token_data = decode_access_token(token)
if not token_data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 无效"
)
try:
supabase = get_supabase()
# 获取用户信息
user_result = supabase.table("users").select("*").eq(
"id", token_data.user_id
).single().execute()
user = user_result.data
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户不存在"
)
# 验证当前密码
if not verify_password(request.old_password, user["password_hash"]):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="当前密码错误"
)
# 更新密码
new_password_hash = get_password_hash(request.new_password)
supabase.table("users").update({
"password_hash": new_password_hash
}).eq("id", user["id"]).execute()
# 生成新的 session token使旧 token 失效
new_session_token = generate_session_token()
supabase.table("user_sessions").delete().eq(
"user_id", user["id"]
).execute()
supabase.table("user_sessions").insert({
"user_id": user["id"],
"session_token": new_session_token,
"device_info": None
}).execute()
# 生成新的 JWT Token
new_token = create_access_token(user["id"], new_session_token)
set_auth_cookie(response, new_token)
logger.info(f"用户修改密码: {user['phone']}")
return {
"success": True,
"message": "密码修改成功"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"修改密码失败: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="修改密码失败,请稍后重试"
)
@router.get("/me")
async def get_me(request: Request):
"""获取当前用户信息"""
# 从 Cookie 获取用户
token = request.cookies.get("access_token")
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="未登录"
)
token_data = decode_access_token(token)
if not token_data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 无效"
)
supabase = get_supabase()
user_result = supabase.table("users").select("*").eq(
"id", token_data.user_id
).single().execute()
user = user_result.data
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户不存在"
)
return UserResponse(
id=user["id"],
phone=user["phone"],
username=user.get("username"),
role=user["role"],
is_active=user["is_active"],
expires_at=user.get("expires_at")
)