diff --git a/packages/db/src/buffers/bot-buffer.ts b/packages/db/src/buffers/bot-buffer.ts index ec0a675d..11c17b28 100644 --- a/packages/db/src/buffers/bot-buffer.ts +++ b/packages/db/src/buffers/bot-buffer.ts @@ -5,7 +5,7 @@ import { RedisBuffer } from './buffer'; type BufferType = IClickhouseBotEvent; export class BotBuffer extends RedisBuffer { constructor() { - super(TABLE_NAMES.events_bots, 500); + super('events_bots', 500); } protected async insertIntoDB(items: BufferType[]): Promise { diff --git a/packages/db/src/buffers/buffer.ts b/packages/db/src/buffers/buffer.ts index 6ed1c6bc..781e6210 100644 --- a/packages/db/src/buffers/buffer.ts +++ b/packages/db/src/buffers/buffer.ts @@ -20,6 +20,7 @@ export class RedisBuffer { private lockKey: string; protected maxBufferSize: number | null; protected logger: ILogger; + private isCurrentlyFlushing = false; constructor(bufferName: string, maxBufferSize: number | null) { this.bufferKey = bufferName; @@ -57,6 +58,11 @@ export class RedisBuffer { } public async tryFlush(): Promise { + if (this.isCurrentlyFlushing) { + this.logger.debug('Already flushing. Skipping additional flush attempt.'); + return; + } + const lockId = uuidv4(); const acquired = await getRedisCache().set( this.lockKey, @@ -68,15 +74,17 @@ export class RedisBuffer { if (acquired === 'OK') { this.logger.debug('Lock acquired. Attempting to flush.'); + this.isCurrentlyFlushing = true; try { await this.flush(); } catch (error) { this.logger.error('Failed to flush buffer', { error }); } finally { + this.isCurrentlyFlushing = false; await this.releaseLock(lockId); } } else { - this.logger.debug('Failed to acquire lock for. Skipping flush.'); + this.logger.debug('Failed to acquire lock. Skipping flush.'); } } @@ -129,7 +137,7 @@ export class RedisBuffer { }); throw new Error('Redis transaction failed'); } - + const lrange = result[0]; const lrangePrevious = result[1]; @@ -141,7 +149,11 @@ export class RedisBuffer { } const items = lrange[1] as string[]; - if (lrangePrevious && lrangePrevious[0] === null && Array.isArray(lrangePrevious[1])) { + if ( + lrangePrevious && + lrangePrevious[0] === null && + Array.isArray(lrangePrevious[1]) + ) { items.push(...(lrangePrevious[1] as string[])); } @@ -149,21 +161,23 @@ export class RedisBuffer { .map((item) => getSafeJson(item) as T | null) .filter((item): item is T => item !== null); - if (parsedItems.length > 0) { - await getRedisCache().lpush( - this.getKey('backup'), - ...parsedItems.map((item) => JSON.stringify(item)), - ); - } - if (parsedItems.length === 0) { this.logger.debug('No items to flush'); + // Clear any existing backup since we have no items to process + await getRedisCache().del(this.getKey('backup')); return; } this.logger.info(`Flushing ${parsedItems.length} items`); try { + // Create backup before processing + await getRedisCache().del(this.getKey('backup')); // Clear any existing backup first + await getRedisCache().lpush( + this.getKey('backup'), + ...parsedItems.map((item) => JSON.stringify(item)), + ); + const { toInsert, toKeep } = await this.processItems(parsedItems); if (toInsert.length) { @@ -203,6 +217,9 @@ export class RedisBuffer { ...parsedItems.map((item) => JSON.stringify(item)), ); } + + // Clear the backup since we're adding items back to main buffer + await getRedisCache().del(this.getKey('backup')); } } diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 126a9ac7..76f2d288 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -22,7 +22,7 @@ const STALLED_QUEUE_TIMEOUT = 1000 * 60 * 60 * 24; type BufferType = IClickhouseEvent; export class EventBuffer extends RedisBuffer { 
constructor() { - super(TABLE_NAMES.events, null); + super('events_v2', null); } getLastEventKey({ diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index da33e596..67b04b77 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -20,7 +20,7 @@ const BATCH_SIZE = process.env.BATCH_SIZE_PROFILES type BufferType = IClickhouseProfile; export class ProfileBuffer extends RedisBuffer { constructor() { - super(TABLE_NAMES.profiles, BATCH_SIZE); + super('profiles', BATCH_SIZE); } // this will do a couple of things: