import asyncio
import os
import re
import time
import traceback
from datetime import datetime
from pathlib import Path

import aiofiles
from loguru import logger

from app.services.storage import storage_service


def sanitize_filename(filename: str) -> str:
    """Replace filesystem-unsafe characters with '_' and cap the name at 100 chars.

    The extension is preserved when truncating.
    """
    safe_name = re.sub(r'[<>:"/\\|?*]', '_', filename)
    if len(safe_name) > 100:
        ext = Path(safe_name).suffix
        safe_name = safe_name[:100 - len(ext)] + ext
    return safe_name


def _extract_display_name(storage_name: str) -> str:
    """Return the display name from a stored file name (strips the numeric timestamp prefix)."""
    if '_' in storage_name:
        parts = storage_name.split('_', 1)
        if parts[0].isdigit():
            return parts[1]
    return storage_name


async def _process_and_upload(temp_file_path: str, original_filename: str,
                              content_type: str, user_id: str) -> str:
    """Strip multipart headers from a raw request body and upload the video to Supabase.

    Parameters:
        temp_file_path: path to the raw (multipart-encoded) body on disk.
        original_filename: client-supplied filename used to derive the storage name.
        content_type: MIME type forwarded to storage.
        user_id: owner; becomes the storage path prefix.

    Returns the storage path. Raises on parse/upload failure; temp files are
    removed on success and best-effort on failure.
    """
    video_path = temp_file_path + "_video.mp4"
    try:
        logger.info(f"Processing raw upload: {temp_file_path} for user {user_id}")
        file_size = os.path.getsize(temp_file_path)

        with open(temp_file_path, 'rb') as f:
            # The first CRLF-terminated line of a multipart body is the boundary marker.
            head = f.read(4096)
            first_line_end = head.find(b'\r\n')
            if first_line_end == -1:
                raise Exception("Could not find boundary in multipart body")
            boundary = head[:first_line_end]
            logger.info(f"Detected boundary: {boundary}")

            # Part headers end with a blank line (CRLF CRLF); payload starts right after.
            header_end = head.find(b'\r\n\r\n')
            if header_end == -1:
                raise Exception("Could not find end of multipart headers")
            start_offset = header_end + 4
            logger.info(f"Video data starts at offset: {start_offset}")

            # Look for the closing boundary within the last 200 bytes of the file.
            f.seek(max(0, file_size - 200))
            tail = f.read()
            last_boundary_pos = tail.rfind(boundary)
            if last_boundary_pos != -1:
                # -2 drops the CRLF that precedes the closing boundary line.
                end_offset = (max(0, file_size - 200) + last_boundary_pos) - 2
            else:
                logger.warning("Could not find closing boundary, assuming EOF")
                end_offset = file_size

        logger.info(f"Video data ends at offset: {end_offset}. Total video size: {end_offset - start_offset}")

        # Copy just the payload bytes into a standalone video file, 10 MB at a time.
        with open(temp_file_path, 'rb') as src, open(video_path, 'wb') as dst:
            src.seek(start_offset)
            bytes_to_copy = end_offset - start_offset
            copied = 0
            while copied < bytes_to_copy:
                chunk_size = min(1024 * 1024 * 10, bytes_to_copy - copied)
                chunk = src.read(chunk_size)
                if not chunk:
                    break
                dst.write(chunk)
                copied += len(chunk)
        logger.info(f"Extracted video content to {video_path}")

        timestamp = int(time.time())
        # Storage name drops anything outside [a-zA-Z0-9._-] (stricter than sanitize_filename).
        safe_name = re.sub(r'[^a-zA-Z0-9._-]', '', original_filename)
        storage_path = f"{user_id}/{timestamp}_{safe_name}"

        with open(video_path, 'rb') as f:
            file_content = f.read()
        await storage_service.upload_file(
            bucket=storage_service.BUCKET_MATERIALS,
            path=storage_path,
            file_data=file_content,
            content_type=content_type
        )
        logger.info(f"Upload to Supabase complete: {storage_path}")

        os.remove(temp_file_path)
        os.remove(video_path)
        return storage_path
    except Exception as e:
        logger.error(f"Background upload processing failed: {e}\n{traceback.format_exc()}")
        # Best-effort cleanup of the extracted video; the caller only removes temp_file_path.
        try:
            if os.path.exists(video_path):
                os.remove(video_path)
        except OSError:
            pass
        raise


async def upload_material(request, user_id: str) -> dict:
    """Receive a streamed upload, store it in Supabase, and return the material info.

    Parameters:
        request: framework request object exposing an async ``stream()`` of byte chunks.
        user_id: owner of the material.

    Returns a dict with id (storage path), name, path (signed URL), size_mb, type.
    Raises ValueError on an empty body; re-raises any processing/upload failure.
    """
    filename = "unknown_video.mp4"
    content_type = "video/mp4"
    timestamp = int(time.time())
    temp_filename = f"upload_{timestamp}.raw"
    temp_path = os.path.join("/tmp", temp_filename)
    if os.name == 'nt':
        # Windows fallback location; created on demand.
        temp_path = f"d:/tmp/{temp_filename}"
        os.makedirs("d:/tmp", exist_ok=True)
    try:
        total_size = 0
        last_log = 0
        async with aiofiles.open(temp_path, 'wb') as f:
            async for chunk in request.stream():
                await f.write(chunk)
                total_size += len(chunk)
                # Log progress at most once per 20 MB to avoid log spam.
                if total_size - last_log > 20 * 1024 * 1024:
                    logger.info(f"Receiving stream... Processed {total_size / (1024*1024):.2f} MB")
                    last_log = total_size
        logger.info(f"Stream reception complete. Total size: {total_size} bytes. Saved to {temp_path}")
        if total_size == 0:
            raise ValueError("Received empty body")

        # Sniff the client filename from the multipart headers in the first 4 KB.
        with open(temp_path, 'rb') as f:
            head = f.read(4096).decode('utf-8', errors='ignore')
        match = re.search(r'filename="([^"]+)"', head)
        if match:
            filename = match.group(1)
            logger.info(f"Extracted filename from body: {filename}")

        storage_path = await _process_and_upload(temp_path, filename, content_type, user_id)
        signed_url = await storage_service.get_signed_url(
            bucket=storage_service.BUCKET_MATERIALS,
            path=storage_path
        )
        size_mb = total_size / (1024 * 1024)
        display_name = _extract_display_name(storage_path.split('/')[-1])
        return {
            "id": storage_path,
            "name": display_name,
            "path": signed_url,
            "size_mb": size_mb,
            "type": "video"
        }
    except Exception as e:
        error_msg = f"Streaming upload failed: {str(e)}"
        detail_msg = f"Exception: {repr(e)}\nArgs: {e.args}\n{traceback.format_exc()}"
        logger.error(error_msg + "\n" + detail_msg)
        # Best-effort extra diagnostics; never let logging mask the original error.
        try:
            with open("debug_upload.log", "a") as logf:
                logf.write(f"\n--- Error at {time.ctime()} ---\n")
                logf.write(detail_msg)
                logf.write("\n-----------------------------\n")
        except OSError:
            pass
        if os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                pass
        raise


async def list_materials(user_id: str) -> list[dict]:
    """List all materials of a user, newest first (by timestamped storage path).

    Signed URLs are fetched concurrently, capped at 8 in flight. Failures for
    individual files are logged and skipped; any top-level failure returns [].
    """
    try:
        files_obj = await storage_service.list_files(
            bucket=storage_service.BUCKET_MATERIALS,
            path=user_id
        )
        semaphore = asyncio.Semaphore(8)

        async def build_item(f):
            # One storage entry -> one material dict (or None for placeholders).
            name = f.get('name')
            if not name or name == '.emptyFolderPlaceholder':
                return None
            display_name = _extract_display_name(name)
            full_path = f"{user_id}/{name}"
            async with semaphore:
                signed_url = await storage_service.get_signed_url(
                    bucket=storage_service.BUCKET_MATERIALS,
                    path=full_path
                )
            metadata = f.get('metadata', {})
            size = metadata.get('size', 0)
            created_at_str = f.get('created_at', '')
            created_at = 0
            if created_at_str:
                try:
                    dt = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
                    created_at = int(dt.timestamp())
                except Exception:
                    pass
            return {
                "id": full_path,
                "name": display_name,
                "path": signed_url,
                "size_mb": size / (1024 * 1024),
                "type": "video",
                "created_at": created_at
            }

        tasks = [build_item(f) for f in files_obj]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        materials = []
        for item in results:
            if not item:
                continue
            if isinstance(item, Exception):
                logger.warning(f"Material signed url build failed: {item}")
                continue
            materials.append(item)
        # Path starts with "<user_id>/<timestamp>_", so lexical sort approximates recency.
        materials.sort(key=lambda x: x['id'], reverse=True)
        return materials
    except Exception as e:
        logger.error(f"List materials failed: {e}")
        return []


async def delete_material(material_id: str, user_id: str) -> None:
    """Delete a material; raises PermissionError if it is not owned by user_id."""
    if not material_id.startswith(f"{user_id}/"):
        raise PermissionError("无权删除此素材")
    await storage_service.delete_file(
        bucket=storage_service.BUCKET_MATERIALS,
        path=material_id
    )


async def rename_material(material_id: str, new_name_raw: str, user_id: str) -> dict:
    """Rename a material, preserving its extension and timestamp prefix.

    Parameters:
        material_id: current storage path ("<user_id>/<name>").
        new_name_raw: desired display name (extension, if any, is ignored).
        user_id: owner; must match the path prefix.

    Returns the updated material info (id, name, path).
    Raises PermissionError on ownership mismatch, ValueError on empty/invalid name.
    """
    if not material_id.startswith(f"{user_id}/"):
        raise PermissionError("无权重命名此素材")
    new_name_raw = new_name_raw.strip() if new_name_raw else ""
    if not new_name_raw:
        raise ValueError("新名称不能为空")

    old_name = material_id.split("/", 1)[1]
    old_ext = Path(old_name).suffix
    # Keep the ORIGINAL extension; strip any extension the user typed.
    base_name = Path(new_name_raw).stem if Path(new_name_raw).suffix else new_name_raw
    safe_base = sanitize_filename(base_name).strip()
    if not safe_base:
        raise ValueError("新名称无效")
    new_filename = f"{safe_base}{old_ext}"

    # Re-attach the numeric timestamp prefix so display-name extraction keeps working.
    prefix = None
    if "_" in old_name:
        maybe_prefix, _ = old_name.split("_", 1)
        if maybe_prefix.isdigit():
            prefix = maybe_prefix
    if prefix:
        new_filename = f"{prefix}_{new_filename}"
    new_path = f"{user_id}/{new_filename}"

    if new_path != material_id:
        await storage_service.move_file(
            bucket=storage_service.BUCKET_MATERIALS,
            from_path=material_id,
            to_path=new_path
        )
    signed_url = await storage_service.get_signed_url(
        bucket=storage_service.BUCKET_MATERIALS,
        path=new_path
    )
    display_name = _extract_display_name(new_filename)
    return {
        "id": new_path,
        "name": display_name,
        "path": signed_url,
    }