From 46fe17e558fab0429d5184cdd3f4f902e7a3aba1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Thu, 12 Sep 2024 20:51:54 +0200 Subject: [PATCH] perf(api): add bot events with buffer --- apps/api/src/index.ts | 1 + apps/worker/src/metrics.ts | 2 +- packages/db/src/buffers/bot-buffer.ts | 39 ++++++++++++++++ packages/db/src/buffers/buffer.ts | 56 ++++++++++++++++------- packages/db/src/buffers/index.ts | 2 + packages/db/src/buffers/profile-buffer.ts | 6 +-- packages/db/src/services/event.service.ts | 51 ++++++++++++--------- 7 files changed, 115 insertions(+), 42 deletions(-) create mode 100644 packages/db/src/buffers/bot-buffer.ts diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index cc39ee9c..3d77e99b 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -54,6 +54,7 @@ const startServer = async () => { const fastify = Fastify({ maxParamLength: 15_000, bodyLimit: 1048576 * 500, // 500MB + logger, }); fastify.register(compress, { diff --git a/apps/worker/src/metrics.ts b/apps/worker/src/metrics.ts index 6d0af53a..c667e99a 100644 --- a/apps/worker/src/metrics.ts +++ b/apps/worker/src/metrics.ts @@ -67,7 +67,7 @@ queues.forEach((queue) => { }); // Buffer -const buffers = ['events_v2', 'profiles']; +const buffers = ['events_v2', 'profiles', 'events_bots']; buffers.forEach((buffer) => { register.registerMetric( diff --git a/packages/db/src/buffers/bot-buffer.ts b/packages/db/src/buffers/bot-buffer.ts new file mode 100644 index 00000000..48b1f678 --- /dev/null +++ b/packages/db/src/buffers/bot-buffer.ts @@ -0,0 +1,39 @@ +import { ch, TABLE_NAMES } from '../clickhouse-client'; +import type { IClickhouseBotEvent } from '../services/event.service'; +import type { + Find, + FindMany, + OnCompleted, + OnInsert, + ProcessQueue, +} from './buffer'; +import { RedisBuffer } from './buffer'; + +export class BotBuffer extends RedisBuffer { + constructor() { + super({ + table: TABLE_NAMES.events_bots, + batchSize: 100, + }); + } + + public onInsert?: OnInsert | undefined; + public onCompleted?: OnCompleted | undefined; + + public processQueue: ProcessQueue = async (queue) => { + await ch.insert({ + table: TABLE_NAMES.events_bots, + values: queue.map((item) => item.event), + format: 'JSONEachRow', + }); + return queue.map((item) => item.index); + }; + + public findMany: FindMany = () => { + return Promise.resolve([]); + }; + + public find: Find = () => { + return Promise.resolve(null); + }; +} diff --git a/packages/db/src/buffers/buffer.ts b/packages/db/src/buffers/buffer.ts index 1ec48a10..9554f2d7 100644 --- a/packages/db/src/buffers/buffer.ts +++ b/packages/db/src/buffers/buffer.ts @@ -1,5 +1,12 @@ import { getRedisCache } from '@openpanel/redis'; +const logger = { + debug: (...args: unknown[]) => console.log('[DEBUG]', ...args), + info: (...args: unknown[]) => console.log('[INFO]', ...args), + warn: (...args: unknown[]) => console.log('[WARN]', ...args), + error: (...args: unknown[]) => console.log('[ERROR]', ...args), +}; + export const DELETE = '__DELETE__'; export type QueueItem = { @@ -66,7 +73,14 @@ export abstract class RedisBuffer { await getRedisCache().rpush(this.getKey(), JSON.stringify(value)); const length = await getRedisCache().llen(this.getKey()); + logger.debug( + `Inserted item into buffer ${this.table}. Current length: ${length}` + ); + if (this.batchSize && length >= this.batchSize) { + logger.info( + `Buffer ${this.table} reached batch size (${this.batchSize}). Flushing...` + ); this.flush(); } } @@ -76,12 +90,12 @@ export abstract class RedisBuffer { const queue = await this.getQueue(this.batchSize || -1); if (queue.length === 0) { - return { - count: 0, - data: [], - }; + logger.debug(`Flush called on empty buffer ${this.table}`); + return { count: 0, data: [] }; } + logger.info(`Flushing ${queue.length} items from buffer ${this.table}`); + try { const indexes = await this.processQueue(queue); await this.deleteIndexes(indexes); @@ -91,19 +105,19 @@ export abstract class RedisBuffer { if (this.onCompleted) { const res = await this.onCompleted(data); - return { - count: res.length, - data: res, - }; + logger.info( + `Completed processing ${res.length} items from buffer ${this.table}` + ); + return { count: res.length, data: res }; } - return { - count: indexes.length, - data: indexes, - }; + logger.info( + `Processed ${indexes.length} items from buffer ${this.table}` + ); + return { count: indexes.length, data: indexes }; } catch (e) { - console.log( - `[${this.getKey()}] Failed to processQueue while flushing:`, + logger.error( + `Failed to process queue while flushing buffer ${this.table}:`, e ); const timestamp = new Date().getTime(); @@ -112,9 +126,15 @@ export abstract class RedisBuffer { data: JSON.stringify(queue.map((item) => item.event)), retries: 0, }); + logger.warn( + `Stored ${queue.length} failed items in ${this.getKey(`failed:${timestamp}`)}` + ); } } catch (e) { - console.log(`[${this.getKey()}] Failed to getQueue while flushing:`, e); + logger.error( + `Failed to get queue while flushing buffer ${this.table}:`, + e + ); } } @@ -125,22 +145,26 @@ export abstract class RedisBuffer { }); multi.lrem(this.getKey(), 0, DELETE); await multi.exec(); + logger.debug(`Deleted ${indexes.length} items from buffer ${this.table}`); } public async getQueue(limit: number): Promise[]> { const queue = await getRedisCache().lrange(this.getKey(), 0, limit); - return queue + const result = queue .map((item, index) => ({ event: this.transformQueueItem(item), index, })) .filter((item): item is QueueItem => item.event !== null); + logger.debug(`Retrieved ${result.length} items from buffer ${this.table}`); + return result; } private transformQueueItem(item: string): T | null { try { return JSON.parse(item); } catch (e) { + logger.warn(`Failed to parse item in buffer ${this.table}:`, e); return null; } } diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts index 7623e941..69909f09 100644 --- a/packages/db/src/buffers/index.ts +++ b/packages/db/src/buffers/index.ts @@ -1,5 +1,7 @@ +import { BotBuffer } from './bot-buffer'; import { EventBuffer } from './event-buffer'; import { ProfileBuffer } from './profile-buffer'; export const eventBuffer = new EventBuffer(); export const profileBuffer = new ProfileBuffer(); +export const botBuffer = new BotBuffer(); diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index 763235fd..8c05f065 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -3,7 +3,7 @@ import { mergeDeepRight } from 'ramda'; import { toDots } from '@openpanel/common'; import { getRedisCache } from '@openpanel/redis'; -import { ch, chQuery } from '../clickhouse-client'; +import { ch, chQuery, TABLE_NAMES } from '../clickhouse-client'; import type { IClickhouseProfile, IServiceProfile, @@ -22,7 +22,7 @@ import { RedisBuffer } from './buffer'; export class ProfileBuffer extends RedisBuffer { constructor() { super({ - table: 'profiles', + table: TABLE_NAMES.profiles, batchSize: 100, }); } @@ -56,7 +56,7 @@ export class ProfileBuffer extends RedisBuffer { ); await ch.insert({ - table: 'profiles', + table: TABLE_NAMES.profiles, values: cleanedQueue.map((item) => { const profile = profiles.find( (p) => diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index 1cdb276b..428a05ca 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -6,7 +6,7 @@ import { toDots } from '@openpanel/common'; import { cacheable, getRedisCache } from '@openpanel/redis'; import type { IChartEventFilter } from '@openpanel/validation'; -import { eventBuffer } from '../buffers'; +import { botBuffer, eventBuffer } from '../buffers'; import { ch, chQuery, @@ -37,6 +37,26 @@ export type IServicePage = { origin: string; }; +export interface IClickhouseBotEvent { + id: string; + name: string; + type: string; + project_id: string; + path: string; + created_at: string; +} + +export interface IServiceBotEvent { + id: string; + name: string; + type: string; + projectId: string; + path: string; + createdAt: Date; +} + +export type IServiceCreateBotEventPayload = Omit; + export interface IClickhouseEvent { id: string; name: string; @@ -535,33 +555,20 @@ export async function getEventsCount({ return res[0]?.count ?? 0; } -interface CreateBotEventPayload { - name: string; - type: string; - path: string; - projectId: string; - createdAt: Date; -} - export function createBotEvent({ name, type, projectId, createdAt, path, -}: CreateBotEventPayload) { - return ch.insert({ - table: 'events_bots', - format: 'JSONEachRow', - values: [ - { - name, - type, - project_id: projectId, - path, - created_at: formatClickhouseDate(createdAt), - }, - ], +}: IServiceCreateBotEventPayload) { + return botBuffer.insert({ + id: uuid(), + name, + type, + project_id: projectId, + path, + created_at: formatClickhouseDate(createdAt), }); }