From eea5e3c49065e0735e0bbbeee6202201532fd915 Mon Sep 17 00:00:00 2001 From: shinya Date: Fri, 27 Feb 2026 20:06:26 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E4=BC=98=E5=8C=96=20Redis=20=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E7=BB=93=E6=9E=84=EF=BC=8C=E5=B0=86=E6=89=81=E5=B9=B3?= =?UTF-8?q?=20key-value=20=E6=94=B9=E4=B8=BA=20Hash=20=E5=B9=B6=E6=8F=90?= =?UTF-8?q?=E5=8D=87=E5=B9=B6=E5=8F=91=E5=A4=84=E7=90=86=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 播放记录、收藏、跳过配置从扁平 key (u:user:pr:source+id) 改为 Redis Hash (u:user:pr), getAllXxx 操作从 KEYS 全库扫描 + mGet/逐条 GET 优化为单次 HGETALL - 用户列表从 KEYS u:*:pwd 全库扫描改为 Redis Set (sys:users),注册时 SADD,删除时 SREM - deleteUser 中所有数据清理均改为直接 DEL Hash key,消除 KEYS 扫描 - cron 定时任务从串行处理改为用户间 3 并发、记录间 5 并发 - 自动数据迁移 v2:启动时检测旧格式数据并迁移到新结构,通过标记位避免重复执行 --- src/app/api/cron/route.ts | 54 ++++++++++++----- src/lib/redis-base.db.ts | 121 +++++++++++++++++++++++--------------- src/lib/upstash.db.ts | 115 +++++++++++++++++++++++------------- 3 files changed, 185 insertions(+), 105 deletions(-) diff --git a/src/app/api/cron/route.ts b/src/app/api/cron/route.ts index 93db66a..f416581 100644 --- a/src/app/api/cron/route.ts +++ b/src/app/api/cron/route.ts @@ -131,7 +131,6 @@ async function refreshRecordAndFavorites() { fallbackTitle: fallbackTitle.trim(), }) .then((detail) => { - // 成功时才缓存结果 const successPromise = Promise.resolve(detail); detailCache.set(key, successPromise); return detail; @@ -140,31 +139,51 @@ async function refreshRecordAndFavorites() { console.error(`获取视频详情失败 (${source}+${id}):`, err); return null; }); + detailCache.set(key, promise); } return promise; }; - for (const user of users) { + // 并发限制工具 + async function runWithConcurrency( + tasks: (() => Promise)[], + concurrency: number + ): Promise { + const results: T[] = []; + let index = 0; + async function worker() { + while (index < tasks.length) { + const i = index++; + results[i] = await tasks[i](); + } + } + await Promise.all(Array.from({ length: Math.min(concurrency, tasks.length) }, () => worker())); + return results; + } + + // 处理单个用户的播放记录和收藏 + const processUser = async (user: string) => { console.log(`开始处理用户: ${user}`); // 播放记录 try { const playRecords = await db.getAllPlayRecords(user); - const totalRecords = Object.keys(playRecords).length; + const entries = Object.entries(playRecords); + const totalRecords = entries.length; let processedRecords = 0; - for (const [key, record] of Object.entries(playRecords)) { + const tasks = entries.map(([key, record]) => async () => { try { const [source, id] = key.split('+'); if (!source || !id) { console.warn(`跳过无效的播放记录键: ${key}`); - continue; + return; } const detail = await getDetail(source, id, record.title); if (!detail) { console.warn(`跳过无法获取详情的播放记录: ${key}`); - continue; + return; } const episodeCount = detail.episodes?.length || 0; @@ -189,10 +208,10 @@ async function refreshRecordAndFavorites() { processedRecords++; } catch (err) { console.error(`处理播放记录失败 (${key}):`, err); - // 继续处理下一个记录 } - } + }); + await runWithConcurrency(tasks, 5); console.log(`播放记录处理完成: ${processedRecords}/${totalRecords}`); } catch (err) { console.error(`获取用户播放记录失败 (${user}):`, err); @@ -204,21 +223,22 @@ async function refreshRecordAndFavorites() { favorites = Object.fromEntries( Object.entries(favorites).filter(([_, fav]) => fav.origin !== 'live') ); - const totalFavorites = Object.keys(favorites).length; + const favEntries = Object.entries(favorites); + const totalFavorites = favEntries.length; let processedFavorites = 0; - for (const [key, fav] of Object.entries(favorites)) { + const tasks = favEntries.map(([key, fav]) => async () => { try { const [source, id] = key.split('+'); if (!source || !id) { console.warn(`跳过无效的收藏键: ${key}`); - continue; + return; } const favDetail = await getDetail(source, id, fav.title); if (!favDetail) { console.warn(`跳过无法获取详情的收藏: ${key}`); - continue; + return; } const favEpisodeCount = favDetail.episodes?.length || 0; @@ -240,15 +260,19 @@ async function refreshRecordAndFavorites() { processedFavorites++; } catch (err) { console.error(`处理收藏失败 (${key}):`, err); - // 继续处理下一个收藏 } - } + }); + await runWithConcurrency(tasks, 5); console.log(`收藏处理完成: ${processedFavorites}/${totalFavorites}`); } catch (err) { console.error(`获取用户收藏失败 (${user}):`, err); } - } + }; + + // 用户间并发处理(限制 3 个用户同时处理) + const userTasks = users.map((user) => () => processUser(user)); + await runWithConcurrency(userTasks, 3); console.log('刷新播放记录/收藏任务完成'); } catch (err) { diff --git a/src/lib/redis-base.db.ts b/src/lib/redis-base.db.ts index c87f9c2..2be13e9 100644 --- a/src/lib/redis-base.db.ts +++ b/src/lib/redis-base.db.ts @@ -253,6 +253,8 @@ export abstract class BaseRedisStorage implements IStorage { async registerUser(userName: string, password: string): Promise { // 简单存储明文密码,生产环境应加密 await this.withRetry(() => this.client.set(this.userPwdKey(userName), password)); + // 维护用户集合 + await this.withRetry(() => this.client.sAdd(this.usersSetKey(), userName)); } async verifyUser(userName: string, password: string): Promise { @@ -286,6 +288,9 @@ export abstract class BaseRedisStorage implements IStorage { // 删除用户密码 await this.withRetry(() => this.client.del(this.userPwdKey(userName))); + // 从用户集合中移除 + await this.withRetry(() => this.client.sRem(this.usersSetKey(), userName)); + // 删除搜索历史 await this.withRetry(() => this.client.del(this.shKey(userName))); @@ -295,14 +300,8 @@ export abstract class BaseRedisStorage implements IStorage { // 删除收藏夹(Hash key 直接删除) await this.withRetry(() => this.client.del(this.favHashKey(userName))); - // 删除跳过片头片尾配置 - const skipConfigPattern = `u:${userName}:skip:*`; - const skipConfigKeys = await this.withRetry(() => - this.client.keys(skipConfigPattern) - ); - if (skipConfigKeys.length > 0) { - await this.withRetry(() => this.client.del(skipConfigKeys)); - } + // 删除跳过片头片尾配置(Hash key 直接删除) + await this.withRetry(() => this.client.del(this.skipHashKey(userName))); } // ---------- 搜索历史 ---------- @@ -338,14 +337,13 @@ export abstract class BaseRedisStorage implements IStorage { } // ---------- 获取全部用户 ---------- + private usersSetKey() { + return 'sys:users'; + } + async getAllUsers(): Promise { - const keys = await this.withRetry(() => this.client.keys('u:*:pwd')); - return keys - .map((k) => { - const match = k.match(/^u:(.+?):pwd$/); - return match ? ensureString(match[1]) : undefined; - }) - .filter((u): u is string => typeof u === 'string'); + const members = await this.withRetry(() => this.client.sMembers(this.usersSetKey())); + return ensureStringArray(members as any[]); } // ---------- 管理员配置 ---------- @@ -365,8 +363,12 @@ export abstract class BaseRedisStorage implements IStorage { } // ---------- 跳过片头片尾配置 ---------- - private skipConfigKey(user: string, source: string, id: string) { - return `u:${user}:skip:${source}+${id}`; + private skipHashKey(user: string) { + return `u:${user}:skip`; // 一个用户的所有跳过配置存在一个 Hash 中 + } + + private skipField(source: string, id: string) { + return `${source}+${id}`; } async getSkipConfig( @@ -375,7 +377,7 @@ export abstract class BaseRedisStorage implements IStorage { id: string ): Promise { const val = await this.withRetry(() => - this.client.get(this.skipConfigKey(userName, source, id)) + this.client.hGet(this.skipHashKey(userName), this.skipField(source, id)) ); return val ? (JSON.parse(val) as SkipConfig) : null; } @@ -387,8 +389,9 @@ export abstract class BaseRedisStorage implements IStorage { config: SkipConfig ): Promise { await this.withRetry(() => - this.client.set( - this.skipConfigKey(userName, source, id), + this.client.hSet( + this.skipHashKey(userName), + this.skipField(source, id), JSON.stringify(config) ) ); @@ -400,43 +403,28 @@ export abstract class BaseRedisStorage implements IStorage { id: string ): Promise { await this.withRetry(() => - this.client.del(this.skipConfigKey(userName, source, id)) + this.client.hDel(this.skipHashKey(userName), this.skipField(source, id)) ); } async getAllSkipConfigs( userName: string ): Promise<{ [key: string]: SkipConfig }> { - const pattern = `u:${userName}:skip:*`; - const keys = await this.withRetry(() => this.client.keys(pattern)); - - if (keys.length === 0) { - return {}; - } - + const all = await this.withRetry(() => + this.client.hGetAll(this.skipHashKey(userName)) + ); const configs: { [key: string]: SkipConfig } = {}; - - // 批量获取所有配置 - const values = await this.withRetry(() => this.client.mGet(keys)); - - keys.forEach((key, index) => { - const value = values[index]; - if (value) { - // 从key中提取source+id - const match = key.match(/^u:.+?:skip:(.+)$/); - if (match) { - const sourceAndId = match[1]; - configs[sourceAndId] = JSON.parse(value as string) as SkipConfig; - } + for (const [field, raw] of Object.entries(all)) { + if (raw) { + configs[field] = JSON.parse(raw) as SkipConfig; } - }); - + } return configs; } // ---------- 数据迁移:旧扁平 key → Hash 结构 ---------- private migrationKey() { - return 'sys:migration:hash_v1'; + return 'sys:migration:hash_v2'; } async migrateData(): Promise { @@ -450,7 +438,6 @@ export abstract class BaseRedisStorage implements IStorage { // 迁移播放记录:u:*:pr:* → u:username:pr (Hash) const prKeys = await this.withRetry(() => this.client.keys('u:*:pr:*')); if (prKeys.length > 0) { - // 过滤掉新 Hash key(没有第四段的就是 Hash key 本身) const oldPrKeys = prKeys.filter((k) => { const parts = k.split(':'); return parts.length >= 4 && parts[2] === 'pr' && parts[3] !== ''; @@ -468,7 +455,6 @@ export abstract class BaseRedisStorage implements IStorage { this.client.hSet(this.prHashKey(userName), field, raw) ); } - // 删除旧 key await this.withRetry(() => this.client.del(oldPrKeys)); console.log(`迁移了 ${oldPrKeys.length} 条播放记录`); } @@ -494,18 +480,57 @@ export abstract class BaseRedisStorage implements IStorage { this.client.hSet(this.favHashKey(userName), field, raw) ); } - // 删除旧 key await this.withRetry(() => this.client.del(oldFavKeys)); console.log(`迁移了 ${oldFavKeys.length} 条收藏`); } } + // 迁移 skipConfig:u:*:skip:* → u:username:skip (Hash) + const skipKeys = await this.withRetry(() => this.client.keys('u:*:skip:*')); + if (skipKeys.length > 0) { + const oldSkipKeys = skipKeys.filter((k) => { + const parts = k.split(':'); + return parts.length >= 4 && parts[2] === 'skip' && parts[3] !== ''; + }); + + if (oldSkipKeys.length > 0) { + const values = await this.withRetry(() => this.client.mGet(oldSkipKeys)); + for (let i = 0; i < oldSkipKeys.length; i++) { + const raw = values[i]; + if (!raw) continue; + const match = oldSkipKeys[i].match(/^u:(.+?):skip:(.+)$/); + if (!match) continue; + const [, userName, field] = match; + await this.withRetry(() => + this.client.hSet(this.skipHashKey(userName), field, raw) + ); + } + await this.withRetry(() => this.client.del(oldSkipKeys)); + console.log(`迁移了 ${oldSkipKeys.length} 条跳过配置`); + } + } + + // 迁移用户列表:从 KEYS u:*:pwd 构建 sys:users Set + const userSetExists = await this.withRetry(() => this.client.exists(this.usersSetKey())); + if (!userSetExists) { + const pwdKeys = await this.withRetry(() => this.client.keys('u:*:pwd')); + const userNames = pwdKeys + .map((k) => { + const match = k.match(/^u:(.+?):pwd$/); + return match ? match[1] : undefined; + }) + .filter((u): u is string => typeof u === 'string'); + if (userNames.length > 0) { + await this.withRetry(() => this.client.sAdd(this.usersSetKey(), userNames)); + console.log(`迁移了 ${userNames.length} 个用户到 Set`); + } + } + // 标记迁移完成 await this.withRetry(() => this.client.set(this.migrationKey(), 'done')); console.log('数据迁移完成'); } catch (error) { console.error('数据迁移失败:', error); - // 不抛出异常,允许服务继续运行 } } diff --git a/src/lib/upstash.db.ts b/src/lib/upstash.db.ts index 8e4e425..13861f8 100644 --- a/src/lib/upstash.db.ts +++ b/src/lib/upstash.db.ts @@ -161,6 +161,8 @@ export class UpstashRedisStorage implements IStorage { async registerUser(userName: string, password: string): Promise { // 简单存储明文密码,生产环境应加密 await withRetry(() => this.client.set(this.userPwdKey(userName), password)); + // 维护用户集合 + await withRetry(() => this.client.sadd(this.usersSetKey(), userName)); } async verifyUser(userName: string, password: string): Promise { @@ -194,6 +196,9 @@ export class UpstashRedisStorage implements IStorage { // 删除用户密码 await withRetry(() => this.client.del(this.userPwdKey(userName))); + // 从用户集合中移除 + await withRetry(() => this.client.srem(this.usersSetKey(), userName)); + // 删除搜索历史 await withRetry(() => this.client.del(this.shKey(userName))); @@ -203,14 +208,8 @@ export class UpstashRedisStorage implements IStorage { // 删除收藏夹(Hash key 直接删除) await withRetry(() => this.client.del(this.favHashKey(userName))); - // 删除跳过片头片尾配置 - const skipConfigPattern = `u:${userName}:skip:*`; - const skipConfigKeys = await withRetry(() => - this.client.keys(skipConfigPattern) - ); - if (skipConfigKeys.length > 0) { - await withRetry(() => this.client.del(...skipConfigKeys)); - } + // 删除跳过片头片尾配置(Hash key 直接删除) + await withRetry(() => this.client.del(this.skipHashKey(userName))); } // ---------- 搜索历史 ---------- @@ -246,14 +245,13 @@ export class UpstashRedisStorage implements IStorage { } // ---------- 获取全部用户 ---------- + private usersSetKey() { + return 'sys:users'; + } + async getAllUsers(): Promise { - const keys = await withRetry(() => this.client.keys('u:*:pwd')); - return keys - .map((k) => { - const match = k.match(/^u:(.+?):pwd$/); - return match ? ensureString(match[1]) : undefined; - }) - .filter((u): u is string => typeof u === 'string'); + const members = await withRetry(() => this.client.smembers(this.usersSetKey())); + return ensureStringArray(members as any[]); } // ---------- 管理员配置 ---------- @@ -271,8 +269,12 @@ export class UpstashRedisStorage implements IStorage { } // ---------- 跳过片头片尾配置 ---------- - private skipConfigKey(user: string, source: string, id: string) { - return `u:${user}:skip:${source}+${id}`; + private skipHashKey(user: string) { + return `u:${user}:skip`; // 一个用户的所有跳过配置存在一个 Hash 中 + } + + private skipField(source: string, id: string) { + return `${source}+${id}`; } async getSkipConfig( @@ -281,7 +283,7 @@ export class UpstashRedisStorage implements IStorage { id: string ): Promise { const val = await withRetry(() => - this.client.get(this.skipConfigKey(userName, source, id)) + this.client.hget(this.skipHashKey(userName), this.skipField(source, id)) ); return val ? (val as SkipConfig) : null; } @@ -293,7 +295,9 @@ export class UpstashRedisStorage implements IStorage { config: SkipConfig ): Promise { await withRetry(() => - this.client.set(this.skipConfigKey(userName, source, id), config) + this.client.hset(this.skipHashKey(userName), { + [this.skipField(source, id)]: config, + }) ); } @@ -303,43 +307,29 @@ export class UpstashRedisStorage implements IStorage { id: string ): Promise { await withRetry(() => - this.client.del(this.skipConfigKey(userName, source, id)) + this.client.hdel(this.skipHashKey(userName), this.skipField(source, id)) ); } async getAllSkipConfigs( userName: string ): Promise<{ [key: string]: SkipConfig }> { - const pattern = `u:${userName}:skip:*`; - const keys = await withRetry(() => this.client.keys(pattern)); - - if (keys.length === 0) { - return {}; - } - + const all = await withRetry(() => + this.client.hgetall(this.skipHashKey(userName)) + ); + if (!all || Object.keys(all).length === 0) return {}; const configs: { [key: string]: SkipConfig } = {}; - - // 批量获取所有配置 - const values = await withRetry(() => this.client.mget(keys)); - - keys.forEach((key, index) => { - const value = values[index]; + for (const [field, value] of Object.entries(all)) { if (value) { - // 从key中提取source+id - const match = key.match(/^u:.+?:skip:(.+)$/); - if (match) { - const sourceAndId = match[1]; - configs[sourceAndId] = value as SkipConfig; - } + configs[field] = value as SkipConfig; } - }); - + } return configs; } // ---------- 数据迁移:旧扁平 key → Hash 结构 ---------- private migrationKey() { - return 'sys:migration:hash_v1'; + return 'sys:migration:hash_v2'; } async migrateData(): Promise { @@ -400,6 +390,47 @@ export class UpstashRedisStorage implements IStorage { } } + // 迁移 skipConfig:u:*:skip:* → u:username:skip (Hash) + const skipKeys: string[] = await withRetry(() => this.client.keys('u:*:skip:*')); + if (skipKeys.length > 0) { + const oldSkipKeys = skipKeys.filter((k) => { + const parts = k.split(':'); + return parts.length >= 4 && parts[2] === 'skip' && parts[3] !== ''; + }); + + for (const oldKey of oldSkipKeys) { + const match = oldKey.match(/^u:(.+?):skip:(.+)$/); + if (!match) continue; + const [, userName, field] = match; + const value = await withRetry(() => this.client.get(oldKey)); + if (value) { + await withRetry(() => + this.client.hset(this.skipHashKey(userName), { [field]: value }) + ); + await withRetry(() => this.client.del(oldKey)); + } + } + if (oldSkipKeys.length > 0) { + console.log(`迁移了 ${oldSkipKeys.length} 条跳过配置`); + } + } + + // 迁移用户列表:从 KEYS u:*:pwd 构建 sys:users Set + const userSetExists = await withRetry(() => this.client.exists(this.usersSetKey())); + if (!userSetExists) { + const pwdKeys: string[] = await withRetry(() => this.client.keys('u:*:pwd')); + const userNames = pwdKeys + .map((k) => { + const match = k.match(/^u:(.+?):pwd$/); + return match ? match[1] : undefined; + }) + .filter((u): u is string => typeof u === 'string'); + if (userNames.length > 0) { + await withRetry(() => this.client.sadd(this.usersSetKey(), ...userNames)); + console.log(`迁移了 ${userNames.length} 个用户到 Set`); + } + } + // 标记迁移完成 await withRetry(() => this.client.set(this.migrationKey(), 'done')); console.log('数据迁移完成');