From 89c5732efe192011e2b76ace8fbebfa776a8a9c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Mon, 22 Jul 2024 21:49:46 +0200 Subject: [PATCH] fix redis timeout connection on serverless (avoid init redis directly) --- packages/db/src/buffers/buffer.ts | 16 +++++++--------- packages/db/src/buffers/event-buffer.ts | 5 ++--- packages/db/src/buffers/profile-buffer.ts | 1 - 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/packages/db/src/buffers/buffer.ts b/packages/db/src/buffers/buffer.ts index 0b02e923..1ec48a10 100644 --- a/packages/db/src/buffers/buffer.ts +++ b/packages/db/src/buffers/buffer.ts @@ -1,4 +1,4 @@ -import type { Redis } from '@openpanel/redis'; +import { getRedisCache } from '@openpanel/redis'; export const DELETE = '__DELETE__'; @@ -40,7 +40,6 @@ export abstract class RedisBuffer { public prefix = 'op:buffer'; public table: string; public batchSize?: number; - public redis: Redis; // abstract methods public abstract onInsert?: OnInsert; @@ -49,9 +48,8 @@ export abstract class RedisBuffer { public abstract find: Find; public abstract findMany: FindMany; - constructor(options: { table: string; redis: Redis; batchSize?: number }) { + constructor(options: { table: string; batchSize?: number }) { this.table = options.table; - this.redis = options.redis; this.batchSize = options.batchSize; } @@ -65,9 +63,9 @@ export abstract class RedisBuffer { public async insert(value: T) { this.onInsert?.(value); - await this.redis.rpush(this.getKey(), JSON.stringify(value)); + await getRedisCache().rpush(this.getKey(), JSON.stringify(value)); - const length = await this.redis.llen(this.getKey()); + const length = await getRedisCache().llen(this.getKey()); if (this.batchSize && length >= this.batchSize) { this.flush(); } @@ -109,7 +107,7 @@ export abstract class RedisBuffer { e ); const timestamp = new Date().getTime(); - await this.redis.hset(this.getKey(`failed:${timestamp}`), { + await getRedisCache().hset(this.getKey(`failed:${timestamp}`), { error: getError(e), data: JSON.stringify(queue.map((item) => item.event)), retries: 0, @@ -121,7 +119,7 @@ export abstract class RedisBuffer { } public async deleteIndexes(indexes: number[]) { - const multi = this.redis.multi(); + const multi = getRedisCache().multi(); indexes.forEach((index) => { multi.lset(this.getKey(), index, DELETE); }); @@ -130,7 +128,7 @@ export abstract class RedisBuffer { } public async getQueue(limit: number): Promise[]> { - const queue = await this.redis.lrange(this.getKey(), 0, limit); + const queue = await getRedisCache().lrange(this.getKey(), 0, limit); return queue .map((item, index) => ({ event: this.transformQueueItem(item), diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 9241f3d5..4e2a5a1b 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -31,7 +31,6 @@ export class EventBuffer extends RedisBuffer { constructor() { super({ table: TABLE_NAMES.events, - redis: getRedisCache(), }); } @@ -41,7 +40,7 @@ export class EventBuffer extends RedisBuffer { SuperJSON.stringify(transformEvent(event)) ); if (event.profile_id) { - this.redis.set( + getRedisCache().set( `live:event:${event.project_id}:${event.profile_id}`, '', 'EX', @@ -168,7 +167,7 @@ export class EventBuffer extends RedisBuffer { }); if (itemsToStalled.size > 0) { - const multi = this.redis.multi(); + const multi = getRedisCache().multi(); for (const item of itemsToStalled) { multi.rpush(this.getKey('stalled'), JSON.stringify(item.event)); } diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index a699ca1e..75439133 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -22,7 +22,6 @@ import { RedisBuffer } from './buffer'; export class ProfileBuffer extends RedisBuffer { constructor() { super({ - redis: getRedisCache(), table: 'profiles', batchSize: 100, });