import { getSuperJson, setSuperJson } from '@openpanel/json'; import type { RedisOptions } from 'ioredis'; import { Redis } from 'ioredis'; const options: RedisOptions = { connectTimeout: 10000, }; export { Redis }; const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379'; const REDIS_QUEUE_URL = process.env.REDIS_QUEUE_URL || REDIS_URL; export interface ExtendedRedis extends Redis { getJson: (key: string) => Promise; setJson: ( key: string, expireInSec: number, value: T, ) => Promise; } const createRedisClient = ( url: string, overrides: RedisOptions = {}, ): ExtendedRedis => { const client = new Redis(url, { ...options, ...overrides, }) as ExtendedRedis; client.on('error', (error) => { console.error('Redis Client Error:', error); }); client.getJson = async (key: string): Promise => { const value = await client.get(key); if (value) { const res = getSuperJson(value) as T; if (res && Array.isArray(res) && res.length === 0) { return null; } if (res && typeof res === 'object' && Object.keys(res).length === 0) { return null; } if (res) { return res; } } return null; }; client.setJson = async ( key: string, expireInSec: number, value: T, ): Promise => { await client.setex(key, expireInSec, setSuperJson(value)); }; return client; }; let redisCache: ExtendedRedis; export function getRedisCache() { if (!redisCache) { redisCache = createRedisClient(REDIS_URL, options); } return redisCache; } let redisSub: ExtendedRedis; export function getRedisSub() { if (!redisSub) { // In multi-region setup, pub/sub uses central Redis so subscribers // in any region can receive events published from any region redisSub = createRedisClient(REDIS_QUEUE_URL, options); } return redisSub; } let redisPub: ExtendedRedis; export function getRedisPub() { if (!redisPub) { // In multi-region setup, pub/sub uses central Redis so publishers // in any region can reach subscribers in any region redisPub = createRedisClient(REDIS_QUEUE_URL, options); } return redisPub; } let redisQueue: ExtendedRedis; export function getRedisQueue() { if (!redisQueue) { // Use different redis for queues (self-hosting will re-use the same redis instance) // In multi-region setup, this points to central EU Redis for queues redisQueue = createRedisClient(REDIS_QUEUE_URL, { ...options, enableReadyCheck: false, maxRetriesPerRequest: null, enableOfflineQueue: true, }); } return redisQueue; } let redisGroupQueue: ExtendedRedis; export function getRedisGroupQueue() { if (!redisGroupQueue) { // Dedicated Redis connection for GroupWorker to avoid blocking BullMQ // In multi-region setup, this points to central EU Redis for queues redisGroupQueue = createRedisClient(REDIS_QUEUE_URL, { ...options, enableReadyCheck: false, maxRetriesPerRequest: null, enableOfflineQueue: true, }); } return redisGroupQueue; } export async function getLock(key: string, value: string, timeout: number) { const lock = await getRedisCache().set(key, value, 'PX', timeout, 'NX'); return lock === 'OK'; }