chore(buffer): iron out the buffer issues
@@ -1,8 +1,10 @@
 import { v4 as uuidv4 } from 'uuid';
 
+import { getSafeJson } from '@openpanel/common';
 import type { ILogger } from '@openpanel/logger';
 import { createLogger } from '@openpanel/logger';
 import { getRedisCache } from '@openpanel/redis';
+import { pathOr } from 'ramda';
 
 export type Find<T, R = unknown> = (
   callback: (item: T) => boolean,
@@ -42,7 +44,9 @@ export class RedisBuffer<T> {
     await getRedisCache().rpush(this.getKey(), JSON.stringify(item));
     const bufferSize = await getRedisCache().llen(this.getKey());
 
-    this.logger.debug(`Item added. Current size: ${bufferSize}`);
+    this.logger.debug(
+      `Item added (${pathOr('unknown', ['id'], item)}) Current size: ${bufferSize}`,
+    );
 
     if (this.maxBufferSize && bufferSize >= this.maxBufferSize) {
       await this.tryFlush();
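The add path above logs the item id via ramda's pathOr and then checks the size threshold. A minimal sketch of the size-triggered flush outside the class; the key and threshold are illustrative values, not taken from this commit:

    import { getRedisCache } from '@openpanel/redis';

    // Illustrative key and threshold; the real values live on the RedisBuffer instance.
    const KEY = 'buffer:example';
    const MAX_BUFFER_SIZE = 500;

    async function add<T>(item: T, tryFlush: () => Promise<void>): Promise<void> {
      // RPUSH appends at the tail and LLEN reads the new length,
      // so a buffer that crosses the threshold flushes immediately.
      await getRedisCache().rpush(KEY, JSON.stringify(item));
      const bufferSize = await getRedisCache().llen(KEY);
      if (bufferSize >= MAX_BUFFER_SIZE) {
        await tryFlush();
      }
    }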
@@ -66,6 +70,8 @@ export class RedisBuffer<T> {
     this.logger.debug('Lock acquired. Attempting to flush.');
     try {
       await this.flush();
+    } catch (error) {
+      this.logger.error('Failed to flush buffer', { error });
     } finally {
       await this.releaseLock(lockId);
     }
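With the catch in place, a flush failure is logged and the finally block still releases the lock. A sketch of the same acquire/try/finally shape over a plain Redis lock; acquireLock and releaseLock are hypothetical stand-ins for the class's own helpers:

    import { randomUUID } from 'node:crypto';
    import { getRedisCache } from '@openpanel/redis';

    // Hypothetical helper: SET NX EX takes the lock only if it is free,
    // with a TTL so a crashed worker cannot hold it forever.
    async function acquireLock(key: string, id: string): Promise<boolean> {
      return (await getRedisCache().set(key, id, 'EX', 60, 'NX')) === 'OK';
    }

    // Hypothetical helper: release only if we still own the lock
    // (a non-atomic check, acceptable for a sketch).
    async function releaseLock(key: string, id: string): Promise<void> {
      if ((await getRedisCache().get(key)) === id) {
        await getRedisCache().del(key);
      }
    }

    async function tryFlush(flush: () => Promise<void>): Promise<void> {
      const lockId = randomUUID();
      if (!(await acquireLock('buffer:lock', lockId))) return;
      try {
        await flush();
      } catch (error) {
        console.error('Failed to flush buffer', { error });
      } finally {
        // Runs on success and failure alike, so the lock is always released.
        await releaseLock('buffer:lock', lockId);
      }
    }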
@@ -117,18 +123,26 @@ export class RedisBuffer<T> {
       .exec();
 
     if (!result) {
       this.logger.error('No result from redis transaction', {
         result,
       });
       throw new Error('Redis transaction failed');
     }
 
     const lrange = result[0];
 
     if (!lrange || lrange[0] instanceof Error) {
       this.logger.error('Error from lrange', {
         result,
       });
       throw new Error('Redis transaction failed');
     }
 
     const items = lrange[1] as string[];
 
-    const parsedItems = items.map((item) => JSON.parse(item) as T);
+    const parsedItems = items
+      .map((item) => getSafeJson<T | null>(item) as T | null)
+      .filter((item): item is T => item !== null);
 
     if (parsedItems.length === 0) {
       this.logger.debug('No items to flush');
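The parse change above is the heart of the fix: one corrupt list entry no longer throws and aborts the whole flush. A sketch of the pattern, assuming getSafeJson returns the parsed value or null instead of throwing:

    import { getSafeJson } from '@openpanel/common';

    type Item = { id: string };

    const raw = ['{"id":"a"}', 'not json', '{"id":"b"}'];

    // Malformed entries become null and the type-predicate filter drops them,
    // so the result is a clean Item[] rather than an exception mid-flush.
    const parsed = raw
      .map((item) => getSafeJson<Item | null>(item))
      .filter((item): item is Item => item !== null);

    console.log(parsed.length); // 2; the corrupt entry is skipped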
@@ -163,15 +177,15 @@ export class RedisBuffer<T> {
     } catch (error) {
       this.logger.error('Failed to process queue while flushing buffer', {
         error,
-        queueSize: items.length,
+        queueSize: parsedItems.length,
       });
 
-      if (items.length > 0) {
+      if (parsedItems.length > 0) {
         // Add back items to keep
         this.logger.info('Adding all items back to buffer');
         await getRedisCache().lpush(
           this.getKey(),
-          ...items.map((item) => JSON.stringify(item)),
+          ...parsedItems.map((item) => JSON.stringify(item)),
         );
       }
     }
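On failure, everything parsed goes back with a single variadic LPUSH, so the items rejoin the head of the list ahead of anything enqueued meanwhile. One semantic detail, sketched against a bare list (the key name is illustrative):

    import { getRedisCache } from '@openpanel/redis';

    async function requeueDemo(): Promise<void> {
      const redis = getRedisCache();
      await redis.del('demo:list');
      await redis.rpush('demo:list', 'c', 'd'); // list is now: c, d
      // Variadic LPUSH inserts its arguments one at a time at the head,
      // so 'a', 'b' land as b, a: re-queued items lead, in reverse order.
      await redis.lpush('demo:list', 'a', 'b');
      console.log(await redis.lrange('demo:list', 0, -1)); // ['b', 'a', 'c', 'd']
    }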
@@ -190,8 +204,15 @@ export class RedisBuffer<T> {
   }
 
   protected async getQueue(count?: number): Promise<T[]> {
-    const items = await getRedisCache().lrange(this.getKey(), 0, count ?? -1);
-    return items.map((item) => JSON.parse(item) as T);
+    try {
+      const items = await getRedisCache().lrange(this.getKey(), 0, count ?? -1);
+      return items
+        .map((item) => getSafeJson<T | null>(item) as T | null)
+        .filter((item): item is T => item !== null);
+    } catch (error) {
+      this.logger.error('Failed to get queue', { error });
+      return [];
+    }
   }
 
   protected processItems(items: T[]): Promise<{ toInsert: T[]; toKeep: T[] }> {
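getQueue now degrades to an empty array instead of letting a Redis or parse error bubble into the caller. A hedged usage sketch; the QueueReader shape is a stand-in for a RedisBuffer subclass:

    // Stand-in interface; real subclasses inherit getQueue from RedisBuffer.
    interface QueueReader<T> {
      getQueue(count?: number): Promise<T[]>;
    }

    async function peek<T>(buffer: QueueReader<T>): Promise<T[]> {
      // LRANGE with an end index of -1 means "to the end of the list",
      // so calling getQueue() with no count fetches the whole buffer.
      const all = await buffer.getQueue();
      // With the new try/catch, a Redis failure yields [] rather than a throw,
      // so "empty" and "unavailable" look the same to callers.
      return all.slice(0, 10);
    }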
@@ -4,7 +4,11 @@ import SuperJSON from 'superjson';
 import { deepMergeObjects } from '@openpanel/common';
 import { getRedisCache, getRedisPub } from '@openpanel/redis';
 
-import { TABLE_NAMES, ch } from '../clickhouse-client';
+import {
+  TABLE_NAMES,
+  ch,
+  convertClickhouseDateToJs,
+} from '../clickhouse-client';
 import { transformEvent } from '../services/event.service';
 import type {
   IClickhouseEvent,
@@ -13,6 +17,8 @@ import type {
 import type { Find, FindMany } from './buffer';
 import { RedisBuffer } from './buffer';
 
+const STALLED_QUEUE_TIMEOUT = 1000 * 60 * 60 * 24;
+
 type BufferType = IClickhouseEvent;
 export class EventBuffer extends RedisBuffer<BufferType> {
   constructor() {
@@ -46,8 +52,8 @@ export class EventBuffer extends RedisBuffer<BufferType> {
   protected async processItems(
     queue: BufferType[],
   ): Promise<{ toInsert: BufferType[]; toKeep: BufferType[] }> {
-    const toInsert = new Set<BufferType>();
-    const itemsToStalled = new Set<BufferType>();
+    const toInsert: BufferType[] = [];
+    const toStalled: BufferType[] = [];
 
     // Sort data by created_at
     // oldest first
@@ -85,7 +91,7 @@ export class EventBuffer extends RedisBuffer<BufferType> {
         event.properties.__properties_from = lastEventWithData.id;
       }
 
-      return toInsert.add(event);
+      return toInsert.push(event);
     });
 
     // Group screen_view events by session_id
@@ -125,10 +131,10 @@ export class EventBuffer extends RedisBuffer<BufferType> {
           },
           duration,
         };
-        toInsert.add(event);
+        toInsert.push(event);
       } else if (hasSessionEnd) {
         // push last event in session if we have a session_end event
-        toInsert.add(item);
+        toInsert.push(item);
       }
     });
   } // for of end
@@ -137,28 +143,37 @@ export class EventBuffer extends RedisBuffer<BufferType> {
     // This should not theoretically happen but if it does we should move them to stalled
     queue.forEach((item) => {
       if (
-        !toInsert.has(item) &&
-        new Date(item.created_at).getTime() <
-          new Date().getTime() - 1000 * 60 * 60 * 24
+        !toInsert.find((i) => i.id === item.id) &&
+        convertClickhouseDateToJs(item.created_at).getTime() <
+          new Date().getTime() - STALLED_QUEUE_TIMEOUT
       ) {
-        itemsToStalled.add(item);
+        toStalled.push(item);
       }
     });
 
-    if (itemsToStalled.size > 0) {
-      const multi = getRedisCache().multi();
-      for (const item of itemsToStalled) {
-        multi.rpush(this.getKey('stalled'), JSON.stringify(item));
-      }
-      await multi.exec();
+    if (toStalled.length > 0) {
+      try {
+        this.logger.info(`Pushing to stalled queue (${toStalled.length})`, {
+          items: toStalled,
+          count: toStalled.length,
+        });
+        await getRedisCache().rpush(
+          this.getKey('stalled'),
+          ...toStalled.map((item) => JSON.stringify(item)),
+        );
+      } catch (error) {
+        toStalled.length = 0;
+        this.logger.error('Failed to push to stalled queue', { error });
+      }
     }
 
-    const toInsertArray = Array.from(toInsert);
     return {
-      toInsert: toInsertArray,
-      toKeep: queue.filter(
-        (item) => !toInsertArray.find((i) => i.id === item.id),
-      ),
+      toInsert,
+      toKeep: queue.filter((item) => {
+        const willBeInserted = toInsert.find((i) => i.id === item.id);
+        const willBeStalled = toStalled.find((i) => i.id === item.id);
+        return willBeInserted === undefined && willBeStalled === undefined;
+      }),
     };
   }
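Taken together, processItems now partitions the queue three ways: insert, stall, keep. Matching by id instead of Set identity matters because some inserted events are rebuilt copies (the merged screen_view events above), which a reference check would miss. A compact sketch of the semantics; toJsDate is an assumption standing in for convertClickhouseDateToJs, which presumably parses ClickHouse's 'YYYY-MM-DD HH:MM:SS' timestamps into a JS Date:

    type Item = { id: string; created_at: string };

    const STALLED_QUEUE_TIMEOUT = 1000 * 60 * 60 * 24; // 24 hours in ms

    // Assumption: ClickHouse dates lack a timezone marker, which new Date()
    // parses inconsistently across runtimes, hence a dedicated helper.
    function toJsDate(clickhouseDate: string): Date {
      return new Date(clickhouseDate.replace(' ', 'T') + 'Z');
    }

    function partition(queue: Item[], toInsert: Item[]) {
      const now = Date.now();
      // Stall anything that is not being inserted and is older than the cutoff.
      const toStalled = queue.filter(
        (item) =>
          !toInsert.find((i) => i.id === item.id) &&
          toJsDate(item.created_at).getTime() < now - STALLED_QUEUE_TIMEOUT,
      );
      // Whatever is neither inserted nor stalled stays for the next flush.
      const toKeep = queue.filter(
        (item) =>
          !toInsert.find((i) => i.id === item.id) &&
          !toStalled.find((i) => i.id === item.id),
      );
      return { toInsert, toStalled, toKeep };
    }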