fix(buffer): keep a backup of the buffer if something goes wrong
This commit is contained in:
@@ -119,6 +119,7 @@ export class RedisBuffer<T> {
|
||||
const result = await getRedisCache()
|
||||
.multi()
|
||||
.lrange(this.getKey(), 0, -1)
|
||||
.lrange(this.getKey('backup'), 0, -1)
|
||||
.del(this.getKey())
|
||||
.exec();
|
||||
|
||||
@@ -128,8 +129,9 @@ export class RedisBuffer<T> {
|
||||
});
|
||||
throw new Error('Redis transaction failed');
|
||||
}
|
||||
|
||||
|
||||
const lrange = result[0];
|
||||
const lrangePrevious = result[1];
|
||||
|
||||
if (!lrange || lrange[0] instanceof Error) {
|
||||
this.logger.error('Error from lrange', {
|
||||
@@ -139,11 +141,21 @@ export class RedisBuffer<T> {
|
||||
}
|
||||
|
||||
const items = lrange[1] as string[];
|
||||
if (lrangePrevious && lrangePrevious[0] === null && Array.isArray(lrangePrevious[1])) {
|
||||
items.push(...(lrangePrevious[1] as string[]));
|
||||
}
|
||||
|
||||
const parsedItems = items
|
||||
.map((item) => getSafeJson<T | null>(item) as T | null)
|
||||
.filter((item): item is T => item !== null);
|
||||
|
||||
if (parsedItems.length > 0) {
|
||||
await getRedisCache().lpush(
|
||||
this.getKey('backup'),
|
||||
...parsedItems.map((item) => JSON.stringify(item)),
|
||||
);
|
||||
}
|
||||
|
||||
if (parsedItems.length === 0) {
|
||||
this.logger.debug('No items to flush');
|
||||
return;
|
||||
@@ -167,6 +179,9 @@ export class RedisBuffer<T> {
|
||||
);
|
||||
}
|
||||
|
||||
// Clear backup
|
||||
await getRedisCache().del(this.getKey('backup'));
|
||||
|
||||
this.logger.info(
|
||||
`Inserted ${toInsert.length} items into DB, kept ${toKeep.length} items in buffer`,
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user