perf(api): add bot events with buffer
This commit is contained in:
@@ -54,6 +54,7 @@ const startServer = async () => {
|
||||
const fastify = Fastify({
|
||||
maxParamLength: 15_000,
|
||||
bodyLimit: 1048576 * 500, // 500MB
|
||||
logger,
|
||||
});
|
||||
|
||||
fastify.register(compress, {
|
||||
|
||||
@@ -67,7 +67,7 @@ queues.forEach((queue) => {
|
||||
});
|
||||
|
||||
// Buffer
|
||||
const buffers = ['events_v2', 'profiles'];
|
||||
const buffers = ['events_v2', 'profiles', 'events_bots'];
|
||||
|
||||
buffers.forEach((buffer) => {
|
||||
register.registerMetric(
|
||||
|
||||
39
packages/db/src/buffers/bot-buffer.ts
Normal file
39
packages/db/src/buffers/bot-buffer.ts
Normal file
@@ -0,0 +1,39 @@
|
||||
import { ch, TABLE_NAMES } from '../clickhouse-client';
|
||||
import type { IClickhouseBotEvent } from '../services/event.service';
|
||||
import type {
|
||||
Find,
|
||||
FindMany,
|
||||
OnCompleted,
|
||||
OnInsert,
|
||||
ProcessQueue,
|
||||
} from './buffer';
|
||||
import { RedisBuffer } from './buffer';
|
||||
|
||||
export class BotBuffer extends RedisBuffer<IClickhouseBotEvent> {
|
||||
constructor() {
|
||||
super({
|
||||
table: TABLE_NAMES.events_bots,
|
||||
batchSize: 100,
|
||||
});
|
||||
}
|
||||
|
||||
public onInsert?: OnInsert<IClickhouseBotEvent> | undefined;
|
||||
public onCompleted?: OnCompleted<IClickhouseBotEvent> | undefined;
|
||||
|
||||
public processQueue: ProcessQueue<IClickhouseBotEvent> = async (queue) => {
|
||||
await ch.insert({
|
||||
table: TABLE_NAMES.events_bots,
|
||||
values: queue.map((item) => item.event),
|
||||
format: 'JSONEachRow',
|
||||
});
|
||||
return queue.map((item) => item.index);
|
||||
};
|
||||
|
||||
public findMany: FindMany<IClickhouseBotEvent, IClickhouseBotEvent> = () => {
|
||||
return Promise.resolve([]);
|
||||
};
|
||||
|
||||
public find: Find<IClickhouseBotEvent, IClickhouseBotEvent> = () => {
|
||||
return Promise.resolve(null);
|
||||
};
|
||||
}
|
||||
@@ -1,5 +1,12 @@
|
||||
import { getRedisCache } from '@openpanel/redis';
|
||||
|
||||
const logger = {
|
||||
debug: (...args: unknown[]) => console.log('[DEBUG]', ...args),
|
||||
info: (...args: unknown[]) => console.log('[INFO]', ...args),
|
||||
warn: (...args: unknown[]) => console.log('[WARN]', ...args),
|
||||
error: (...args: unknown[]) => console.log('[ERROR]', ...args),
|
||||
};
|
||||
|
||||
export const DELETE = '__DELETE__';
|
||||
|
||||
export type QueueItem<T> = {
|
||||
@@ -66,7 +73,14 @@ export abstract class RedisBuffer<T> {
|
||||
await getRedisCache().rpush(this.getKey(), JSON.stringify(value));
|
||||
|
||||
const length = await getRedisCache().llen(this.getKey());
|
||||
logger.debug(
|
||||
`Inserted item into buffer ${this.table}. Current length: ${length}`
|
||||
);
|
||||
|
||||
if (this.batchSize && length >= this.batchSize) {
|
||||
logger.info(
|
||||
`Buffer ${this.table} reached batch size (${this.batchSize}). Flushing...`
|
||||
);
|
||||
this.flush();
|
||||
}
|
||||
}
|
||||
@@ -76,12 +90,12 @@ export abstract class RedisBuffer<T> {
|
||||
const queue = await this.getQueue(this.batchSize || -1);
|
||||
|
||||
if (queue.length === 0) {
|
||||
return {
|
||||
count: 0,
|
||||
data: [],
|
||||
};
|
||||
logger.debug(`Flush called on empty buffer ${this.table}`);
|
||||
return { count: 0, data: [] };
|
||||
}
|
||||
|
||||
logger.info(`Flushing ${queue.length} items from buffer ${this.table}`);
|
||||
|
||||
try {
|
||||
const indexes = await this.processQueue(queue);
|
||||
await this.deleteIndexes(indexes);
|
||||
@@ -91,19 +105,19 @@ export abstract class RedisBuffer<T> {
|
||||
|
||||
if (this.onCompleted) {
|
||||
const res = await this.onCompleted(data);
|
||||
return {
|
||||
count: res.length,
|
||||
data: res,
|
||||
};
|
||||
logger.info(
|
||||
`Completed processing ${res.length} items from buffer ${this.table}`
|
||||
);
|
||||
return { count: res.length, data: res };
|
||||
}
|
||||
|
||||
return {
|
||||
count: indexes.length,
|
||||
data: indexes,
|
||||
};
|
||||
logger.info(
|
||||
`Processed ${indexes.length} items from buffer ${this.table}`
|
||||
);
|
||||
return { count: indexes.length, data: indexes };
|
||||
} catch (e) {
|
||||
console.log(
|
||||
`[${this.getKey()}] Failed to processQueue while flushing:`,
|
||||
logger.error(
|
||||
`Failed to process queue while flushing buffer ${this.table}:`,
|
||||
e
|
||||
);
|
||||
const timestamp = new Date().getTime();
|
||||
@@ -112,9 +126,15 @@ export abstract class RedisBuffer<T> {
|
||||
data: JSON.stringify(queue.map((item) => item.event)),
|
||||
retries: 0,
|
||||
});
|
||||
logger.warn(
|
||||
`Stored ${queue.length} failed items in ${this.getKey(`failed:${timestamp}`)}`
|
||||
);
|
||||
}
|
||||
} catch (e) {
|
||||
console.log(`[${this.getKey()}] Failed to getQueue while flushing:`, e);
|
||||
logger.error(
|
||||
`Failed to get queue while flushing buffer ${this.table}:`,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -125,22 +145,26 @@ export abstract class RedisBuffer<T> {
|
||||
});
|
||||
multi.lrem(this.getKey(), 0, DELETE);
|
||||
await multi.exec();
|
||||
logger.debug(`Deleted ${indexes.length} items from buffer ${this.table}`);
|
||||
}
|
||||
|
||||
public async getQueue(limit: number): Promise<QueueItem<T>[]> {
|
||||
const queue = await getRedisCache().lrange(this.getKey(), 0, limit);
|
||||
return queue
|
||||
const result = queue
|
||||
.map((item, index) => ({
|
||||
event: this.transformQueueItem(item),
|
||||
index,
|
||||
}))
|
||||
.filter((item): item is QueueItem<T> => item.event !== null);
|
||||
logger.debug(`Retrieved ${result.length} items from buffer ${this.table}`);
|
||||
return result;
|
||||
}
|
||||
|
||||
private transformQueueItem(item: string): T | null {
|
||||
try {
|
||||
return JSON.parse(item);
|
||||
} catch (e) {
|
||||
logger.warn(`Failed to parse item in buffer ${this.table}:`, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import { BotBuffer } from './bot-buffer';
|
||||
import { EventBuffer } from './event-buffer';
|
||||
import { ProfileBuffer } from './profile-buffer';
|
||||
|
||||
export const eventBuffer = new EventBuffer();
|
||||
export const profileBuffer = new ProfileBuffer();
|
||||
export const botBuffer = new BotBuffer();
|
||||
|
||||
@@ -3,7 +3,7 @@ import { mergeDeepRight } from 'ramda';
|
||||
import { toDots } from '@openpanel/common';
|
||||
import { getRedisCache } from '@openpanel/redis';
|
||||
|
||||
import { ch, chQuery } from '../clickhouse-client';
|
||||
import { ch, chQuery, TABLE_NAMES } from '../clickhouse-client';
|
||||
import type {
|
||||
IClickhouseProfile,
|
||||
IServiceProfile,
|
||||
@@ -22,7 +22,7 @@ import { RedisBuffer } from './buffer';
|
||||
export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
|
||||
constructor() {
|
||||
super({
|
||||
table: 'profiles',
|
||||
table: TABLE_NAMES.profiles,
|
||||
batchSize: 100,
|
||||
});
|
||||
}
|
||||
@@ -56,7 +56,7 @@ export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
|
||||
);
|
||||
|
||||
await ch.insert({
|
||||
table: 'profiles',
|
||||
table: TABLE_NAMES.profiles,
|
||||
values: cleanedQueue.map((item) => {
|
||||
const profile = profiles.find(
|
||||
(p) =>
|
||||
|
||||
@@ -6,7 +6,7 @@ import { toDots } from '@openpanel/common';
|
||||
import { cacheable, getRedisCache } from '@openpanel/redis';
|
||||
import type { IChartEventFilter } from '@openpanel/validation';
|
||||
|
||||
import { eventBuffer } from '../buffers';
|
||||
import { botBuffer, eventBuffer } from '../buffers';
|
||||
import {
|
||||
ch,
|
||||
chQuery,
|
||||
@@ -37,6 +37,26 @@ export type IServicePage = {
|
||||
origin: string;
|
||||
};
|
||||
|
||||
export interface IClickhouseBotEvent {
|
||||
id: string;
|
||||
name: string;
|
||||
type: string;
|
||||
project_id: string;
|
||||
path: string;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
export interface IServiceBotEvent {
|
||||
id: string;
|
||||
name: string;
|
||||
type: string;
|
||||
projectId: string;
|
||||
path: string;
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
export type IServiceCreateBotEventPayload = Omit<IServiceBotEvent, 'id'>;
|
||||
|
||||
export interface IClickhouseEvent {
|
||||
id: string;
|
||||
name: string;
|
||||
@@ -535,33 +555,20 @@ export async function getEventsCount({
|
||||
return res[0]?.count ?? 0;
|
||||
}
|
||||
|
||||
interface CreateBotEventPayload {
|
||||
name: string;
|
||||
type: string;
|
||||
path: string;
|
||||
projectId: string;
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
export function createBotEvent({
|
||||
name,
|
||||
type,
|
||||
projectId,
|
||||
createdAt,
|
||||
path,
|
||||
}: CreateBotEventPayload) {
|
||||
return ch.insert({
|
||||
table: 'events_bots',
|
||||
format: 'JSONEachRow',
|
||||
values: [
|
||||
{
|
||||
name,
|
||||
type,
|
||||
project_id: projectId,
|
||||
path,
|
||||
created_at: formatClickhouseDate(createdAt),
|
||||
},
|
||||
],
|
||||
}: IServiceCreateBotEventPayload) {
|
||||
return botBuffer.insert({
|
||||
id: uuid(),
|
||||
name,
|
||||
type,
|
||||
project_id: projectId,
|
||||
path,
|
||||
created_at: formatClickhouseDate(createdAt),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user