perf: 优化 Redis 存储结构,将扁平 key-value 改为 Hash 并提升并发处理能力

- 播放记录、收藏、跳过配置从扁平 key (u:user:pr:source+id) 改为 Redis Hash (u:user:pr),
  getAllXxx 操作从 KEYS 全库扫描 + mGet/逐条 GET 优化为单次 HGETALL
- 用户列表从 KEYS u:*:pwd 全库扫描改为 Redis Set (sys:users),注册时 SADD,删除时 SREM
- deleteUser 中所有数据清理均改为直接 DEL Hash key,消除 KEYS 扫描
- cron 定时任务从串行处理改为用户间 3 并发、记录间 5 并发
- 自动数据迁移 v2:启动时检测旧格式数据并迁移到新结构,通过标记位避免重复执行
This commit is contained in:
shinya
2026-02-27 20:06:26 +08:00
parent 13f1fb7166
commit eea5e3c490
3 changed files with 185 additions and 105 deletions

View File

@@ -131,7 +131,6 @@ async function refreshRecordAndFavorites() {
fallbackTitle: fallbackTitle.trim(),
})
.then((detail) => {
// 成功时才缓存结果
const successPromise = Promise.resolve(detail);
detailCache.set(key, successPromise);
return detail;
@@ -140,31 +139,51 @@ async function refreshRecordAndFavorites() {
console.error(`获取视频详情失败 (${source}+${id}):`, err);
return null;
});
detailCache.set(key, promise);
}
return promise;
};
for (const user of users) {
// 并发限制工具
async function runWithConcurrency<T>(
tasks: (() => Promise<T>)[],
concurrency: number
): Promise<T[]> {
const results: T[] = [];
let index = 0;
async function worker() {
while (index < tasks.length) {
const i = index++;
results[i] = await tasks[i]();
}
}
await Promise.all(Array.from({ length: Math.min(concurrency, tasks.length) }, () => worker()));
return results;
}
// 处理单个用户的播放记录和收藏
const processUser = async (user: string) => {
console.log(`开始处理用户: ${user}`);
// 播放记录
try {
const playRecords = await db.getAllPlayRecords(user);
const totalRecords = Object.keys(playRecords).length;
const entries = Object.entries(playRecords);
const totalRecords = entries.length;
let processedRecords = 0;
for (const [key, record] of Object.entries(playRecords)) {
const tasks = entries.map(([key, record]) => async () => {
try {
const [source, id] = key.split('+');
if (!source || !id) {
console.warn(`跳过无效的播放记录键: ${key}`);
continue;
return;
}
const detail = await getDetail(source, id, record.title);
if (!detail) {
console.warn(`跳过无法获取详情的播放记录: ${key}`);
continue;
return;
}
const episodeCount = detail.episodes?.length || 0;
@@ -189,10 +208,10 @@ async function refreshRecordAndFavorites() {
processedRecords++;
} catch (err) {
console.error(`处理播放记录失败 (${key}):`, err);
// 继续处理下一个记录
}
}
});
await runWithConcurrency(tasks, 5);
console.log(`播放记录处理完成: ${processedRecords}/${totalRecords}`);
} catch (err) {
console.error(`获取用户播放记录失败 (${user}):`, err);
@@ -204,21 +223,22 @@ async function refreshRecordAndFavorites() {
favorites = Object.fromEntries(
Object.entries(favorites).filter(([_, fav]) => fav.origin !== 'live')
);
const totalFavorites = Object.keys(favorites).length;
const favEntries = Object.entries(favorites);
const totalFavorites = favEntries.length;
let processedFavorites = 0;
for (const [key, fav] of Object.entries(favorites)) {
const tasks = favEntries.map(([key, fav]) => async () => {
try {
const [source, id] = key.split('+');
if (!source || !id) {
console.warn(`跳过无效的收藏键: ${key}`);
continue;
return;
}
const favDetail = await getDetail(source, id, fav.title);
if (!favDetail) {
console.warn(`跳过无法获取详情的收藏: ${key}`);
continue;
return;
}
const favEpisodeCount = favDetail.episodes?.length || 0;
@@ -240,15 +260,19 @@ async function refreshRecordAndFavorites() {
processedFavorites++;
} catch (err) {
console.error(`处理收藏失败 (${key}):`, err);
// 继续处理下一个收藏
}
}
});
await runWithConcurrency(tasks, 5);
console.log(`收藏处理完成: ${processedFavorites}/${totalFavorites}`);
} catch (err) {
console.error(`获取用户收藏失败 (${user}):`, err);
}
}
};
// 用户间并发处理(限制 3 个用户同时处理)
const userTasks = users.map((user) => () => processUser(user));
await runWithConcurrency(userTasks, 3);
console.log('刷新播放记录/收藏任务完成');
} catch (err) {

View File

@@ -253,6 +253,8 @@ export abstract class BaseRedisStorage implements IStorage {
async registerUser(userName: string, password: string): Promise<void> {
// 简单存储明文密码,生产环境应加密
await this.withRetry(() => this.client.set(this.userPwdKey(userName), password));
// 维护用户集合
await this.withRetry(() => this.client.sAdd(this.usersSetKey(), userName));
}
async verifyUser(userName: string, password: string): Promise<boolean> {
@@ -286,6 +288,9 @@ export abstract class BaseRedisStorage implements IStorage {
// 删除用户密码
await this.withRetry(() => this.client.del(this.userPwdKey(userName)));
// 从用户集合中移除
await this.withRetry(() => this.client.sRem(this.usersSetKey(), userName));
// 删除搜索历史
await this.withRetry(() => this.client.del(this.shKey(userName)));
@@ -295,14 +300,8 @@ export abstract class BaseRedisStorage implements IStorage {
// 删除收藏夹Hash key 直接删除)
await this.withRetry(() => this.client.del(this.favHashKey(userName)));
// 删除跳过片头片尾配置
const skipConfigPattern = `u:${userName}:skip:*`;
const skipConfigKeys = await this.withRetry(() =>
this.client.keys(skipConfigPattern)
);
if (skipConfigKeys.length > 0) {
await this.withRetry(() => this.client.del(skipConfigKeys));
}
// 删除跳过片头片尾配置Hash key 直接删除)
await this.withRetry(() => this.client.del(this.skipHashKey(userName)));
}
// ---------- 搜索历史 ----------
@@ -338,14 +337,13 @@ export abstract class BaseRedisStorage implements IStorage {
}
// ---------- 获取全部用户 ----------
private usersSetKey() {
return 'sys:users';
}
async getAllUsers(): Promise<string[]> {
const keys = await this.withRetry(() => this.client.keys('u:*:pwd'));
return keys
.map((k) => {
const match = k.match(/^u:(.+?):pwd$/);
return match ? ensureString(match[1]) : undefined;
})
.filter((u): u is string => typeof u === 'string');
const members = await this.withRetry(() => this.client.sMembers(this.usersSetKey()));
return ensureStringArray(members as any[]);
}
// ---------- 管理员配置 ----------
@@ -365,8 +363,12 @@ export abstract class BaseRedisStorage implements IStorage {
}
// ---------- 跳过片头片尾配置 ----------
private skipConfigKey(user: string, source: string, id: string) {
return `u:${user}:skip:${source}+${id}`;
private skipHashKey(user: string) {
return `u:${user}:skip`; // 一个用户的所有跳过配置存在一个 Hash 中
}
private skipField(source: string, id: string) {
return `${source}+${id}`;
}
async getSkipConfig(
@@ -375,7 +377,7 @@ export abstract class BaseRedisStorage implements IStorage {
id: string
): Promise<SkipConfig | null> {
const val = await this.withRetry(() =>
this.client.get(this.skipConfigKey(userName, source, id))
this.client.hGet(this.skipHashKey(userName), this.skipField(source, id))
);
return val ? (JSON.parse(val) as SkipConfig) : null;
}
@@ -387,8 +389,9 @@ export abstract class BaseRedisStorage implements IStorage {
config: SkipConfig
): Promise<void> {
await this.withRetry(() =>
this.client.set(
this.skipConfigKey(userName, source, id),
this.client.hSet(
this.skipHashKey(userName),
this.skipField(source, id),
JSON.stringify(config)
)
);
@@ -400,43 +403,28 @@ export abstract class BaseRedisStorage implements IStorage {
id: string
): Promise<void> {
await this.withRetry(() =>
this.client.del(this.skipConfigKey(userName, source, id))
this.client.hDel(this.skipHashKey(userName), this.skipField(source, id))
);
}
async getAllSkipConfigs(
userName: string
): Promise<{ [key: string]: SkipConfig }> {
const pattern = `u:${userName}:skip:*`;
const keys = await this.withRetry(() => this.client.keys(pattern));
if (keys.length === 0) {
return {};
}
const all = await this.withRetry(() =>
this.client.hGetAll(this.skipHashKey(userName))
);
const configs: { [key: string]: SkipConfig } = {};
// 批量获取所有配置
const values = await this.withRetry(() => this.client.mGet(keys));
keys.forEach((key, index) => {
const value = values[index];
if (value) {
// 从key中提取source+id
const match = key.match(/^u:.+?:skip:(.+)$/);
if (match) {
const sourceAndId = match[1];
configs[sourceAndId] = JSON.parse(value as string) as SkipConfig;
}
for (const [field, raw] of Object.entries(all)) {
if (raw) {
configs[field] = JSON.parse(raw) as SkipConfig;
}
});
}
return configs;
}
// ---------- 数据迁移:旧扁平 key → Hash 结构 ----------
private migrationKey() {
return 'sys:migration:hash_v1';
return 'sys:migration:hash_v2';
}
async migrateData(): Promise<void> {
@@ -450,7 +438,6 @@ export abstract class BaseRedisStorage implements IStorage {
// 迁移播放记录u:*:pr:* → u:username:pr (Hash)
const prKeys = await this.withRetry(() => this.client.keys('u:*:pr:*'));
if (prKeys.length > 0) {
// 过滤掉新 Hash key没有第四段的就是 Hash key 本身)
const oldPrKeys = prKeys.filter((k) => {
const parts = k.split(':');
return parts.length >= 4 && parts[2] === 'pr' && parts[3] !== '';
@@ -468,7 +455,6 @@ export abstract class BaseRedisStorage implements IStorage {
this.client.hSet(this.prHashKey(userName), field, raw)
);
}
// 删除旧 key
await this.withRetry(() => this.client.del(oldPrKeys));
console.log(`迁移了 ${oldPrKeys.length} 条播放记录`);
}
@@ -494,18 +480,57 @@ export abstract class BaseRedisStorage implements IStorage {
this.client.hSet(this.favHashKey(userName), field, raw)
);
}
// 删除旧 key
await this.withRetry(() => this.client.del(oldFavKeys));
console.log(`迁移了 ${oldFavKeys.length} 条收藏`);
}
}
// 迁移 skipConfigu:*:skip:* → u:username:skip (Hash)
const skipKeys = await this.withRetry(() => this.client.keys('u:*:skip:*'));
if (skipKeys.length > 0) {
const oldSkipKeys = skipKeys.filter((k) => {
const parts = k.split(':');
return parts.length >= 4 && parts[2] === 'skip' && parts[3] !== '';
});
if (oldSkipKeys.length > 0) {
const values = await this.withRetry(() => this.client.mGet(oldSkipKeys));
for (let i = 0; i < oldSkipKeys.length; i++) {
const raw = values[i];
if (!raw) continue;
const match = oldSkipKeys[i].match(/^u:(.+?):skip:(.+)$/);
if (!match) continue;
const [, userName, field] = match;
await this.withRetry(() =>
this.client.hSet(this.skipHashKey(userName), field, raw)
);
}
await this.withRetry(() => this.client.del(oldSkipKeys));
console.log(`迁移了 ${oldSkipKeys.length} 条跳过配置`);
}
}
// 迁移用户列表:从 KEYS u:*:pwd 构建 sys:users Set
const userSetExists = await this.withRetry(() => this.client.exists(this.usersSetKey()));
if (!userSetExists) {
const pwdKeys = await this.withRetry(() => this.client.keys('u:*:pwd'));
const userNames = pwdKeys
.map((k) => {
const match = k.match(/^u:(.+?):pwd$/);
return match ? match[1] : undefined;
})
.filter((u): u is string => typeof u === 'string');
if (userNames.length > 0) {
await this.withRetry(() => this.client.sAdd(this.usersSetKey(), userNames));
console.log(`迁移了 ${userNames.length} 个用户到 Set`);
}
}
// 标记迁移完成
await this.withRetry(() => this.client.set(this.migrationKey(), 'done'));
console.log('数据迁移完成');
} catch (error) {
console.error('数据迁移失败:', error);
// 不抛出异常,允许服务继续运行
}
}

View File

@@ -161,6 +161,8 @@ export class UpstashRedisStorage implements IStorage {
async registerUser(userName: string, password: string): Promise<void> {
// 简单存储明文密码,生产环境应加密
await withRetry(() => this.client.set(this.userPwdKey(userName), password));
// 维护用户集合
await withRetry(() => this.client.sadd(this.usersSetKey(), userName));
}
async verifyUser(userName: string, password: string): Promise<boolean> {
@@ -194,6 +196,9 @@ export class UpstashRedisStorage implements IStorage {
// 删除用户密码
await withRetry(() => this.client.del(this.userPwdKey(userName)));
// 从用户集合中移除
await withRetry(() => this.client.srem(this.usersSetKey(), userName));
// 删除搜索历史
await withRetry(() => this.client.del(this.shKey(userName)));
@@ -203,14 +208,8 @@ export class UpstashRedisStorage implements IStorage {
// 删除收藏夹Hash key 直接删除)
await withRetry(() => this.client.del(this.favHashKey(userName)));
// 删除跳过片头片尾配置
const skipConfigPattern = `u:${userName}:skip:*`;
const skipConfigKeys = await withRetry(() =>
this.client.keys(skipConfigPattern)
);
if (skipConfigKeys.length > 0) {
await withRetry(() => this.client.del(...skipConfigKeys));
}
// 删除跳过片头片尾配置Hash key 直接删除)
await withRetry(() => this.client.del(this.skipHashKey(userName)));
}
// ---------- 搜索历史 ----------
@@ -246,14 +245,13 @@ export class UpstashRedisStorage implements IStorage {
}
// ---------- 获取全部用户 ----------
private usersSetKey() {
return 'sys:users';
}
async getAllUsers(): Promise<string[]> {
const keys = await withRetry(() => this.client.keys('u:*:pwd'));
return keys
.map((k) => {
const match = k.match(/^u:(.+?):pwd$/);
return match ? ensureString(match[1]) : undefined;
})
.filter((u): u is string => typeof u === 'string');
const members = await withRetry(() => this.client.smembers(this.usersSetKey()));
return ensureStringArray(members as any[]);
}
// ---------- 管理员配置 ----------
@@ -271,8 +269,12 @@ export class UpstashRedisStorage implements IStorage {
}
// ---------- 跳过片头片尾配置 ----------
private skipConfigKey(user: string, source: string, id: string) {
return `u:${user}:skip:${source}+${id}`;
private skipHashKey(user: string) {
return `u:${user}:skip`; // 一个用户的所有跳过配置存在一个 Hash 中
}
private skipField(source: string, id: string) {
return `${source}+${id}`;
}
async getSkipConfig(
@@ -281,7 +283,7 @@ export class UpstashRedisStorage implements IStorage {
id: string
): Promise<SkipConfig | null> {
const val = await withRetry(() =>
this.client.get(this.skipConfigKey(userName, source, id))
this.client.hget(this.skipHashKey(userName), this.skipField(source, id))
);
return val ? (val as SkipConfig) : null;
}
@@ -293,7 +295,9 @@ export class UpstashRedisStorage implements IStorage {
config: SkipConfig
): Promise<void> {
await withRetry(() =>
this.client.set(this.skipConfigKey(userName, source, id), config)
this.client.hset(this.skipHashKey(userName), {
[this.skipField(source, id)]: config,
})
);
}
@@ -303,43 +307,29 @@ export class UpstashRedisStorage implements IStorage {
id: string
): Promise<void> {
await withRetry(() =>
this.client.del(this.skipConfigKey(userName, source, id))
this.client.hdel(this.skipHashKey(userName), this.skipField(source, id))
);
}
async getAllSkipConfigs(
userName: string
): Promise<{ [key: string]: SkipConfig }> {
const pattern = `u:${userName}:skip:*`;
const keys = await withRetry(() => this.client.keys(pattern));
if (keys.length === 0) {
return {};
}
const all = await withRetry(() =>
this.client.hgetall(this.skipHashKey(userName))
);
if (!all || Object.keys(all).length === 0) return {};
const configs: { [key: string]: SkipConfig } = {};
// 批量获取所有配置
const values = await withRetry(() => this.client.mget(keys));
keys.forEach((key, index) => {
const value = values[index];
for (const [field, value] of Object.entries(all)) {
if (value) {
// 从key中提取source+id
const match = key.match(/^u:.+?:skip:(.+)$/);
if (match) {
const sourceAndId = match[1];
configs[sourceAndId] = value as SkipConfig;
}
configs[field] = value as SkipConfig;
}
});
}
return configs;
}
// ---------- 数据迁移:旧扁平 key → Hash 结构 ----------
private migrationKey() {
return 'sys:migration:hash_v1';
return 'sys:migration:hash_v2';
}
async migrateData(): Promise<void> {
@@ -400,6 +390,47 @@ export class UpstashRedisStorage implements IStorage {
}
}
// 迁移 skipConfigu:*:skip:* → u:username:skip (Hash)
const skipKeys: string[] = await withRetry(() => this.client.keys('u:*:skip:*'));
if (skipKeys.length > 0) {
const oldSkipKeys = skipKeys.filter((k) => {
const parts = k.split(':');
return parts.length >= 4 && parts[2] === 'skip' && parts[3] !== '';
});
for (const oldKey of oldSkipKeys) {
const match = oldKey.match(/^u:(.+?):skip:(.+)$/);
if (!match) continue;
const [, userName, field] = match;
const value = await withRetry(() => this.client.get(oldKey));
if (value) {
await withRetry(() =>
this.client.hset(this.skipHashKey(userName), { [field]: value })
);
await withRetry(() => this.client.del(oldKey));
}
}
if (oldSkipKeys.length > 0) {
console.log(`迁移了 ${oldSkipKeys.length} 条跳过配置`);
}
}
// 迁移用户列表:从 KEYS u:*:pwd 构建 sys:users Set
const userSetExists = await withRetry(() => this.client.exists(this.usersSetKey()));
if (!userSetExists) {
const pwdKeys: string[] = await withRetry(() => this.client.keys('u:*:pwd'));
const userNames = pwdKeys
.map((k) => {
const match = k.match(/^u:(.+?):pwd$/);
return match ? match[1] : undefined;
})
.filter((u): u is string => typeof u === 'string');
if (userNames.length > 0) {
await withRetry(() => this.client.sadd(this.usersSetKey(), ...userNames));
console.log(`迁移了 ${userNames.length} 个用户到 Set`);
}
}
// 标记迁移完成
await withRetry(() => this.client.set(this.migrationKey(), 'done'));
console.log('数据迁移完成');