From bcddc6f2844a84bf24f028498be403bb68382eb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Sun, 25 Jan 2026 14:36:44 +0100 Subject: [PATCH] wip --- apps/api/src/controllers/track.controller.ts | 4 ++-- packages/redis/redis.ts | 15 +++++++++++---- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index 74a2da76..6ca17487 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -7,7 +7,7 @@ import { generateDeviceId, parseUserAgent } from '@openpanel/common/server'; import { getProfileById, getSalts, upsertProfile } from '@openpanel/db'; import { type GeoLocation, getGeoLocation } from '@openpanel/geo'; import { getEventsGroupQueueShard } from '@openpanel/queue'; -import { getRedisCache } from '@openpanel/redis'; +import { getRedisCache, getRedisQueue } from '@openpanel/redis'; import { type IDecrementPayload, @@ -419,7 +419,7 @@ export async function fetchDeviceId( }); try { - const multi = getRedisCache().multi(); + const multi = getRedisQueue().multi(); multi.exists(`bull:sessions:sessionEnd:${projectId}:${currentDeviceId}`); multi.exists(`bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`); const res = await multi.exec(); diff --git a/packages/redis/redis.ts b/packages/redis/redis.ts index be23b99e..2dc68802 100644 --- a/packages/redis/redis.ts +++ b/packages/redis/redis.ts @@ -9,6 +9,7 @@ const options: RedisOptions = { export { Redis }; const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379'; +const REDIS_QUEUE_URL = process.env.REDIS_QUEUE_URL || REDIS_URL; export interface ExtendedRedis extends Redis { getJson: (key: string) => Promise; @@ -74,7 +75,9 @@ export function getRedisCache() { let redisSub: ExtendedRedis; export function getRedisSub() { if (!redisSub) { - redisSub = createRedisClient(REDIS_URL, options); + // In multi-region setup, pub/sub uses central Redis so subscribers + // in any region can receive events published from any region + redisSub = createRedisClient(REDIS_QUEUE_URL, options); } return redisSub; @@ -83,7 +86,9 @@ export function getRedisSub() { let redisPub: ExtendedRedis; export function getRedisPub() { if (!redisPub) { - redisPub = createRedisClient(REDIS_URL, options); + // In multi-region setup, pub/sub uses central Redis so publishers + // in any region can reach subscribers in any region + redisPub = createRedisClient(REDIS_QUEUE_URL, options); } return redisPub; @@ -93,7 +98,8 @@ let redisQueue: ExtendedRedis; export function getRedisQueue() { if (!redisQueue) { // Use different redis for queues (self-hosting will re-use the same redis instance) - redisQueue = createRedisClient(REDIS_URL, { + // In multi-region setup, this points to central EU Redis for queues + redisQueue = createRedisClient(REDIS_QUEUE_URL, { ...options, enableReadyCheck: false, maxRetriesPerRequest: null, @@ -108,7 +114,8 @@ let redisGroupQueue: ExtendedRedis; export function getRedisGroupQueue() { if (!redisGroupQueue) { // Dedicated Redis connection for GroupWorker to avoid blocking BullMQ - redisGroupQueue = createRedisClient(REDIS_URL, { + // In multi-region setup, this points to central EU Redis for queues + redisGroupQueue = createRedisClient(REDIS_QUEUE_URL, { ...options, enableReadyCheck: false, maxRetriesPerRequest: null,