fix(buffer): ensure we don't have duplicate events
This commit is contained in:
@@ -41,11 +41,12 @@ export async function postEvent(
|
|||||||
|
|
||||||
const isScreenView = request.body.name === 'screen_view';
|
const isScreenView = request.body.name === 'screen_view';
|
||||||
// this will ensure that we don't have multiple events creating sessions
|
// this will ensure that we don't have multiple events creating sessions
|
||||||
|
const LOCK_DURATION = 1000;
|
||||||
const locked = await getRedisCache().set(
|
const locked = await getRedisCache().set(
|
||||||
`request:priority:${currentDeviceId}-${previousDeviceId}:${isScreenView ? 'screen_view' : 'other'}`,
|
`request:priority:${currentDeviceId}-${previousDeviceId}:${isScreenView ? 'screen_view' : 'other'}`,
|
||||||
'locked',
|
'locked',
|
||||||
'PX',
|
'PX',
|
||||||
950, // a bit under the delay below
|
LOCK_DURATION,
|
||||||
'NX',
|
'NX',
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -76,7 +77,7 @@ export async function postEvent(
|
|||||||
// Prioritize 'screen_view' events by setting no delay
|
// Prioritize 'screen_view' events by setting no delay
|
||||||
// This ensures that session starts are created from 'screen_view' events
|
// This ensures that session starts are created from 'screen_view' events
|
||||||
// rather than other events, maintaining accurate session tracking
|
// rather than other events, maintaining accurate session tracking
|
||||||
delay: request.body.name === 'screen_view' ? undefined : 1000,
|
delay: isScreenView ? undefined : LOCK_DURATION - 100,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -229,11 +229,12 @@ async function track({
|
|||||||
}) {
|
}) {
|
||||||
const isScreenView = payload.name === 'screen_view';
|
const isScreenView = payload.name === 'screen_view';
|
||||||
// this will ensure that we don't have multiple events creating sessions
|
// this will ensure that we don't have multiple events creating sessions
|
||||||
|
const LOCK_DURATION = 1000;
|
||||||
const locked = await getRedisCache().set(
|
const locked = await getRedisCache().set(
|
||||||
`request:priority:${currentDeviceId}-${previousDeviceId}:${isScreenView ? 'screen_view' : 'other'}`,
|
`request:priority:${currentDeviceId}-${previousDeviceId}:${isScreenView ? 'screen_view' : 'other'}`,
|
||||||
'locked',
|
'locked',
|
||||||
'PX',
|
'PX',
|
||||||
950, // a bit under the delay below
|
LOCK_DURATION,
|
||||||
'NX',
|
'NX',
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -264,7 +265,7 @@ async function track({
|
|||||||
// Prioritize 'screen_view' events by setting no delay
|
// Prioritize 'screen_view' events by setting no delay
|
||||||
// This ensures that session starts are created from 'screen_view' events
|
// This ensures that session starts are created from 'screen_view' events
|
||||||
// rather than other events, maintaining accurate session tracking
|
// rather than other events, maintaining accurate session tracking
|
||||||
delay: isScreenView ? undefined : 1000,
|
delay: isScreenView ? undefined : LOCK_DURATION - 100,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -96,10 +96,10 @@ const startServer = async () => {
|
|||||||
global: false,
|
global: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
fastify.addHook('preHandler', ipHook);
|
|
||||||
fastify.addHook('preHandler', timestampHook);
|
|
||||||
fastify.addHook('preHandler', fixHook);
|
|
||||||
fastify.addHook('onRequest', requestIdHook);
|
fastify.addHook('onRequest', requestIdHook);
|
||||||
|
fastify.addHook('onRequest', timestampHook);
|
||||||
|
fastify.addHook('onRequest', ipHook);
|
||||||
|
fastify.addHook('onRequest', fixHook);
|
||||||
fastify.addHook('onResponse', requestLoggingHook);
|
fastify.addHook('onResponse', requestLoggingHook);
|
||||||
|
|
||||||
fastify.register(compress, {
|
fastify.register(compress, {
|
||||||
|
|||||||
@@ -393,13 +393,15 @@ return "OK"
|
|||||||
new Date(b.created_at || 0).getTime(),
|
new Date(b.created_at || 0).getTime(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const deduplicatedEvents = this.deduplicateEvents(eventsToClickhouse);
|
||||||
|
|
||||||
// (D) Insert events into ClickHouse in chunks
|
// (D) Insert events into ClickHouse in chunks
|
||||||
this.logger.info('Inserting events into ClickHouse', {
|
this.logger.info('Inserting events into ClickHouse', {
|
||||||
totalEvents: eventsToClickhouse.length,
|
totalEvents: deduplicatedEvents.length,
|
||||||
chunks: Math.ceil(eventsToClickhouse.length / this.chunkSize),
|
chunks: Math.ceil(deduplicatedEvents.length / this.chunkSize),
|
||||||
});
|
});
|
||||||
|
|
||||||
for (const chunk of this.chunks(eventsToClickhouse, this.chunkSize)) {
|
for (const chunk of this.chunks(deduplicatedEvents, this.chunkSize)) {
|
||||||
await ch.insert({
|
await ch.insert({
|
||||||
table: 'events',
|
table: 'events',
|
||||||
values: chunk,
|
values: chunk,
|
||||||
@@ -412,7 +414,7 @@ return "OK"
|
|||||||
|
|
||||||
// (E) Publish "saved" events.
|
// (E) Publish "saved" events.
|
||||||
const pubMulti = getRedisPub().multi();
|
const pubMulti = getRedisPub().multi();
|
||||||
for (const event of eventsToClickhouse) {
|
for (const event of deduplicatedEvents) {
|
||||||
await publishEvent('events', 'saved', transformEvent(event), pubMulti);
|
await publishEvent('events', 'saved', transformEvent(event), pubMulti);
|
||||||
}
|
}
|
||||||
await pubMulti.exec();
|
await pubMulti.exec();
|
||||||
@@ -448,6 +450,78 @@ return "OK"
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Remove duplicate events from a batch, keeping the first occurrence of each.
 *
 * Two events are considered duplicates only when ALL of these fields match:
 *
 * - project_id
 * - name
 * - created_at
 * - profile_id
 * - session_id
 * - device_id
 *
 * A missing/empty profile_id, session_id or device_id is compared as an
 * empty string. The input array is not mutated and the relative order of
 * the surviving events is preserved.
 *
 * When at least one duplicate is dropped, a single warning is logged that
 * lists every (original, duplicate) pair with its identifying fields.
 *
 * @param events - The batch of events to check.
 * @returns A new array containing only the first occurrence of each event.
 */
private deduplicateEvents(events: IClickhouseEvent[]): IClickhouseEvent[] {
  const seen = new Map<string, IClickhouseEvent>();
  const deduplicated: IClickhouseEvent[] = [];
  const duplicates: Array<{
    original: IClickhouseEvent;
    duplicate: IClickhouseEvent;
  }> = [];

  for (const event of events) {
    // Encode the key as JSON instead of joining with a separator: ids and
    // event names may contain any character, and a plain join would let
    // e.g. profile "a|b" + session "" collide with profile "a" + session "b|",
    // silently dropping a distinct event.
    const key = JSON.stringify([
      event.project_id,
      event.name,
      event.created_at,
      event.profile_id || '',
      event.session_id || '',
      event.device_id || '',
    ]);

    const original = seen.get(key);
    if (original) {
      // Same identity as an earlier event in this batch: drop it, but
      // remember the pair so it can be reported below.
      duplicates.push({ original, duplicate: event });
      continue;
    }

    // First occurrence
    seen.set(key, event);
    deduplicated.push(event);
  }

  if (duplicates.length > 0) {
    // Only the identifying fields are logged, not the whole event payload.
    const summarize = (event: IClickhouseEvent) => ({
      projectId: event.project_id,
      name: event.name,
      created_at: event.created_at,
      reqId: event.properties?.__reqId,
      sessionId: event.session_id,
      profileId: event.profile_id,
      deviceId: event.device_id,
    });

    this.logger.warn('Found duplicate events', {
      duplicateCount: duplicates.length,
      duplicates: duplicates.map(({ original, duplicate }) => ({
        original: summarize(original),
        duplicate: summarize(duplicate),
      })),
    });
  }

  return deduplicated;
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process a session's events.
|
* Process a session's events.
|
||||||
*
|
*
|
||||||
|
|||||||
Reference in New Issue
Block a user