diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index 3291aa14..aed924bc 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -1,11 +1,10 @@ -import { logger } from '@/utils/logger'; import { getClientIp, parseIp } from '@/utils/parseIp'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { generateDeviceId } from '@openpanel/common'; import { getSalts } from '@openpanel/db'; import { eventsQueue } from '@openpanel/queue'; -import { redis } from '@openpanel/redis'; +import { getRedisCache } from '@openpanel/redis'; import type { PostEventPayload } from '@openpanel/sdk'; export async function postEvent( @@ -38,7 +37,7 @@ export async function postEvent( }); // this will ensure that we don't have multiple events creating sessions - const locked = await redis.set( + const locked = await getRedisCache().set( `request:priority:${currentDeviceId}-${previousDeviceId}`, 'locked', 'EX', diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index ba783fe2..915fe7e9 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -1,6 +1,5 @@ import { validateClerkJwt } from '@/utils/auth'; import type { FastifyReply, FastifyRequest } from 'fastify'; -import { escape } from 'sqlstring'; import superjson from 'superjson'; import type * as WebSocket from 'ws'; @@ -13,7 +12,7 @@ import { TABLE_NAMES, transformMinimalEvent, } from '@openpanel/db'; -import { redis, redisPub, redisSub } from '@openpanel/redis'; +import { getRedisCache, getRedisPub, getRedisSub } from '@openpanel/redis'; import { getProjectAccess } from '@openpanel/trpc'; export function getLiveEventInfo(key: string) { @@ -36,8 +35,8 @@ export async function testVisitors( return reply.status(404).send('No event found'); } event.projectId = req.params.projectId; - redisPub.publish('event:received', superjson.stringify(event)); - redis.set( + getRedisPub().publish('event:received', superjson.stringify(event)); + getRedisCache().set( `live:event:${event.projectId}:${Math.random() * 1000}`, '', 'EX', @@ -61,7 +60,7 @@ export async function testEvents( if (!event) { return reply.status(404).send('No event found'); } - redisPub.publish('event:saved', superjson.stringify(event)); + getRedisPub().publish('event:saved', superjson.stringify(event)); reply.status(202).send(event); } @@ -77,8 +76,8 @@ export function wsVisitors( ) { const { params } = req; - redisSub.subscribe('event:received'); - redisSub.psubscribe('__key*:expired'); + getRedisSub().subscribe('event:received'); + getRedisSub().psubscribe('__key*:expired'); const message = (channel: string, message: string) => { if (channel === 'event:received') { @@ -99,14 +98,14 @@ export function wsVisitors( } }; - redisSub.on('message', message); - redisSub.on('pmessage', pmessage); + getRedisSub().on('message', message); + getRedisSub().on('pmessage', pmessage); connection.socket.on('close', () => { - redisSub.unsubscribe('event:saved'); - redisSub.punsubscribe('__key*:expired'); - redisSub.off('message', message); - redisSub.off('pmessage', pmessage); + getRedisSub().unsubscribe('event:saved'); + getRedisSub().punsubscribe('__key*:expired'); + getRedisSub().off('message', message); + getRedisSub().off('pmessage', pmessage); }); } @@ -140,7 +139,7 @@ export async function wsProjectEvents( projectId: params.projectId, }); - redisSub.subscribe(subscribeToEvent); + getRedisSub().subscribe(subscribeToEvent); const message = async (channel: string, message: string) => { const event = getSuperJson(message); @@ -159,10 +158,10 @@ export async function wsProjectEvents( } }; - redisSub.on('message', message as any); + getRedisSub().on('message', message as any); connection.socket.on('close', () => { - redisSub.unsubscribe(subscribeToEvent); - redisSub.off('message', message as any); + getRedisSub().unsubscribe(subscribeToEvent); + getRedisSub().off('message', message as any); }); } diff --git a/apps/api/src/controllers/misc.controller.ts b/apps/api/src/controllers/misc.controller.ts index 41b172ba..08861501 100644 --- a/apps/api/src/controllers/misc.controller.ts +++ b/apps/api/src/controllers/misc.controller.ts @@ -5,7 +5,7 @@ import icoToPng from 'ico-to-png'; import sharp from 'sharp'; import { createHash } from '@openpanel/common'; -import { redis } from '@openpanel/redis'; +import { getRedisCache } from '@openpanel/redis'; interface GetFaviconParams { url: string; @@ -51,7 +51,7 @@ export async function getFavicon( ) { function sendBuffer(buffer: Buffer, cacheKey?: string) { if (cacheKey) { - redis.set(`favicon:${cacheKey}`, buffer.toString('base64')); + getRedisCache().set(`favicon:${cacheKey}`, buffer.toString('base64')); } reply.type('image/png'); return reply.send(buffer); @@ -65,7 +65,7 @@ export async function getFavicon( if (imageExtensions.find((ext) => url.endsWith(ext))) { const cacheKey = createHash(url, 32); - const cache = await redis.get(`favicon:${cacheKey}`); + const cache = await getRedisCache().get(`favicon:${cacheKey}`); if (cache) { return sendBuffer(Buffer.from(cache, 'base64')); } @@ -76,7 +76,7 @@ export async function getFavicon( } const { hostname } = new URL(url); - const cache = await redis.get(`favicon:${hostname}`); + const cache = await getRedisCache().get(`favicon:${hostname}`); if (cache) { return sendBuffer(Buffer.from(cache, 'base64')); @@ -104,9 +104,9 @@ export async function clearFavicons( request: FastifyRequest, reply: FastifyReply ) { - const keys = await redis.keys('favicon:*'); + const keys = await getRedisCache().keys('favicon:*'); for (const key of keys) { - await redis.del(key); + await getRedisCache().del(key); } return reply.status(404).send('OK'); } diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 57e596bc..f7a98426 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -10,7 +10,7 @@ import { round } from '@openpanel/common'; import { chQuery, db, TABLE_NAMES } from '@openpanel/db'; import type { IServiceClient } from '@openpanel/db'; import { eventsQueue } from '@openpanel/queue'; -import { redis, redisPub } from '@openpanel/redis'; +import { getRedisCache, getRedisPub } from '@openpanel/redis'; import type { AppRouter } from '@openpanel/trpc'; import { appRouter, createContext } from '@openpanel/trpc'; @@ -98,7 +98,7 @@ const startServer = async () => { reply.send({ name: 'openpanel sdk api' }); }); fastify.get('/healthcheck', async (request, reply) => { - const redisRes = await withTimings(redis.keys('*')); + const redisRes = await withTimings(getRedisCache().keys('*')); const dbRes = await withTimings(db.project.findFirst()); const queueRes = await withTimings(eventsQueue.getCompleted()); const chRes = await withTimings( @@ -150,7 +150,7 @@ const startServer = async () => { }); // Notify when keys expires - redisPub.config('SET', 'notify-keyspace-events', 'Ex'); + getRedisPub().config('SET', 'notify-keyspace-events', 'Ex'); } catch (e) { logger.error(e, 'Failed to start server'); } diff --git a/apps/worker/scripts/debug.ts b/apps/worker/scripts/debug.ts index 41817eef..604223e0 100644 --- a/apps/worker/scripts/debug.ts +++ b/apps/worker/scripts/debug.ts @@ -3,11 +3,11 @@ import { escape } from 'sqlstring'; import type { IClickhouseEvent } from '@openpanel/db'; import { chQuery, eventBuffer, TABLE_NAMES } from '@openpanel/db'; import { sessionsQueue } from '@openpanel/queue/src/queues'; -import { redis } from '@openpanel/redis'; +import { getRedisCache, getRedisQueue } from '@openpanel/redis'; async function debugStalledEvents() { - const keys = await redis.keys('bull:sessions:sessionEnd*'); - const delayedZRange = await redis.zrange( + const keys = await getRedisQueue().keys('bull:sessions:sessionEnd*'); + const delayedZRange = await getRedisQueue().zrange( 'bull:sessions:delayed', 0, -1, @@ -20,10 +20,14 @@ async function debugStalledEvents() { } return acc; }, - [] as Record + {} as Record + ); + const opKeys = await getRedisCache().keys('op:*'); + const stalledEvents = await getRedisCache().lrange( + 'op:buffer:events:stalled', + 0, + -1 ); - const opKeys = await redis.keys('op:*'); - const stalledEvents = await redis.lrange('op:buffer:events:stalled', 0, -1); // keys.forEach((key) => { // console.log(key); // }); @@ -112,7 +116,14 @@ async function debugStalledEvents() { delayedJobs.sort((a, b) => a.timestamp + a.delay - (b.timestamp + b.delay)); let delayedJobsCount = 0; delayedJobs.forEach((job) => { - const date = new Date(delayedValues[job.id]); + if (!job.id) { + return; + } + const timestamp = delayedValues[job.id]; + if (!timestamp) { + return; + } + const date = new Date(timestamp); // if date is in the past // if (date.getTime() - 1000 * 60 * 5 < Date.now()) { if (date.getTime() < Date.now()) { diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index 19a29dea..276b9885 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -5,12 +5,8 @@ import type { WorkerOptions } from 'bullmq'; import { Worker } from 'bullmq'; import express from 'express'; -import { - connection, - cronQueue, - eventsQueue, - sessionsQueue, -} from '@openpanel/queue'; +import { cronQueue, eventsQueue, sessionsQueue } from '@openpanel/queue'; +import { getRedisQueue } from '@openpanel/redis'; import { cronJob } from './jobs/cron'; import { eventsJob } from './jobs/events'; @@ -23,12 +19,7 @@ serverAdapter.setBasePath(process.env.SELF_HOSTED ? '/worker' : '/'); const app = express(); const workerOptions: WorkerOptions = { - connection: { - ...connection, - enableReadyCheck: false, - maxRetriesPerRequest: null, - enableOfflineQueue: true, - }, + connection: getRedisQueue(), concurrency: parseInt(process.env.CONCURRENCY || '1', 10), }; diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index cb2bfd40..54ee7eed 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -14,7 +14,7 @@ import type { EventsQueuePayloadCreateSessionEnd, EventsQueuePayloadIncomingEvent, } from '@openpanel/queue'; -import { redis } from '@openpanel/redis'; +import { getRedisQueue } from '@openpanel/redis'; function noDateInFuture(eventDate: Date): Date { if (eventDate > new Date()) { @@ -217,7 +217,9 @@ async function getSessionEnd({ currentDeviceId: string; previousDeviceId: string; }) { - const sessionEndKeys = await redis.keys(`*:sessionEnd:${projectId}:*`); + const sessionEndKeys = await getRedisQueue().keys( + `*:sessionEnd:${projectId}:*` + ); const sessionEndJobCurrentDeviceId = await findJobByPrefix( sessionsQueue, diff --git a/apps/worker/src/jobs/events.ts b/apps/worker/src/jobs/events.ts index a9422002..d1892217 100644 --- a/apps/worker/src/jobs/events.ts +++ b/apps/worker/src/jobs/events.ts @@ -7,7 +7,6 @@ import type { EventsQueuePayloadCreateSessionEnd, EventsQueuePayloadIncomingEvent, } from '@openpanel/queue'; -import { redis } from '@openpanel/redis'; import { createSessionEnd } from './events.create-session-end'; import { incomingEvent } from './events.incoming-event'; diff --git a/apps/worker/src/metrics.ts b/apps/worker/src/metrics.ts index d75f271a..6d0af53a 100644 --- a/apps/worker/src/metrics.ts +++ b/apps/worker/src/metrics.ts @@ -1,7 +1,7 @@ import client from 'prom-client'; import { cronQueue, eventsQueue, sessionsQueue } from '@openpanel/queue'; -import { redis } from '@openpanel/redis'; +import { getRedisCache } from '@openpanel/redis'; const Registry = client.Registry; @@ -75,7 +75,7 @@ buffers.forEach((buffer) => { name: `buffer_${buffer}_count`, help: 'Number of users in the users array', async collect() { - const metric = await redis.llen(`op:buffer:${buffer}`); + const metric = await getRedisCache().llen(`op:buffer:${buffer}`); this.set(metric); }, }) @@ -86,7 +86,9 @@ buffers.forEach((buffer) => { name: `buffer_${buffer}_stalled_count`, help: 'Number of users in the users array', async collect() { - const metric = await redis.llen(`op:buffer:${buffer}:stalled`); + const metric = await getRedisCache().llen( + `op:buffer:${buffer}:stalled` + ); this.set(metric); }, }) diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 78ff5caa..6fbf0995 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -2,7 +2,7 @@ import { groupBy } from 'ramda'; import SuperJSON from 'superjson'; import { deepMergeObjects } from '@openpanel/common'; -import { redis, redisPub } from '@openpanel/redis'; +import { getRedisCache, getRedisPub } from '@openpanel/redis'; import { ch, TABLE_NAMES } from '../clickhouse-client'; import { transformEvent } from '../services/event.service'; @@ -31,12 +31,12 @@ export class EventBuffer extends RedisBuffer { constructor() { super({ table: TABLE_NAMES.events, - redis, + redis: getRedisCache(), }); } public onInsert?: OnInsert | undefined = (event) => { - redisPub.publish( + getRedisPub().publish( 'event:received', SuperJSON.stringify(transformEvent(event)) ); @@ -54,7 +54,7 @@ export class EventBuffer extends RedisBuffer { savedEvents ) => { for (const event of savedEvents) { - redisPub.publish( + getRedisPub().publish( 'event:saved', SuperJSON.stringify(transformEvent(event)) ); diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index 616b7923..a699ca1e 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -1,7 +1,7 @@ import { mergeDeepRight } from 'ramda'; import { toDots } from '@openpanel/common'; -import { redis } from '@openpanel/redis'; +import { getRedisCache } from '@openpanel/redis'; import { ch, chQuery } from '../clickhouse-client'; import type { @@ -22,7 +22,7 @@ import { RedisBuffer } from './buffer'; export class ProfileBuffer extends RedisBuffer { constructor() { super({ - redis, + redis: getRedisCache(), table: 'profiles', batchSize: 100, }); diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index 3802929b..19972c4c 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -3,7 +3,7 @@ import { escape } from 'sqlstring'; import { v4 as uuid } from 'uuid'; import { toDots } from '@openpanel/common'; -import { redis } from '@openpanel/redis'; +import { getRedisCache } from '@openpanel/redis'; import type { IChartEventFilter } from '@openpanel/validation'; import { eventBuffer } from '../buffers'; @@ -184,7 +184,7 @@ export function transformMinimalEvent( } export async function getLiveVisitors(projectId: string) { - const keys = await redis.keys(`live:event:${projectId}:*`); + const keys = await getRedisCache().keys(`live:event:${projectId}:*`); return keys.length; } diff --git a/packages/queue/index.ts b/packages/queue/index.ts index 7dba30a6..647ff89f 100644 --- a/packages/queue/index.ts +++ b/packages/queue/index.ts @@ -1,5 +1,4 @@ export { eventsQueue, cronQueue, sessionsQueue } from './src/queues'; export type * from './src/queues'; -export { connection } from './src/connection'; export { findJobByPrefix } from './src/utils'; export type { JobsOptions } from 'bullmq'; diff --git a/packages/queue/package.json b/packages/queue/package.json index ec7f1907..7579e8f6 100644 --- a/packages/queue/package.json +++ b/packages/queue/package.json @@ -9,6 +9,7 @@ }, "dependencies": { "@openpanel/db": "workspace:*", + "@openpanel/redis": "workspace:*", "bullmq": "^5.8.7" }, "devDependencies": { diff --git a/packages/queue/src/connection.ts b/packages/queue/src/connection.ts deleted file mode 100644 index 60a50b38..00000000 --- a/packages/queue/src/connection.ts +++ /dev/null @@ -1,10 +0,0 @@ -const parse = (connectionString: string) => { - const url = new URL(connectionString); - return { - host: url.hostname, - port: Number(url.port), - password: url.password, - } as const; -}; - -export const connection = parse(String(process.env.REDIS_URL)); diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 1abd55d2..f6978288 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -1,10 +1,9 @@ import { Queue } from 'bullmq'; import type { IServiceCreateEventPayload } from '@openpanel/db'; +import { getRedisQueue } from '@openpanel/redis'; import type { PostEventPayload } from '@openpanel/sdk'; -import { connection } from './connection'; - export interface EventsQueuePayloadIncomingEvent { type: 'incomingEvent'; payload: { @@ -63,21 +62,21 @@ export type CronQueuePayload = | CronQueuePayloadFlushProfiles; export const eventsQueue = new Queue('events', { - connection, + connection: getRedisQueue(), defaultJobOptions: { removeOnComplete: 10, }, }); export const sessionsQueue = new Queue('sessions', { - connection, + connection: getRedisQueue(), defaultJobOptions: { removeOnComplete: 10, }, }); export const cronQueue = new Queue('cron', { - connection, + connection: getRedisQueue(), defaultJobOptions: { removeOnComplete: 10, }, diff --git a/packages/redis/cachable.ts b/packages/redis/cachable.ts index 39fe830d..4aa663dd 100644 --- a/packages/redis/cachable.ts +++ b/packages/redis/cachable.ts @@ -1,4 +1,4 @@ -import { redis } from './redis'; +import { getRedisCache } from './redis'; export function cacheable any>( fn: T, @@ -9,7 +9,7 @@ export function cacheable any>( ): Promise>> { // JSON.stringify here is not bullet proof since ordering of object keys matters etc const key = `cachable:${fn.name}:${JSON.stringify(args)}`; - const cached = await redis.get(key); + const cached = await getRedisCache().get(key); if (cached) { try { return JSON.parse(cached); @@ -20,7 +20,7 @@ export function cacheable any>( const result = await fn(...(args as any)); if (result !== undefined || result !== null) { - redis.setex(key, expire, JSON.stringify(result)); + getRedisCache().setex(key, expire, JSON.stringify(result)); } return result; diff --git a/packages/redis/redis.ts b/packages/redis/redis.ts index cf3e59ec..7abc06da 100644 --- a/packages/redis/redis.ts +++ b/packages/redis/redis.ts @@ -2,12 +2,65 @@ import type { RedisOptions } from 'ioredis'; import Redis from 'ioredis'; const options: RedisOptions = { - connectTimeout: 30000, - maxRetriesPerRequest: null, + connectTimeout: 10000, }; export { Redis }; -export const redis = new Redis(process.env.REDIS_URL!, options); -export const redisSub = new Redis(process.env.REDIS_URL!, options); -export const redisPub = new Redis(process.env.REDIS_URL!, options); +const createRedisClient = ( + url: string, + overrides: RedisOptions = {} +): Redis => { + const client = new Redis(url, { ...options, ...overrides }); + + client.on('error', (error) => { + console.error('Redis Client Error:', error); + }); + + return client; +}; + +let redisCache: Redis; +export function getRedisCache() { + if (!redisCache) { + redisCache = createRedisClient(process.env.REDIS_URL!, options); + } + + return redisCache; +} + +let redisSub: Redis; +export function getRedisSub() { + if (!redisSub) { + redisSub = createRedisClient(process.env.REDIS_URL!, options); + } + + return redisSub; +} + +let redisPub: Redis; +export function getRedisPub() { + if (!redisPub) { + redisPub = createRedisClient(process.env.REDIS_URL!, options); + } + + return redisPub; +} + +let redisQueue: Redis; +export function getRedisQueue() { + if (!redisQueue) { + // Use different redis for queues (self-hosting will re-use the same redis instance) + redisQueue = createRedisClient( + (process.env.QUEUE_REDIS_URL || process.env.REDIS_URL)!, + { + ...options, + enableReadyCheck: false, + maxRetriesPerRequest: null, + enableOfflineQueue: true, + } + ); + } + + return redisQueue; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b1c451f1..bab16c73 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1008,6 +1008,9 @@ importers: '@openpanel/db': specifier: workspace:* version: link:../db + '@openpanel/redis': + specifier: workspace:* + version: link:../redis bullmq: specifier: ^5.8.7 version: 5.8.7