fix(root): add hyperdx and better logging

This commit is contained in:
Carl-Gerhard Lindesvärd
2024-09-13 21:49:51 +02:00
committed by Carl-Gerhard Lindesvärd
parent 4bafa16419
commit c819c18962
22 changed files with 2116 additions and 332 deletions

View File

@@ -1,12 +1,6 @@
import { createLogger } from '@openpanel/logger';
import { getRedisCache } from '@openpanel/redis';
const logger = {
debug: (...args: unknown[]) => console.log('[DEBUG]', ...args),
info: (...args: unknown[]) => console.log('[INFO]', ...args),
warn: (...args: unknown[]) => console.log('[WARN]', ...args),
error: (...args: unknown[]) => console.log('[ERROR]', ...args),
};
export const DELETE = '__DELETE__';
export type QueueItem<T> = {
@@ -47,6 +41,7 @@ export abstract class RedisBuffer<T> {
public prefix = 'op:buffer';
public table: string;
public batchSize?: number;
public logger: ReturnType<typeof createLogger>;
// abstract methods
public abstract onInsert?: OnInsert<T>;
@@ -58,6 +53,9 @@ export abstract class RedisBuffer<T> {
constructor(options: { table: string; batchSize?: number }) {
this.table = options.table;
this.batchSize = options.batchSize;
this.logger = createLogger({ name: `buffer` }).child({
table: this.table,
});
}
public getKey(name?: string) {
@@ -73,12 +71,12 @@ export abstract class RedisBuffer<T> {
await getRedisCache().rpush(this.getKey(), JSON.stringify(value));
const length = await getRedisCache().llen(this.getKey());
logger.debug(
this.logger.debug(
`Inserted item into buffer ${this.table}. Current length: ${length}`
);
if (this.batchSize && length >= this.batchSize) {
logger.info(
this.logger.info(
`Buffer ${this.table} reached batch size (${this.batchSize}). Flushing...`
);
this.flush();
@@ -90,11 +88,13 @@ export abstract class RedisBuffer<T> {
const queue = await this.getQueue(this.batchSize || -1);
if (queue.length === 0) {
logger.debug(`Flush called on empty buffer ${this.table}`);
this.logger.debug(`Flush called on empty buffer ${this.table}`);
return { count: 0, data: [] };
}
logger.info(`Flushing ${queue.length} items from buffer ${this.table}`);
this.logger.info(
`Flushing ${queue.length} items from buffer ${this.table}`
);
try {
const indexes = await this.processQueue(queue);
@@ -105,18 +105,18 @@ export abstract class RedisBuffer<T> {
if (this.onCompleted) {
const res = await this.onCompleted(data);
logger.info(
this.logger.info(
`Completed processing ${res.length} items from buffer ${this.table}`
);
return { count: res.length, data: res };
}
logger.info(
this.logger.info(
`Processed ${indexes.length} items from buffer ${this.table}`
);
return { count: indexes.length, data: indexes };
} catch (e) {
logger.error(
this.logger.error(
`Failed to process queue while flushing buffer ${this.table}:`,
e
);
@@ -126,12 +126,12 @@ export abstract class RedisBuffer<T> {
data: JSON.stringify(queue.map((item) => item.event)),
retries: 0,
});
logger.warn(
this.logger.warn(
`Stored ${queue.length} failed items in ${this.getKey(`failed:${timestamp}`)}`
);
}
} catch (e) {
logger.error(
this.logger.error(
`Failed to get queue while flushing buffer ${this.table}:`,
e
);
@@ -145,7 +145,9 @@ export abstract class RedisBuffer<T> {
});
multi.lrem(this.getKey(), 0, DELETE);
await multi.exec();
logger.debug(`Deleted ${indexes.length} items from buffer ${this.table}`);
this.logger.debug(
`Deleted ${indexes.length} items from buffer ${this.table}`
);
}
public async getQueue(limit: number): Promise<QueueItem<T>[]> {
@@ -156,7 +158,9 @@ export abstract class RedisBuffer<T> {
index,
}))
.filter((item): item is QueueItem<T> => item.event !== null);
logger.debug(`Retrieved ${result.length} items from buffer ${this.table}`);
this.logger.debug(
`Retrieved ${result.length} items from buffer ${this.table}`
);
return result;
}
@@ -164,7 +168,7 @@ export abstract class RedisBuffer<T> {
try {
return JSON.parse(item);
} catch (e) {
logger.warn(`Failed to parse item in buffer ${this.table}:`, e);
this.logger.warn(`Failed to parse item in buffer ${this.table}:`, e);
return null;
}
}