From cd519405d1bc3262d23bdf334d757d5cb6bfe3db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 28 Feb 2025 10:06:32 +0100 Subject: [PATCH] improve(buffer): remove check in buffer --- packages/db/src/buffers/event-buffer-redis.ts | 82 +------------------ 1 file changed, 4 insertions(+), 78 deletions(-) diff --git a/packages/db/src/buffers/event-buffer-redis.ts b/packages/db/src/buffers/event-buffer-redis.ts index f209ac7b..f568ed76 100644 --- a/packages/db/src/buffers/event-buffer-redis.ts +++ b/packages/db/src/buffers/event-buffer-redis.ts @@ -393,15 +393,13 @@ return "OK" new Date(b.created_at || 0).getTime(), ); - const deduplicatedEvents = this.deduplicateEvents(eventsToClickhouse); - // (D) Insert events into ClickHouse in chunks this.logger.info('Inserting events into ClickHouse', { - totalEvents: deduplicatedEvents.length, - chunks: Math.ceil(deduplicatedEvents.length / this.chunkSize), + totalEvents: eventsToClickhouse.length, + chunks: Math.ceil(eventsToClickhouse.length / this.chunkSize), }); - for (const chunk of this.chunks(deduplicatedEvents, this.chunkSize)) { + for (const chunk of this.chunks(eventsToClickhouse, this.chunkSize)) { await ch.insert({ table: 'events', values: chunk, @@ -414,7 +412,7 @@ return "OK" // (E) Publish "saved" events. const pubMulti = getRedisPub().multi(); - for (const event of deduplicatedEvents) { + for (const event of eventsToClickhouse) { await publishEvent('events', 'saved', transformEvent(event), pubMulti); } await pubMulti.exec(); @@ -450,78 +448,6 @@ return "OK" } } - /** - * Deduplicate events based on specified criteria - * - * - project_id - * - name - * - created_at - * - profile_id or session_id or device_id - */ - private deduplicateEvents(events: IClickhouseEvent[]): IClickhouseEvent[] { - const { deduplicated, duplicates } = events.reduce<{ - seen: Map; - deduplicated: IClickhouseEvent[]; - duplicates: Array<{ - original: IClickhouseEvent; - duplicate: IClickhouseEvent; - }>; - }>( - (acc, event) => { - // Create a unique key for the event based on deduplication criteria - const key = [ - event.project_id, - event.name, - event.created_at, - event.profile_id || '', - event.session_id || '', - event.device_id || '', - ].join('|'); - - const existing = acc.seen.get(key); - if (existing) { - // It's a duplicate - acc.duplicates.push({ original: existing, duplicate: event }); - } else { - // First occurrence - acc.seen.set(key, event); - acc.deduplicated.push(event); - } - - return acc; - }, - { seen: new Map(), deduplicated: [], duplicates: [] }, - ); - - if (duplicates.length > 0) { - this.logger.warn('Found duplicate events', { - duplicateCount: duplicates.length, - duplicates: duplicates.map(({ original, duplicate }) => ({ - original: { - projectId: original.project_id, - name: original.name, - created_at: original.created_at, - reqId: original.properties?.__reqId, - sessionId: original.session_id, - profileId: original.profile_id, - deviceId: original.device_id, - }, - duplicate: { - projectId: duplicate.project_id, - name: duplicate.name, - created_at: duplicate.created_at, - reqId: duplicate.properties?.__reqId, - sessionId: duplicate.session_id, - profileId: duplicate.profile_id, - deviceId: duplicate.device_id, - }, - })), - }); - } - - return deduplicated; - } - /** * Process a session's events. *