diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index ef0fde37..359269a6 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -41,11 +41,12 @@ export async function postEvent( const isScreenView = request.body.name === 'screen_view'; // this will ensure that we don't have multiple events creating sessions + const LOCK_DURATION = 1000; const locked = await getRedisCache().set( `request:priority:${currentDeviceId}-${previousDeviceId}:${isScreenView ? 'screen_view' : 'other'}`, 'locked', 'PX', - 950, // a bit under the delay below + LOCK_DURATION, 'NX', ); @@ -76,7 +77,7 @@ export async function postEvent( // Prioritize 'screen_view' events by setting no delay // This ensures that session starts are created from 'screen_view' events // rather than other events, maintaining accurate session tracking - delay: request.body.name === 'screen_view' ? undefined : 1000, + delay: isScreenView ? undefined : LOCK_DURATION - 100, }, ); diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index 02f15485..aa481d1f 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -229,11 +229,12 @@ async function track({ }) { const isScreenView = payload.name === 'screen_view'; // this will ensure that we don't have multiple events creating sessions + const LOCK_DURATION = 1000; const locked = await getRedisCache().set( `request:priority:${currentDeviceId}-${previousDeviceId}:${isScreenView ? 'screen_view' : 'other'}`, 'locked', 'PX', - 950, // a bit under the delay below + LOCK_DURATION, 'NX', ); @@ -264,7 +265,7 @@ async function track({ // Prioritize 'screen_view' events by setting no delay // This ensures that session starts are created from 'screen_view' events // rather than other events, maintaining accurate session tracking - delay: isScreenView ? undefined : 1000, + delay: isScreenView ? undefined : LOCK_DURATION - 100, }, ); } diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 3dc82550..d615f423 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -96,10 +96,10 @@ const startServer = async () => { global: false, }); - fastify.addHook('preHandler', ipHook); - fastify.addHook('preHandler', timestampHook); - fastify.addHook('preHandler', fixHook); fastify.addHook('onRequest', requestIdHook); + fastify.addHook('onRequest', timestampHook); + fastify.addHook('onRequest', ipHook); + fastify.addHook('onRequest', fixHook); fastify.addHook('onResponse', requestLoggingHook); fastify.register(compress, { diff --git a/packages/db/src/buffers/event-buffer-redis.ts b/packages/db/src/buffers/event-buffer-redis.ts index f568ed76..f209ac7b 100644 --- a/packages/db/src/buffers/event-buffer-redis.ts +++ b/packages/db/src/buffers/event-buffer-redis.ts @@ -393,13 +393,15 @@ return "OK" new Date(b.created_at || 0).getTime(), ); + const deduplicatedEvents = this.deduplicateEvents(eventsToClickhouse); + // (D) Insert events into ClickHouse in chunks this.logger.info('Inserting events into ClickHouse', { - totalEvents: eventsToClickhouse.length, - chunks: Math.ceil(eventsToClickhouse.length / this.chunkSize), + totalEvents: deduplicatedEvents.length, + chunks: Math.ceil(deduplicatedEvents.length / this.chunkSize), }); - for (const chunk of this.chunks(eventsToClickhouse, this.chunkSize)) { + for (const chunk of this.chunks(deduplicatedEvents, this.chunkSize)) { await ch.insert({ table: 'events', values: chunk, @@ -412,7 +414,7 @@ return "OK" // (E) Publish "saved" events. const pubMulti = getRedisPub().multi(); - for (const event of eventsToClickhouse) { + for (const event of deduplicatedEvents) { await publishEvent('events', 'saved', transformEvent(event), pubMulti); } await pubMulti.exec(); @@ -448,6 +450,78 @@ return "OK" } } + /** + * Deduplicate events based on specified criteria + * + * - project_id + * - name + * - created_at + * - profile_id or session_id or device_id + */ + private deduplicateEvents(events: IClickhouseEvent[]): IClickhouseEvent[] { + const { deduplicated, duplicates } = events.reduce<{ + seen: Map; + deduplicated: IClickhouseEvent[]; + duplicates: Array<{ + original: IClickhouseEvent; + duplicate: IClickhouseEvent; + }>; + }>( + (acc, event) => { + // Create a unique key for the event based on deduplication criteria + const key = [ + event.project_id, + event.name, + event.created_at, + event.profile_id || '', + event.session_id || '', + event.device_id || '', + ].join('|'); + + const existing = acc.seen.get(key); + if (existing) { + // It's a duplicate + acc.duplicates.push({ original: existing, duplicate: event }); + } else { + // First occurrence + acc.seen.set(key, event); + acc.deduplicated.push(event); + } + + return acc; + }, + { seen: new Map(), deduplicated: [], duplicates: [] }, + ); + + if (duplicates.length > 0) { + this.logger.warn('Found duplicate events', { + duplicateCount: duplicates.length, + duplicates: duplicates.map(({ original, duplicate }) => ({ + original: { + projectId: original.project_id, + name: original.name, + created_at: original.created_at, + reqId: original.properties?.__reqId, + sessionId: original.session_id, + profileId: original.profile_id, + deviceId: original.device_id, + }, + duplicate: { + projectId: duplicate.project_id, + name: duplicate.name, + created_at: duplicate.created_at, + reqId: duplicate.properties?.__reqId, + sessionId: duplicate.session_id, + profileId: duplicate.profile_id, + deviceId: duplicate.device_id, + }, + })), + }); + } + + return deduplicated; + } + /** * Process a session's events. *