from supabase import Client
from app.core.supabase import get_supabase
from app.core.config import settings
from loguru import logger
from typing import Optional, Union, Dict, List, Any
from pathlib import Path
import asyncio
import functools
import os
import shutil

# Root of the local Supabase Storage volume. Read from the environment so
# different deployments (dev / staging / prod) can point at different disks.
_default_storage_path = "/var/lib/supabase/storage"  # production default
SUPABASE_STORAGE_LOCAL_PATH = Path(os.getenv("SUPABASE_STORAGE_LOCAL_PATH", _default_storage_path))


class StorageService:
    """Async-friendly wrapper around the (synchronous) Supabase Storage SDK.

    Every network call is dispatched to the default thread-pool executor via
    ``loop.run_in_executor`` so it does not block the event loop. The service
    manages three fixed buckets (materials / outputs / ref-audios) and can
    also resolve a Storage object to its path on the local disk when the
    Supabase storage volume is mounted on this host.
    """

    def __init__(self):
        self.supabase: Client = get_supabase()
        self.BUCKET_MATERIALS = "materials"
        self.BUCKET_OUTPUTS = "outputs"
        self.BUCKET_REF_AUDIOS = "ref-audios"
        # Make sure every required bucket exists before first use.
        self._ensure_buckets()

    def _ensure_buckets(self) -> None:
        """Create any missing bucket (best effort — never raises)."""
        buckets = [self.BUCKET_MATERIALS, self.BUCKET_OUTPUTS, self.BUCKET_REF_AUDIOS]
        try:
            existing = self.supabase.storage.list_buckets()
            existing_names = {b.name for b in existing} if existing else set()
            for bucket_name in buckets:
                if bucket_name in existing_names:
                    continue
                try:
                    self.supabase.storage.create_bucket(bucket_name, options={"public": True})
                    logger.info(f"Created bucket: {bucket_name}")
                except Exception as e:
                    # Likely a concurrent creation / already-exists race — ignore.
                    logger.debug(f"Bucket {bucket_name} creation skipped: {e}")
        except Exception as e:
            logger.warning(f"Failed to ensure buckets: {e}")

    def _convert_to_public_url(self, url: str) -> str:
        """Rewrite an internal Supabase URL to its publicly reachable form."""
        if settings.SUPABASE_PUBLIC_URL and settings.SUPABASE_URL:
            # Strip trailing slashes so the substring replacement is exact.
            internal_url = settings.SUPABASE_URL.rstrip('/')
            public_url = settings.SUPABASE_PUBLIC_URL.rstrip('/')
            return url.replace(internal_url, public_url)
        return url

    def get_local_file_path(self, bucket: str, path: str) -> Optional[str]:
        """Resolve a Storage object to its path on the local disk.

        Supabase stores objects on disk as
        ``{STORAGE_ROOT}/{bucket}/{path}/{internal_uuid}`` — i.e. the logical
        object path is a directory containing a single file named by an
        internal UUID.

        Returns:
            The local file path, or ``None`` if it cannot be found.
        """
        try:
            dir_path = SUPABASE_STORAGE_LOCAL_PATH / bucket / path
            if not dir_path.exists():
                logger.warning(f"Storage 目录不存在: {dir_path}")
                return None
            # The directory holds exactly one file (the internal UUID blob).
            files = list(dir_path.iterdir())
            if not files:
                logger.warning(f"Storage 目录为空: {dir_path}")
                return None
            local_path = str(files[0])
            logger.info(f"获取本地文件路径: {local_path}")
            return local_path
        except Exception as e:
            logger.error(f"获取本地文件路径失败: {e}")
            return None

    async def upload_file(self, bucket: str, path: str, file_data: bytes, content_type: str) -> str:
        """Upload raw bytes to Supabase Storage (upsert) without blocking the loop.

        Raises:
            Exception: whatever the underlying SDK raises on failure.
        """
        try:
            # Run the blocking SDK call in the default thread pool.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                functools.partial(
                    self.supabase.storage.from_(bucket).upload,
                    path=path,
                    file=file_data,
                    file_options={"content-type": content_type, "upsert": "true"}
                )
            )
            logger.info(f"Storage upload success: {path}")
            return path
        except Exception as e:
            logger.error(f"Storage upload failed: {e}")
            raise  # re-raise with the original traceback intact

    async def upload_file_from_path(self, bucket: str, storage_path: str, local_file_path: str, content_type: str) -> str:
        """Upload a local file to Supabase Storage.

        Reads the file in chunks to smooth out I/O pressure; the SDK still
        requires the full payload as ``bytes``, so the chunks are joined
        before upload.

        Args:
            bucket: Target bucket name.
            storage_path: Destination path inside the bucket.
            local_file_path: Absolute path of the local file.
            content_type: MIME type of the payload.

        Raises:
            FileNotFoundError: if ``local_file_path`` does not exist.
        """
        local_file = Path(local_file_path)
        if not local_file.exists():
            raise FileNotFoundError(f"本地文件不存在: {local_file_path}")

        loop = asyncio.get_running_loop()
        file_size = local_file.stat().st_size

        def read_file_chunked() -> bytes:
            # 10 MB chunks: reduces peak read pressure for large files even
            # though the final payload is materialized as one bytes object.
            chunks = []
            chunk_size = 10 * 1024 * 1024
            with open(local_file_path, "rb") as f:
                while True:
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                    chunks.append(chunk)
            return b"".join(chunks)

        if file_size > 50 * 1024 * 1024:  # log uploads larger than 50 MB
            logger.info(f"大文件上传: {file_size / 1024 / 1024:.1f}MB")

        file_data = await loop.run_in_executor(None, read_file_chunked)
        return await self.upload_file(bucket, storage_path, file_data, content_type)

    async def get_signed_url(self, bucket: str, path: str, expires_in: int = 3600) -> str:
        """Return a signed (time-limited) URL, or "" on failure."""
        try:
            loop = asyncio.get_running_loop()
            res = await loop.run_in_executor(
                None, lambda: self.supabase.storage.from_(bucket).create_signed_url(path, expires_in)
            )
            # SDK versions differ: the response may be a dict or a bare string.
            if isinstance(res, dict):
                url = res.get("signedURL", "")
                if not url:
                    logger.warning(f"Unexpected signed_url response: {res}")
            elif isinstance(res, str):
                url = res
            else:
                logger.warning(f"Unexpected signed_url response: {res}")
                url = str(res)
            # Rewrite to the publicly reachable host.
            return self._convert_to_public_url(url)
        except Exception as e:
            logger.error(f"Get signed URL failed: {e}")
            return ""

    async def get_public_url(self, bucket: str, path: str) -> str:
        """Return the public URL for an object, or "" on failure."""
        try:
            loop = asyncio.get_running_loop()
            res = await loop.run_in_executor(
                None, lambda: self.supabase.storage.from_(bucket).get_public_url(path)
            )
            return self._convert_to_public_url(res)
        except Exception as e:
            logger.error(f"Get public URL failed: {e}")
            return ""

    async def delete_file(self, bucket: str, path: str) -> None:
        """Delete a file (best effort — failures are logged, not raised)."""
        try:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None, lambda: self.supabase.storage.from_(bucket).remove([path])
            )
            logger.info(f"Deleted file: {bucket}/{path}")
        except Exception as e:
            logger.error(f"Delete file failed: {e}")

    async def move_file(self, bucket: str, from_path: str, to_path: str) -> None:
        """Move/rename a file within a bucket.

        Raises:
            Exception: whatever the underlying SDK raises on failure.
        """
        try:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None, lambda: self.supabase.storage.from_(bucket).move(from_path, to_path)
            )
            logger.info(f"Moved file: {bucket}/{from_path} -> {to_path}")
        except Exception as e:
            logger.error(f"Move file failed: {e}")
            raise  # re-raise with the original traceback intact

    async def list_files(self, bucket: str, path: str) -> List[Any]:
        """List files under a path; returns [] on failure."""
        try:
            loop = asyncio.get_running_loop()
            res = await loop.run_in_executor(
                None, lambda: self.supabase.storage.from_(bucket).list(path)
            )
            return res or []
        except Exception as e:
            logger.error(f"List files failed: {e}")
            return []


# Module-level singleton shared by the application.
# NOTE(review): instantiation performs network calls (_ensure_buckets) at
# import time — confirm this is intended for this deployment.
storage_service = StorageService()