1 Commits

Author SHA1 Message Date
Carl-Gerhard Lindesvärd
bcddc6f284 wip 2026-01-25 14:36:44 +01:00
2 changed files with 13 additions and 6 deletions

View File

@@ -7,7 +7,7 @@ import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
import { getProfileById, getSalts, upsertProfile } from '@openpanel/db';
import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
import { getEventsGroupQueueShard } from '@openpanel/queue';
import { getRedisCache } from '@openpanel/redis';
import { getRedisCache, getRedisQueue } from '@openpanel/redis';
import {
type IDecrementPayload,
@@ -419,7 +419,7 @@ export async function fetchDeviceId(
});
try {
const multi = getRedisCache().multi();
const multi = getRedisQueue().multi();
multi.exists(`bull:sessions:sessionEnd:${projectId}:${currentDeviceId}`);
multi.exists(`bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`);
const res = await multi.exec();

View File

@@ -9,6 +9,7 @@ const options: RedisOptions = {
export { Redis };
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';
const REDIS_QUEUE_URL = process.env.REDIS_QUEUE_URL || REDIS_URL;
export interface ExtendedRedis extends Redis {
getJson: <T = any>(key: string) => Promise<T | null>;
@@ -74,7 +75,9 @@ export function getRedisCache() {
let redisSub: ExtendedRedis;
export function getRedisSub() {
if (!redisSub) {
redisSub = createRedisClient(REDIS_URL, options);
// In multi-region setup, pub/sub uses central Redis so subscribers
// in any region can receive events published from any region
redisSub = createRedisClient(REDIS_QUEUE_URL, options);
}
return redisSub;
@@ -83,7 +86,9 @@ export function getRedisSub() {
let redisPub: ExtendedRedis;
export function getRedisPub() {
if (!redisPub) {
redisPub = createRedisClient(REDIS_URL, options);
// In multi-region setup, pub/sub uses central Redis so publishers
// in any region can reach subscribers in any region
redisPub = createRedisClient(REDIS_QUEUE_URL, options);
}
return redisPub;
@@ -93,7 +98,8 @@ let redisQueue: ExtendedRedis;
export function getRedisQueue() {
if (!redisQueue) {
// Use different redis for queues (self-hosting will re-use the same redis instance)
redisQueue = createRedisClient(REDIS_URL, {
// In multi-region setup, this points to central EU Redis for queues
redisQueue = createRedisClient(REDIS_QUEUE_URL, {
...options,
enableReadyCheck: false,
maxRetriesPerRequest: null,
@@ -108,7 +114,8 @@ let redisGroupQueue: ExtendedRedis;
export function getRedisGroupQueue() {
if (!redisGroupQueue) {
// Dedicated Redis connection for GroupWorker to avoid blocking BullMQ
redisGroupQueue = createRedisClient(REDIS_URL, {
// In multi-region setup, this points to central EU Redis for queues
redisGroupQueue = createRedisClient(REDIS_QUEUE_URL, {
...options,
enableReadyCheck: false,
maxRetriesPerRequest: null,