From 3e7a3ea6c97fef70bcb30ca8c1c9166c707857b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Tue, 19 Nov 2024 20:40:50 +0100 Subject: [PATCH] fix(buffer): keep a backup of the buffer if something goes wrong --- packages/db/src/buffers/buffer.ts | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/packages/db/src/buffers/buffer.ts b/packages/db/src/buffers/buffer.ts index 2de11423..6ed1c6bc 100644 --- a/packages/db/src/buffers/buffer.ts +++ b/packages/db/src/buffers/buffer.ts @@ -119,6 +119,7 @@ export class RedisBuffer { const result = await getRedisCache() .multi() .lrange(this.getKey(), 0, -1) + .lrange(this.getKey('backup'), 0, -1) .del(this.getKey()) .exec(); @@ -128,8 +129,9 @@ export class RedisBuffer { }); throw new Error('Redis transaction failed'); } - + const lrange = result[0]; + const lrangePrevious = result[1]; if (!lrange || lrange[0] instanceof Error) { this.logger.error('Error from lrange', { @@ -139,11 +141,21 @@ export class RedisBuffer { } const items = lrange[1] as string[]; + if (lrangePrevious && lrangePrevious[0] === null && Array.isArray(lrangePrevious[1])) { + items.push(...(lrangePrevious[1] as string[])); + } const parsedItems = items .map((item) => getSafeJson(item) as T | null) .filter((item): item is T => item !== null); + if (parsedItems.length > 0) { + await getRedisCache().lpush( + this.getKey('backup'), + ...parsedItems.map((item) => JSON.stringify(item)), + ); + } + if (parsedItems.length === 0) { this.logger.debug('No items to flush'); return; @@ -167,6 +179,9 @@ export class RedisBuffer { ); } + // Clear backup + await getRedisCache().del(this.getKey('backup')); + this.logger.info( `Inserted ${toInsert.length} items into DB, kept ${toKeep.length} items in buffer`, {