From be83b484bc090a0bff19c4c139ae10f3c1b17d22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Mon, 3 Feb 2025 21:50:35 +0100 Subject: [PATCH] improve(buffer): better clean up --- apps/worker/src/metrics.ts | 70 ++++++++++++------- packages/db/src/buffers/bot-buffer-psql.ts | 12 ++-- packages/db/src/buffers/buffer.ts | 2 + packages/db/src/buffers/event-buffer-psql.ts | 36 ++++++---- .../db/src/buffers/profile-buffer-psql.ts | 69 ++++++++++++++---- 5 files changed, 129 insertions(+), 60 deletions(-) diff --git a/apps/worker/src/metrics.ts b/apps/worker/src/metrics.ts index cd7a0b75..9989c5c4 100644 --- a/apps/worker/src/metrics.ts +++ b/apps/worker/src/metrics.ts @@ -1,7 +1,7 @@ import client from 'prom-client'; +import { botBuffer, db, eventBuffer, profileBuffer } from '@openpanel/db'; import { cronQueue, eventsQueue, sessionsQueue } from '@openpanel/queue'; -import { getRedisCache } from '@openpanel/redis'; const Registry = client.Registry; @@ -66,31 +66,47 @@ queues.forEach((queue) => { ); }); -// Buffer -const buffers = ['events_v2', 'profiles', 'events_bots']; +register.registerMetric( + new client.Gauge({ + name: `buffer_${eventBuffer.name}_count`, + help: 'Number of unprocessed events', + async collect() { + const metric = await db.eventBuffer.count({ + where: { + processedAt: null, + }, + }); + this.set(metric); + }, + }), +); -buffers.forEach((buffer) => { - register.registerMetric( - new client.Gauge({ - name: `buffer_${buffer}_count`, - help: 'Number of users in the users array', - async collect() { - const metric = await getRedisCache().llen(`op:buffer:${buffer}`); - this.set(metric); - }, - }), - ); +register.registerMetric( + new client.Gauge({ + name: `buffer_${profileBuffer.name}_count`, + help: 'Number of unprocessed profiles', + async collect() { + const metric = await db.profileBuffer.count({ + where: { + processedAt: null, + }, + }); + this.set(metric); + }, + }), +); - register.registerMetric( - new client.Gauge({ - name: `buffer_${buffer}_stalled_count`, - help: 'Number of users in the users array', - async collect() { - const metric = await getRedisCache().llen( - `op:buffer:${buffer}:stalled`, - ); - this.set(metric); - }, - }), - ); -}); +register.registerMetric( + new client.Gauge({ + name: `buffer_${botBuffer.name}_count`, + help: 'Number of unprocessed bot events', + async collect() { + const metric = await db.botEventBuffer.count({ + where: { + processedAt: null, + }, + }); + this.set(metric); + }, + }), +); diff --git a/packages/db/src/buffers/bot-buffer-psql.ts b/packages/db/src/buffers/bot-buffer-psql.ts index 0f153ece..1449528b 100644 --- a/packages/db/src/buffers/bot-buffer-psql.ts +++ b/packages/db/src/buffers/bot-buffer-psql.ts @@ -9,8 +9,9 @@ import type { IClickhouseBotEvent } from '../services/event.service'; import { BaseBuffer } from './base-buffer'; export class BotBuffer extends BaseBuffer { - private daysToKeep = 1; - private batchSize = 500; + private batchSize = process.env.BOT_BUFFER_BATCH_SIZE + ? Number.parseInt(process.env.BOT_BUFFER_BATCH_SIZE, 10) + : 1000; constructor() { super({ @@ -87,7 +88,7 @@ export class BotBuffer extends BaseBuffer { async tryCleanup() { try { await runEvery({ - interval: 1000 * 60 * 60 * 24, + interval: 60 * 5, // 5 minutes fn: this.cleanup.bind(this), key: `${this.name}-cleanup`, }); @@ -97,13 +98,10 @@ export class BotBuffer extends BaseBuffer { } async cleanup() { - const thirtyDaysAgo = new Date(); - thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - this.daysToKeep); - const deleted = await db.botEventBuffer.deleteMany({ where: { processedAt: { - lt: thirtyDaysAgo, + not: null, }, }, }); diff --git a/packages/db/src/buffers/buffer.ts b/packages/db/src/buffers/buffer.ts index 6626b98a..1010c2d4 100644 --- a/packages/db/src/buffers/buffer.ts +++ b/packages/db/src/buffers/buffer.ts @@ -13,6 +13,7 @@ export type FindMany = ( ) => Promise; export class RedisBuffer { + public name: string; protected prefix = 'op:buffer'; protected bufferKey: string; private lockKey: string; @@ -20,6 +21,7 @@ export class RedisBuffer { protected logger: ILogger; constructor(bufferName: string, maxBufferSize: number | null) { + this.name = bufferName; this.bufferKey = bufferName; this.lockKey = `lock:${bufferName}`; this.maxBufferSize = maxBufferSize; diff --git a/packages/db/src/buffers/event-buffer-psql.ts b/packages/db/src/buffers/event-buffer-psql.ts index 0a94a409..5d48953d 100644 --- a/packages/db/src/buffers/event-buffer-psql.ts +++ b/packages/db/src/buffers/event-buffer-psql.ts @@ -13,7 +13,9 @@ import { import { BaseBuffer } from './base-buffer'; export class EventBuffer extends BaseBuffer { - private daysToKeep = 3; + private daysToKeep = process.env.EVENT_BUFFER_DAYS_TO_KEEP + ? Number.parseInt(process.env.EVENT_BUFFER_DAYS_TO_KEEP, 10) + : 3; private batchSize = process.env.EVENT_BUFFER_CHUNK_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) : 2000; @@ -26,7 +28,7 @@ export class EventBuffer extends BaseBuffer { name: 'event', onFlush: async () => { await this.processBuffer(); - await this.cleanup(); + await this.tryCleanup(); }, }); } @@ -205,7 +207,7 @@ export class EventBuffer extends BaseBuffer { async tryCleanup() { try { await runEvery({ - interval: 1000 * 60 * 60 * 24, + interval: 60 * 5, // 5 minutes fn: this.cleanup.bind(this), key: `${this.name}-cleanup`, }); @@ -215,18 +217,26 @@ export class EventBuffer extends BaseBuffer { } async cleanup() { - const thirtyDaysAgo = new Date(); - thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - this.daysToKeep); + const olderThan = new Date(); + olderThan.setDate(olderThan.getDate() - this.daysToKeep); - const deleted = await db.eventBuffer.deleteMany({ - where: { - processedAt: { - lt: thirtyDaysAgo, - }, - }, - }); + const deleted = await db.$executeRaw` + DELETE FROM event_buffer + WHERE + -- 1) if the event has been processed + -- and session is completed or has no session + ( + "processedAt" IS NOT NULL + AND ( + "sessionId" IN (SELECT "sessionId" FROM event_buffer WHERE name = 'session_end') + OR "sessionId" IS NULL + ) + ) + -- 2) if the event is stalled for X days + OR "createdAt" < ${olderThan} + `; - this.logger.info('Cleaned up old events', { deleted: deleted.count }); + this.logger.info('Cleaned up old events', { deleted }); } public async getLastScreenView({ diff --git a/packages/db/src/buffers/profile-buffer-psql.ts b/packages/db/src/buffers/profile-buffer-psql.ts index 0747d9ea..c56483da 100644 --- a/packages/db/src/buffers/profile-buffer-psql.ts +++ b/packages/db/src/buffers/profile-buffer-psql.ts @@ -10,12 +10,14 @@ import type { IClickhouseProfile } from '../services/profile.service'; import { BaseBuffer } from './base-buffer'; export class ProfileBuffer extends BaseBuffer { - private daysToKeep = 30; - private batchSize = process.env.EVENT_BUFFER_CHUNK_SIZE - ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) + private daysToKeep = process.env.PROFILE_BUFFER_DAYS_TO_KEEP + ? Number.parseInt(process.env.PROFILE_BUFFER_DAYS_TO_KEEP, 10) + : 7; + private batchSize = process.env.PROFILE_BUFFER_CHUNK_SIZE + ? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10) : 2000; - private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE - ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) + private chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE + ? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10) : 1000; constructor() { @@ -28,9 +30,50 @@ export class ProfileBuffer extends BaseBuffer { }); } - private generateChecksum(profile: IClickhouseProfile): string { + private sortObjectKeys(obj: any): any { + // Cache typeof check result + const type = typeof obj; + + // Fast-path for primitives + if (obj === null || type !== 'object') { + return obj; + } + + // Fast-path for arrays - process values only + if (Array.isArray(obj)) { + // Only map if contains objects + return obj.some((item) => item && typeof item === 'object') + ? obj.map((item) => this.sortObjectKeys(item)) + : obj; + } + + // Get and sort keys once + const sortedKeys = Object.keys(obj).sort(); + const len = sortedKeys.length; + + // Pre-allocate result object + const result: any = {}; + + // Single loop with cached length + for (let i = 0; i < len; i++) { + const key = sortedKeys[i]!; + const value = obj[key]; + result[key] = + value && typeof value === 'object' ? this.sortObjectKeys(value) : value; + } + + return result; + } + + private stringify(profile: IClickhouseProfile): string { const { created_at, ...rest } = profile; - return createHash('sha256').update(JSON.stringify(rest)).digest('hex'); + const sorted = this.sortObjectKeys(rest); + return JSON.stringify(sorted); + } + + private generateChecksum(profile: IClickhouseProfile): string { + const json = this.stringify(profile); + return createHash('sha256').update(json).digest('hex'); } async add(profile: IClickhouseProfile) { @@ -77,7 +120,7 @@ export class ProfileBuffer extends BaseBuffer { } // Update existing profile if its not processed yet - if (existingProfile && existingProfile.processedAt === null) { + if (existingProfile) { await db.profileBuffer.update({ where: { id: existingProfile.id, @@ -86,7 +129,7 @@ export class ProfileBuffer extends BaseBuffer { checksum: this.generateChecksum(mergedProfile), payload: mergedProfile, updatedAt: new Date(), - processedAt: null, // unsure this will get processed (race condition) + processedAt: null, }, }); } else { @@ -165,7 +208,7 @@ export class ProfileBuffer extends BaseBuffer { async tryCleanup() { try { await runEvery({ - interval: 1000 * 60 * 60 * 24, + interval: 60 * 60, // 1 hour fn: this.cleanup.bind(this), key: `${this.name}-cleanup`, }); @@ -175,13 +218,13 @@ export class ProfileBuffer extends BaseBuffer { } async cleanup() { - const thirtyDaysAgo = new Date(); - thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - this.daysToKeep); + const olderThan = new Date(); + olderThan.setDate(olderThan.getDate() - this.daysToKeep); const deleted = await db.profileBuffer.deleteMany({ where: { processedAt: { - lt: thirtyDaysAgo, + lt: olderThan, }, }, });