fix(buffer): prevent "Maximum call stack size exceeded" during buffer flush

This commit is contained in:
Carl-Gerhard Lindesvärd
2024-11-29 10:39:12 +01:00
parent 0dfd28c9ce
commit d80754a6fd
4 changed files with 30 additions and 13 deletions

View File

@@ -5,7 +5,7 @@ import { RedisBuffer } from './buffer';
type BufferType = IClickhouseBotEvent;
export class BotBuffer extends RedisBuffer<BufferType> {
constructor() {
super(TABLE_NAMES.events_bots, 500);
super('events_bots', 500);
}
protected async insertIntoDB(items: BufferType[]): Promise<void> {

View File

@@ -20,6 +20,7 @@ export class RedisBuffer<T> {
private lockKey: string;
protected maxBufferSize: number | null;
protected logger: ILogger;
private isCurrentlyFlushing = false;
constructor(bufferName: string, maxBufferSize: number | null) {
this.bufferKey = bufferName;
@@ -57,6 +58,11 @@ export class RedisBuffer<T> {
}
public async tryFlush(): Promise<void> {
if (this.isCurrentlyFlushing) {
this.logger.debug('Already flushing. Skipping additional flush attempt.');
return;
}
const lockId = uuidv4();
const acquired = await getRedisCache().set(
this.lockKey,
@@ -68,15 +74,17 @@ export class RedisBuffer<T> {
if (acquired === 'OK') {
this.logger.debug('Lock acquired. Attempting to flush.');
this.isCurrentlyFlushing = true;
try {
await this.flush();
} catch (error) {
this.logger.error('Failed to flush buffer', { error });
} finally {
this.isCurrentlyFlushing = false;
await this.releaseLock(lockId);
}
} else {
this.logger.debug('Failed to acquire lock for. Skipping flush.');
this.logger.debug('Failed to acquire lock. Skipping flush.');
}
}
@@ -129,7 +137,7 @@ export class RedisBuffer<T> {
});
throw new Error('Redis transaction failed');
}
const lrange = result[0];
const lrangePrevious = result[1];
@@ -141,7 +149,11 @@ export class RedisBuffer<T> {
}
const items = lrange[1] as string[];
if (lrangePrevious && lrangePrevious[0] === null && Array.isArray(lrangePrevious[1])) {
if (
lrangePrevious &&
lrangePrevious[0] === null &&
Array.isArray(lrangePrevious[1])
) {
items.push(...(lrangePrevious[1] as string[]));
}
@@ -149,21 +161,23 @@ export class RedisBuffer<T> {
.map((item) => getSafeJson<T | null>(item) as T | null)
.filter((item): item is T => item !== null);
if (parsedItems.length > 0) {
await getRedisCache().lpush(
this.getKey('backup'),
...parsedItems.map((item) => JSON.stringify(item)),
);
}
if (parsedItems.length === 0) {
this.logger.debug('No items to flush');
// Clear any existing backup since we have no items to process
await getRedisCache().del(this.getKey('backup'));
return;
}
this.logger.info(`Flushing ${parsedItems.length} items`);
try {
// Create backup before processing
await getRedisCache().del(this.getKey('backup')); // Clear any existing backup first
await getRedisCache().lpush(
this.getKey('backup'),
...parsedItems.map((item) => JSON.stringify(item)),
);
const { toInsert, toKeep } = await this.processItems(parsedItems);
if (toInsert.length) {
@@ -203,6 +217,9 @@ export class RedisBuffer<T> {
...parsedItems.map((item) => JSON.stringify(item)),
);
}
// Clear the backup since we're adding items back to main buffer
await getRedisCache().del(this.getKey('backup'));
}
}

View File

@@ -22,7 +22,7 @@ const STALLED_QUEUE_TIMEOUT = 1000 * 60 * 60 * 24;
type BufferType = IClickhouseEvent;
export class EventBuffer extends RedisBuffer<BufferType> {
constructor() {
super(TABLE_NAMES.events, null);
super('events_v2', null);
}
getLastEventKey({

View File

@@ -20,7 +20,7 @@ const BATCH_SIZE = process.env.BATCH_SIZE_PROFILES
type BufferType = IClickhouseProfile;
export class ProfileBuffer extends RedisBuffer<BufferType> {
constructor() {
super(TABLE_NAMES.profiles, BATCH_SIZE);
super('profiles', BATCH_SIZE);
}
// this will do a couple of things: