from fastapi import APIRouter, UploadFile, File, HTTPException, Request, BackgroundTasks, Depends
from app.core.config import settings
from app.core.deps import get_current_user
from app.services.storage import storage_service
import re
import time
import traceback
import os
import aiofiles
from datetime import datetime
from pathlib import Path
from loguru import logger

router = APIRouter()

# Chunk size used when copying the extracted video slice to its own file.
_COPY_CHUNK = 10 * 1024 * 1024  # 10 MB
# Number of bytes scanned from the end of the raw body for the closing boundary.
_TAIL_SCAN = 200


def sanitize_filename(filename: str) -> str:
    """Replace filesystem-unsafe characters with '_' and cap length at 100 chars.

    The file extension is preserved when truncating.
    """
    safe_name = re.sub(r'[<>:"/\\|?*]', '_', filename)
    if len(safe_name) > 100:
        ext = Path(safe_name).suffix
        safe_name = safe_name[:100 - len(ext)] + ext
    return safe_name


def _find_payload_bounds(temp_file_path: str) -> tuple:
    """Locate the byte range [start, end) of the file payload inside a raw
    single-file multipart body saved at ``temp_file_path``.

    This is a simplified manual parser for a SINGLE file upload with structure:
        --boundary\r\n
        Content-Disposition: form-data; name="file"; filename="..."\r\n
        Content-Type: video/mp4\r\n
        \r\n
        [DATA]\r\n
        --boundary--\r\n

    Raises:
        Exception: if the boundary or header terminator cannot be found in
            the first 4 KB of the file.
    """
    file_size = os.path.getsize(temp_file_path)
    with open(temp_file_path, 'rb') as f:
        # The first line of the body is the boundary itself, e.g. b"--boundary123".
        head = f.read(4096)
        first_line_end = head.find(b'\r\n')
        if first_line_end == -1:
            raise Exception("Could not find boundary in multipart body")
        boundary = head[:first_line_end]
        logger.info(f"Detected boundary: {boundary}")

        # Payload starts right after the blank line terminating the part headers.
        header_end = head.find(b'\r\n\r\n')
        if header_end == -1:
            raise Exception("Could not find end of multipart headers")
        start_offset = header_end + 4
        logger.info(f"Video data starts at offset: {start_offset}")

        # The closing delimiter ("\r\n--boundary--\r\n") sits near EOF;
        # scan only the last _TAIL_SCAN bytes for it.
        tail_start = max(0, file_size - _TAIL_SCAN)
        f.seek(tail_start)
        tail = f.read()

    last_boundary_pos = tail.rfind(boundary)
    if last_boundary_pos == -1:
        logger.warning("Could not find closing boundary, assuming EOF")
        end_offset = file_size
    else:
        end_offset = tail_start + last_boundary_pos
        # Strip the CRLF preceding the closing boundary only when it is
        # actually present. (Previously 2 bytes were subtracted
        # unconditionally, which could drop the last 2 payload bytes.)
        if last_boundary_pos >= 2 and tail[last_boundary_pos - 2:last_boundary_pos] == b'\r\n':
            end_offset -= 2

    logger.info(f"Video data ends at offset: {end_offset}. Total video size: {end_offset - start_offset}")
    return start_offset, end_offset


def _copy_slice(src_path: str, dst_path: str, start: int, end: int) -> None:
    """Copy bytes [start, end) of ``src_path`` into ``dst_path`` in 10 MB chunks.

    Copying to a separate temp file (rather than slicing in memory) keeps peak
    memory usage bounded for large uploads.
    """
    with open(src_path, 'rb') as src, open(dst_path, 'wb') as dst:
        src.seek(start)
        remaining = end - start
        while remaining > 0:
            chunk = src.read(min(_COPY_CHUNK, remaining))
            if not chunk:  # defensive: stop on unexpected EOF
                break
            dst.write(chunk)
            remaining -= len(chunk)


async def process_and_upload(temp_file_path: str, original_filename: str, content_type: str, user_id: str):
    """Strip multipart framing from a raw upload and push the video to Supabase.

    Args:
        temp_file_path: Path to the raw request body saved on disk.
        original_filename: Client-supplied filename (sanitized before use).
        content_type: MIME type forwarded to storage.
        user_id: Owner id, used as the storage directory prefix for isolation.

    Returns:
        The storage path ("<user_id>/<timestamp>_<name>") of the uploaded file.

    Raises:
        Exception: re-raised after logging if any step fails.
    """
    try:
        logger.info(f"Processing raw upload: {temp_file_path} for user {user_id}")

        # 1. Find the actual video content (strip multipart boundaries/headers).
        start_offset, end_offset = _find_payload_bounds(temp_file_path)

        # 2. Extract the payload slice into its own file.
        video_path = temp_file_path + "_video.mp4"
        _copy_slice(temp_file_path, video_path, start_offset, end_offset)
        logger.info(f"Extracted video content to {video_path}")

        # 3. Upload to Supabase with per-user path isolation.
        timestamp = int(time.time())
        safe_name = re.sub(r'[^a-zA-Z0-9._-]', '', original_filename)
        # user_id directory prefix isolates each user's materials.
        storage_path = f"{user_id}/{timestamp}_{safe_name}"

        # Read the cleaned video into memory for the upload call; acceptable
        # for current upload sizes (~hundreds of MB) on this server.
        with open(video_path, 'rb') as f:
            file_content = f.read()

        await storage_service.upload_file(
            bucket=storage_service.BUCKET_MATERIALS,
            path=storage_path,
            file_data=file_content,
            content_type=content_type
        )
        logger.info(f"Upload to Supabase complete: {storage_path}")

        # Cleanup both temp files only on success; on failure they are kept
        # for inspection and the caller's error path removes the raw file.
        os.remove(temp_file_path)
        os.remove(video_path)
        return storage_path
    except Exception as e:
        logger.error(f"Background upload processing failed: {e}\n{traceback.format_exc()}")
        raise


@router.post("")
async def upload_material(
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: dict = Depends(get_current_user)
):
    """Stream a raw multipart upload to disk, extract the video, upload it.

    The request body is streamed to a temp file (avoids holding the whole
    upload in memory), then processed synchronously so the material is
    visible in the list as soon as this endpoint returns.
    """
    user_id = current_user["id"]
    logger.info(f"ENTERED upload_material (Streaming Mode) for user {user_id}. Headers: {request.headers}")

    filename = "unknown_video.mp4"  # fallback if the multipart headers are unreadable
    content_type = "video/mp4"

    # Create the temp file path. /tmp on Linux; d:/tmp for local Windows dev.
    timestamp = int(time.time())
    temp_filename = f"upload_{timestamp}.raw"
    temp_path = os.path.join("/tmp", temp_filename)
    if os.name == 'nt':  # local dev
        temp_path = f"d:/tmp/{temp_filename}"
        os.makedirs("d:/tmp", exist_ok=True)

    try:
        # Stream the raw body to disk chunk by chunk.
        total_size = 0
        last_log = 0
        async with aiofiles.open(temp_path, 'wb') as f:
            async for chunk in request.stream():
                await f.write(chunk)
                total_size += len(chunk)
                # Log progress every 20 MB to avoid log spam.
                if total_size - last_log > 20 * 1024 * 1024:
                    logger.info(f"Receiving stream... Processed {total_size / (1024*1024):.2f} MB")
                    last_log = total_size

        logger.info(f"Stream reception complete. Total size: {total_size} bytes. Saved to {temp_path}")

        if total_size == 0:
            raise HTTPException(400, "Received empty body")

        # Best-effort filename extraction from the multipart headers in the
        # first 4 KB of the saved body.
        with open(temp_path, 'rb') as f:
            head = f.read(4096).decode('utf-8', errors='ignore')
        match = re.search(r'filename="([^"]+)"', head)
        if match:
            filename = match.group(1)
        logger.info(f"Extracted filename from body: {filename}")

        # Await processing (strip framing + Supabase upload) rather than
        # backgrounding it, so "List Materials" shows the file immediately.
        storage_path = await process_and_upload(temp_path, filename, content_type, user_id)

        # The file exists in storage now; mint a signed URL for the client.
        signed_url = await storage_service.get_signed_url(
            bucket=storage_service.BUCKET_MATERIALS,
            path=storage_path
        )

        size_mb = total_size / (1024 * 1024)  # approximate (includes multipart framing)

        # Derive the display name: drop the "<timestamp>_" prefix if present.
        display_name = storage_path.split('/')[-1]
        if '_' in display_name:
            parts = display_name.split('_', 1)
            if parts[0].isdigit():
                display_name = parts[1]

        return {
            "id": storage_path,
            "name": display_name,
            "path": signed_url,
            "size_mb": size_mb,
            "type": "video"
        }
    except HTTPException:
        # Preserve deliberate status codes (e.g. the 400 for an empty body)
        # instead of rewrapping everything as a 500.
        if os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except Exception:
                pass  # best-effort cleanup
        raise
    except Exception as e:
        error_msg = f"Streaming upload failed: {str(e)}"
        detail_msg = f"Exception: {repr(e)}\nArgs: {e.args}\n{traceback.format_exc()}"
        logger.error(error_msg + "\n" + detail_msg)
        # Also mirror the failure to a debug file (best effort).
        try:
            with open("debug_upload.log", "a") as logf:
                logf.write(f"\n--- Error at {time.ctime()} ---\n")
                logf.write(detail_msg)
                logf.write("\n-----------------------------\n")
        except Exception:
            pass  # debug logging must never mask the real error
        if os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except Exception:
                pass  # best-effort cleanup
        raise HTTPException(500, f"Upload failed. Check server logs.\nError: {str(e)}")


@router.get("")
async def list_materials(current_user: dict = Depends(get_current_user)):
    """List the current user's materials with signed URLs, newest first.

    Returns {"materials": []} on any storage error (best-effort listing).
    """
    user_id = current_user["id"]
    try:
        # Only list files under the current user's directory.
        files_obj = await storage_service.list_files(
            bucket=storage_service.BUCKET_MATERIALS,
            path=user_id
        )
        materials = []
        for f in files_obj:
            name = f.get('name')
            if not name or name == '.emptyFolderPlaceholder':
                continue

            # Drop the "<timestamp>_" prefix for display, when present.
            display_name = name
            if '_' in name:
                parts = name.split('_', 1)
                if parts[0].isdigit():
                    display_name = parts[1]

            # Full storage path includes the user_id prefix.
            full_path = f"{user_id}/{name}"
            signed_url = await storage_service.get_signed_url(
                bucket=storage_service.BUCKET_MATERIALS,
                path=full_path
            )

            # `metadata` may be present but None — normalize to a dict.
            metadata = f.get('metadata') or {}
            size = metadata.get('size', 0)

            # created_at is a top-level ISO-8601 string on the entry.
            created_at_str = f.get('created_at', '')
            created_at = 0
            if created_at_str:
                try:
                    dt = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
                    created_at = int(dt.timestamp())
                except ValueError:
                    pass  # leave created_at at 0 for unparseable timestamps

            materials.append({
                "id": full_path,  # the id IS the full storage path
                "name": display_name,
                "path": signed_url,
                "size_mb": size / (1024 * 1024),
                "type": "video",
                "created_at": created_at
            })

        # Paths embed "<timestamp>_", so a reverse lexicographic sort on id
        # approximates newest-first within a user.
        materials.sort(key=lambda x: x['id'], reverse=True)
        return {"materials": materials}
    except Exception as e:
        logger.error(f"List materials failed: {e}")
        return {"materials": []}


@router.delete("/{material_id:path}")
async def delete_material(material_id: str, current_user: dict = Depends(get_current_user)):
    """Delete a material; only paths under the caller's own prefix are allowed."""
    user_id = current_user["id"]
    # Ownership check: material_id must live under the caller's directory.
    if not material_id.startswith(f"{user_id}/"):
        raise HTTPException(403, "无权删除此素材")
    try:
        await storage_service.delete_file(
            bucket=storage_service.BUCKET_MATERIALS,
            path=material_id
        )
        return {"success": True, "message": "素材已删除"}
    except Exception as e:
        raise HTTPException(500, f"删除失败: {str(e)}")