1 Commits

Author SHA1 Message Date
Carl-Gerhard Lindesvärd
bcddc6f284 wip 2026-01-25 14:36:44 +01:00
2 changed files with 13 additions and 6 deletions

View File

@@ -7,7 +7,7 @@ import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
import { getProfileById, getSalts, upsertProfile } from '@openpanel/db'; import { getProfileById, getSalts, upsertProfile } from '@openpanel/db';
import { type GeoLocation, getGeoLocation } from '@openpanel/geo'; import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
import { getEventsGroupQueueShard } from '@openpanel/queue'; import { getEventsGroupQueueShard } from '@openpanel/queue';
import { getRedisCache } from '@openpanel/redis'; import { getRedisCache, getRedisQueue } from '@openpanel/redis';
import { import {
type IDecrementPayload, type IDecrementPayload,
@@ -419,7 +419,7 @@ export async function fetchDeviceId(
}); });
try { try {
const multi = getRedisCache().multi(); const multi = getRedisQueue().multi();
multi.exists(`bull:sessions:sessionEnd:${projectId}:${currentDeviceId}`); multi.exists(`bull:sessions:sessionEnd:${projectId}:${currentDeviceId}`);
multi.exists(`bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`); multi.exists(`bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`);
const res = await multi.exec(); const res = await multi.exec();

View File

@@ -9,6 +9,7 @@ const options: RedisOptions = {
export { Redis }; export { Redis };
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379'; const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';
const REDIS_QUEUE_URL = process.env.REDIS_QUEUE_URL || REDIS_URL;
export interface ExtendedRedis extends Redis { export interface ExtendedRedis extends Redis {
getJson: <T = any>(key: string) => Promise<T | null>; getJson: <T = any>(key: string) => Promise<T | null>;
@@ -74,7 +75,9 @@ export function getRedisCache() {
let redisSub: ExtendedRedis; let redisSub: ExtendedRedis;
export function getRedisSub() { export function getRedisSub() {
if (!redisSub) { if (!redisSub) {
redisSub = createRedisClient(REDIS_URL, options); // In multi-region setup, pub/sub uses central Redis so subscribers
// in any region can receive events published from any region
redisSub = createRedisClient(REDIS_QUEUE_URL, options);
} }
return redisSub; return redisSub;
@@ -83,7 +86,9 @@ export function getRedisSub() {
let redisPub: ExtendedRedis; let redisPub: ExtendedRedis;
export function getRedisPub() { export function getRedisPub() {
if (!redisPub) { if (!redisPub) {
redisPub = createRedisClient(REDIS_URL, options); // In multi-region setup, pub/sub uses central Redis so publishers
// in any region can reach subscribers in any region
redisPub = createRedisClient(REDIS_QUEUE_URL, options);
} }
return redisPub; return redisPub;
@@ -93,7 +98,8 @@ let redisQueue: ExtendedRedis;
export function getRedisQueue() { export function getRedisQueue() {
if (!redisQueue) { if (!redisQueue) {
// Use different redis for queues (self-hosting will re-use the same redis instance) // Use different redis for queues (self-hosting will re-use the same redis instance)
redisQueue = createRedisClient(REDIS_URL, { // In multi-region setup, this points to central EU Redis for queues
redisQueue = createRedisClient(REDIS_QUEUE_URL, {
...options, ...options,
enableReadyCheck: false, enableReadyCheck: false,
maxRetriesPerRequest: null, maxRetriesPerRequest: null,
@@ -108,7 +114,8 @@ let redisGroupQueue: ExtendedRedis;
export function getRedisGroupQueue() { export function getRedisGroupQueue() {
if (!redisGroupQueue) { if (!redisGroupQueue) {
// Dedicated Redis connection for GroupWorker to avoid blocking BullMQ // Dedicated Redis connection for GroupWorker to avoid blocking BullMQ
redisGroupQueue = createRedisClient(REDIS_URL, { // In multi-region setup, this points to central EU Redis for queues
redisGroupQueue = createRedisClient(REDIS_QUEUE_URL, {
...options, ...options,
enableReadyCheck: false, enableReadyCheck: false,
maxRetriesPerRequest: null, maxRetriesPerRequest: null,