101 lines
3.2 KiB
Python
101 lines
3.2 KiB
Python
from fastapi import APIRouter, UploadFile, File, HTTPException
|
||
from app.core.config import settings
|
||
import shutil
|
||
import re
|
||
import time
|
||
from pathlib import Path
|
||
|
||
# Router on which the material upload, list and delete endpoints in this
# module are registered.
router = APIRouter()
|
||
|
||
|
||
def sanitize_filename(filename: str, max_length: int = 100) -> str:
    """Return *filename* with unsafe characters replaced and its length capped.

    Path separators, the characters reserved in Windows file names and ASCII
    control characters (including NUL) are each replaced by an underscore.
    Names longer than ``max_length`` are truncated; the extension is kept
    whenever it fits within the limit.

    Args:
        filename: File name as supplied by the client.
        max_length: Maximum number of characters in the result.

    Returns:
        The sanitized name, at most ``max_length`` characters long.
    """
    # Control characters are included because an embedded NUL makes open()
    # raise ValueError further down the upload path.
    safe_name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename)
    if len(safe_name) <= max_length:
        return safe_name

    ext = Path(safe_name).suffix
    if len(ext) > max_length:
        # The extension alone exceeds the limit. Subtracting its length would
        # give a negative slice bound and a result longer than the input, so
        # fall back to a plain hard cut.
        return safe_name[:max_length]
    return safe_name[:max_length - len(ext)] + ext
|
||
|
||
|
||
@router.post("")
async def upload_material(file: UploadFile = File(...)):
    """Store an uploaded video file in the ``materials`` upload directory.

    Only names ending in ``.mp4``, ``.mov`` or ``.avi`` (case-insensitive)
    are accepted. The file is written as ``{unix_timestamp}_{sanitized_name}``
    so the original name stays recognisable.

    Args:
        file: The uploaded file from the multipart request.

    Returns:
        A dict with ``id`` (stored file stem), ``name`` (sanitized original
        name), ``path`` (relative storage path), ``size_mb`` and ``type``.

    Raises:
        HTTPException: 400 when the file name is missing or does not have an
            accepted extension.
    """
    # The client may omit the file name; treat that as an invalid format
    # rather than failing with AttributeError on None.
    original_name = file.filename or ""
    if not original_name.lower().endswith(('.mp4', '.mov', '.avi')):
        raise HTTPException(400, "Invalid format")

    # NOTE(review): the prefix has one-second resolution, so two uploads of
    # the same name within the same second write to the same path.
    timestamp = int(time.time())
    safe_name = sanitize_filename(original_name)

    # The directory may not exist yet (list_materials tolerates its absence),
    # so create it before writing.
    materials_dir = settings.UPLOAD_DIR / "materials"
    materials_dir.mkdir(parents=True, exist_ok=True)
    save_path = materials_dir / f"{timestamp}_{safe_name}"

    # NOTE(review): copyfileobj is a blocking call inside an async handler.
    with open(save_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    size_mb = save_path.stat().st_size / (1024 * 1024)

    return {
        "id": save_path.stem,
        "name": safe_name,
        "path": f"uploads/materials/{save_path.name}",
        "size_mb": size_mb,
        "type": "video"
    }
|
||
|
||
@router.get("")
async def list_materials():
    """List the files stored in the ``materials`` upload directory.

    Entries are sorted by ``created_at`` in descending order. A missing
    directory yields an empty list.

    Returns:
        ``{"materials": [...]}`` where each item has ``id`` (file stem),
        ``name`` (file name without its numeric timestamp prefix), ``path``,
        ``size_mb``, ``type`` and ``created_at``.
    """
    materials_dir = settings.UPLOAD_DIR / "materials"
    files = []
    if materials_dir.exists():
        for entry in materials_dir.glob("*"):
            try:
                # Sub-directories are not materials; skip them.
                if not entry.is_file():
                    continue
                stat = entry.stat()
            except OSError:
                # Best effort: the entry may disappear or be unreadable
                # between globbing and stat-ing it.
                continue

            # Stored names look like "{timestamp}_{original name}"; show only
            # the original part when the prefix is purely numeric.
            prefix, sep, rest = entry.name.partition('_')
            display_name = rest if sep and prefix.isdigit() else entry.name

            files.append({
                "id": entry.stem,
                "name": display_name,
                "path": f"uploads/materials/{entry.name}",
                "size_mb": stat.st_size / (1024 * 1024),
                "type": "video",
                # NOTE: st_ctime is the creation time on Windows but the last
                # metadata change time on Unix.
                "created_at": stat.st_ctime
            })

    files.sort(key=lambda item: item["created_at"], reverse=True)
    return {"materials": files}
|
||
|
||
|
||
@router.delete("/{material_id}")
async def delete_material(material_id: str):
    """Delete the stored material whose file stem equals *material_id*.

    Args:
        material_id: File name without extension, as returned in the ``id``
            field by the upload and list endpoints.

    Returns:
        ``{"success": True, "message": ...}`` once the file has been removed.

    Raises:
        HTTPException: 404 when no matching file exists (including when it
            disappears before it can be removed), 500 when removal fails for
            any other OS-level reason.
    """
    materials_dir = settings.UPLOAD_DIR / "materials"

    # Compare against the stem only, so the id can never address a path
    # outside the materials directory. Directories are not materials.
    found = next(
        (
            candidate
            for candidate in materials_dir.glob("*")
            if candidate.stem == material_id and candidate.is_file()
        ),
        None,
    )
    if found is None:
        raise HTTPException(404, "Material not found")

    try:
        found.unlink()
    except FileNotFoundError as e:
        # Removed concurrently after the lookup: report it as missing.
        raise HTTPException(404, "Material not found") from e
    except OSError as e:
        raise HTTPException(500, f"删除失败: {str(e)}") from e

    return {"success": True, "message": "素材已删除"}
|
||
|