Compare commits
1 Commits
main
...
feature/te
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bcddc6f284 |
@@ -7,7 +7,7 @@ import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
|
|||||||
import { getProfileById, getSalts, upsertProfile } from '@openpanel/db';
|
import { getProfileById, getSalts, upsertProfile } from '@openpanel/db';
|
||||||
import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
|
import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
|
||||||
import { getEventsGroupQueueShard } from '@openpanel/queue';
|
import { getEventsGroupQueueShard } from '@openpanel/queue';
|
||||||
import { getRedisCache } from '@openpanel/redis';
|
import { getRedisCache, getRedisQueue } from '@openpanel/redis';
|
||||||
|
|
||||||
import {
|
import {
|
||||||
type IDecrementPayload,
|
type IDecrementPayload,
|
||||||
@@ -419,7 +419,7 @@ export async function fetchDeviceId(
|
|||||||
});
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const multi = getRedisCache().multi();
|
const multi = getRedisQueue().multi();
|
||||||
multi.exists(`bull:sessions:sessionEnd:${projectId}:${currentDeviceId}`);
|
multi.exists(`bull:sessions:sessionEnd:${projectId}:${currentDeviceId}`);
|
||||||
multi.exists(`bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`);
|
multi.exists(`bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`);
|
||||||
const res = await multi.exec();
|
const res = await multi.exec();
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ const options: RedisOptions = {
|
|||||||
export { Redis };
|
export { Redis };
|
||||||
|
|
||||||
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';
|
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';
|
||||||
|
const REDIS_QUEUE_URL = process.env.REDIS_QUEUE_URL || REDIS_URL;
|
||||||
|
|
||||||
export interface ExtendedRedis extends Redis {
|
export interface ExtendedRedis extends Redis {
|
||||||
getJson: <T = any>(key: string) => Promise<T | null>;
|
getJson: <T = any>(key: string) => Promise<T | null>;
|
||||||
@@ -74,7 +75,9 @@ export function getRedisCache() {
|
|||||||
let redisSub: ExtendedRedis;
|
let redisSub: ExtendedRedis;
|
||||||
export function getRedisSub() {
|
export function getRedisSub() {
|
||||||
if (!redisSub) {
|
if (!redisSub) {
|
||||||
redisSub = createRedisClient(REDIS_URL, options);
|
// In multi-region setup, pub/sub uses central Redis so subscribers
|
||||||
|
// in any region can receive events published from any region
|
||||||
|
redisSub = createRedisClient(REDIS_QUEUE_URL, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
return redisSub;
|
return redisSub;
|
||||||
@@ -83,7 +86,9 @@ export function getRedisSub() {
|
|||||||
let redisPub: ExtendedRedis;
|
let redisPub: ExtendedRedis;
|
||||||
export function getRedisPub() {
|
export function getRedisPub() {
|
||||||
if (!redisPub) {
|
if (!redisPub) {
|
||||||
redisPub = createRedisClient(REDIS_URL, options);
|
// In multi-region setup, pub/sub uses central Redis so publishers
|
||||||
|
// in any region can reach subscribers in any region
|
||||||
|
redisPub = createRedisClient(REDIS_QUEUE_URL, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
return redisPub;
|
return redisPub;
|
||||||
@@ -93,7 +98,8 @@ let redisQueue: ExtendedRedis;
|
|||||||
export function getRedisQueue() {
|
export function getRedisQueue() {
|
||||||
if (!redisQueue) {
|
if (!redisQueue) {
|
||||||
// Use different redis for queues (self-hosting will re-use the same redis instance)
|
// Use different redis for queues (self-hosting will re-use the same redis instance)
|
||||||
redisQueue = createRedisClient(REDIS_URL, {
|
// In multi-region setup, this points to central EU Redis for queues
|
||||||
|
redisQueue = createRedisClient(REDIS_QUEUE_URL, {
|
||||||
...options,
|
...options,
|
||||||
enableReadyCheck: false,
|
enableReadyCheck: false,
|
||||||
maxRetriesPerRequest: null,
|
maxRetriesPerRequest: null,
|
||||||
@@ -108,7 +114,8 @@ let redisGroupQueue: ExtendedRedis;
|
|||||||
export function getRedisGroupQueue() {
|
export function getRedisGroupQueue() {
|
||||||
if (!redisGroupQueue) {
|
if (!redisGroupQueue) {
|
||||||
// Dedicated Redis connection for GroupWorker to avoid blocking BullMQ
|
// Dedicated Redis connection for GroupWorker to avoid blocking BullMQ
|
||||||
redisGroupQueue = createRedisClient(REDIS_URL, {
|
// In multi-region setup, this points to central EU Redis for queues
|
||||||
|
redisGroupQueue = createRedisClient(REDIS_QUEUE_URL, {
|
||||||
...options,
|
...options,
|
||||||
enableReadyCheck: false,
|
enableReadyCheck: false,
|
||||||
maxRetriesPerRequest: null,
|
maxRetriesPerRequest: null,
|
||||||
|
|||||||
Reference in New Issue
Block a user