From efd24ca67af1c0caf4056e8b89519133f9dd3bc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Thu, 12 Sep 2024 22:09:35 +0200 Subject: [PATCH] perf(api): improve inserting events --- packages/common/src/object.ts | 2 +- packages/db/src/buffers/profile-buffer.ts | 146 +++++++++++++++++--- packages/db/src/services/event.service.ts | 2 +- packages/db/src/services/profile.service.ts | 2 +- 4 files changed, 133 insertions(+), 19 deletions(-) diff --git a/packages/common/src/object.ts b/packages/common/src/object.ts index e012ed1b..6ba6d816 100644 --- a/packages/common/src/object.ts +++ b/packages/common/src/object.ts @@ -4,7 +4,7 @@ import superjson from 'superjson'; export function toDots( obj: Record, path = '' -): Record { +): Record { return Object.entries(obj).reduce((acc, [key, value]) => { if (typeof value === 'object' && value !== null) { return { diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index 8c05f065..c236946a 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -31,37 +31,130 @@ export class ProfileBuffer extends RedisBuffer { public onCompleted?: OnCompleted | undefined; public processQueue: ProcessQueue = async (queue) => { + const cleanedQueue = this.combineQueueItems(queue); + const redisProfiles = await this.getCachedProfiles(cleanedQueue); + const dbProfiles = await this.fetchDbProfiles( + cleanedQueue.filter((_, index) => !redisProfiles[index]) + ); + + const values = this.createProfileValues( + cleanedQueue, + redisProfiles, + dbProfiles + ); + + if (values.length > 0) { + await this.updateRedisCache(values); + await this.insertIntoClickhouse(values); + } + + return queue.map((item) => item.index); + }; + + private matchPartialObject( + full: any, + partial: any, + options: { ignore: string[] } + ): boolean { + if (typeof partial !== 'object' || partial === null) { + return partial === full; + } + + for (const key in partial) { + if (options.ignore.includes(key)) { + continue; + } + + if ( + !(key in full) || + !this.matchPartialObject(full[key], partial[key], options) + ) { + return false; + } + } + + return true; + } + + private combineQueueItems( + queue: QueueItem[] + ): QueueItem[] { const itemsToClickhouse = new Map>(); - // Combine all writes to the same profile queue.forEach((item) => { const key = item.event.project_id + item.event.id; const existing = itemsToClickhouse.get(key); - itemsToClickhouse.set( - item.event.project_id + item.event.id, - mergeDeepRight(existing ?? {}, item) - ); + itemsToClickhouse.set(key, mergeDeepRight(existing ?? {}, item)); }); - const cleanedQueue = Array.from(itemsToClickhouse.values()); + return Array.from(itemsToClickhouse.values()); + } - const profiles = await chQuery( + private async getCachedProfiles( + cleanedQueue: QueueItem[] + ): Promise<(IClickhouseProfile | null)[]> { + const redisCache = getRedisCache(); + const keys = cleanedQueue.map( + (item) => `profile:${item.event.project_id}:${item.event.id}` + ); + const cachedProfiles = await redisCache.mget(...keys); + return cachedProfiles.map((profile) => { + try { + return profile ? JSON.parse(profile) : null; + } catch (error) { + return null; + } + }); + } + + private async fetchDbProfiles( + cleanedQueue: QueueItem[] + ): Promise { + if (cleanedQueue.length === 0) { + return []; + } + + return await chQuery( `SELECT * - FROM profiles + FROM ${TABLE_NAMES.profiles} WHERE (id, project_id) IN (${cleanedQueue.map((item) => `('${item.event.id}', '${item.event.project_id}')`).join(',')}) ORDER BY created_at DESC` ); + } - await ch.insert({ - table: TABLE_NAMES.profiles, - values: cleanedQueue.map((item) => { - const profile = profiles.find( + private createProfileValues( + cleanedQueue: QueueItem[], + redisProfiles: (IClickhouseProfile | null)[], + dbProfiles: IClickhouseProfile[] + ): IClickhouseProfile[] { + return cleanedQueue + .map((item, index) => { + const cachedProfile = redisProfiles[index]; + const dbProfile = dbProfiles.find( (p) => p.id === item.event.id && p.project_id === item.event.project_id ); + const profile = cachedProfile || dbProfile; + + if ( + profile && + this.matchPartialObject( + profile, + { + ...item.event, + properties: toDots(item.event.properties), + }, + { + ignore: ['created_at'], + } + ) + ) { + console.log('Ignoring profile', item.event.id); + return null; + } return { id: item.event.id, @@ -74,14 +167,35 @@ export class ProfileBuffer extends RedisBuffer { ...(item.event.properties ?? {}), }), project_id: item.event.project_id ?? profile?.project_id ?? '', - created_at: new Date(), + created_at: item.event.created_at ?? profile?.created_at ?? '', is_external: item.event.is_external, }; - }), + }) + .flatMap((item) => (item ? [item] : [])); + } + + private async updateRedisCache(values: IClickhouseProfile[]): Promise { + const redisCache = getRedisCache(); + const multi = redisCache.multi(); + values.forEach((value) => { + multi.setex( + `profile:${value.project_id}:${value.id}`, + 60 * 30, // 30 minutes + JSON.stringify(value) + ); + }); + await multi.exec(); + } + + private async insertIntoClickhouse( + values: IClickhouseProfile[] + ): Promise { + await ch.insert({ + table: TABLE_NAMES.profiles, + values, format: 'JSONEachRow', }); - return queue.map((item) => item.index); - }; + } public findMany: FindMany = async ( callback diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index 428a05ca..cde761aa 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -622,7 +622,7 @@ export async function getTopPages({ }) { const res = await chQuery(` SELECT path, count(*) as count, project_id, first_value(created_at) as first_seen, max(properties['__title']) as title, origin - FROM events_v2 + FROM ${TABLE_NAMES.events} WHERE name = 'screen_view' AND project_id = ${escape(projectId)} AND created_at > now() - INTERVAL 30 DAY diff --git a/packages/db/src/services/profile.service.ts b/packages/db/src/services/profile.service.ts index 7bd5d905..e96d2d09 100644 --- a/packages/db/src/services/profile.service.ts +++ b/packages/db/src/services/profile.service.ts @@ -49,7 +49,7 @@ export async function getProfileById(id: string, projectId: string) { } const [profile] = await chQuery( - `SELECT * FROM profiles WHERE id = ${escape(String(id))} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1` + `SELECT * FROM ${TABLE_NAMES.profiles} WHERE id = ${escape(String(id))} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1` ); if (!profile) {