improve(buffer): remove check in buffer
This commit is contained in:
@@ -393,15 +393,13 @@ return "OK"
|
||||
new Date(b.created_at || 0).getTime(),
|
||||
);
|
||||
|
||||
const deduplicatedEvents = this.deduplicateEvents(eventsToClickhouse);
|
||||
|
||||
// (D) Insert events into ClickHouse in chunks
|
||||
this.logger.info('Inserting events into ClickHouse', {
|
||||
totalEvents: deduplicatedEvents.length,
|
||||
chunks: Math.ceil(deduplicatedEvents.length / this.chunkSize),
|
||||
totalEvents: eventsToClickhouse.length,
|
||||
chunks: Math.ceil(eventsToClickhouse.length / this.chunkSize),
|
||||
});
|
||||
|
||||
for (const chunk of this.chunks(deduplicatedEvents, this.chunkSize)) {
|
||||
for (const chunk of this.chunks(eventsToClickhouse, this.chunkSize)) {
|
||||
await ch.insert({
|
||||
table: 'events',
|
||||
values: chunk,
|
||||
@@ -414,7 +412,7 @@ return "OK"
|
||||
|
||||
// (E) Publish "saved" events.
|
||||
const pubMulti = getRedisPub().multi();
|
||||
for (const event of deduplicatedEvents) {
|
||||
for (const event of eventsToClickhouse) {
|
||||
await publishEvent('events', 'saved', transformEvent(event), pubMulti);
|
||||
}
|
||||
await pubMulti.exec();
|
||||
@@ -450,78 +448,6 @@ return "OK"
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Deduplicate events, keeping the first occurrence of each key.
 *
 * Two events are treated as duplicates when ALL of these fields match:
 *
 * - project_id
 * - name
 * - created_at
 * - profile_id, session_id and device_id (a falsy id is treated as '')
 *
 * When at least one duplicate is found, a single `warn` log entry is emitted
 * listing every dropped event next to the event it collided with.
 *
 * @param events - Events to filter. The array itself is not mutated.
 * @returns A new array with the first occurrence of each key, in input order.
 */
private deduplicateEvents(events: IClickhouseEvent[]): IClickhouseEvent[] {
  const seen = new Map<string, IClickhouseEvent>();
  const deduplicated: IClickhouseEvent[] = [];
  const duplicates: Array<{
    original: IClickhouseEvent;
    duplicate: IClickhouseEvent;
  }> = [];

  for (const event of events) {
    // Stringify each part the same way Array.prototype.join would
    // (null/undefined become ''), so events that matched before still match.
    const keyParts = [
      event.project_id,
      event.name,
      event.created_at,
      event.profile_id || '',
      event.session_id || '',
      event.device_id || '',
    ].map((part) => (part == null ? '' : String(part)));

    // JSON-encode the parts instead of joining them with '|': with a plain
    // join, a '|' inside any field could make two different events share a
    // key, and the second one would be silently dropped as a "duplicate".
    const key = JSON.stringify(keyParts);

    const existing = seen.get(key);
    if (existing) {
      // Same key seen before: remember the pair for the warning below.
      duplicates.push({ original: existing, duplicate: event });
      continue;
    }

    // First occurrence of this key: keep it.
    seen.set(key, event);
    deduplicated.push(event);
  }

  if (duplicates.length > 0) {
    // Reduce an event to the identifying fields that go into the log entry.
    const summarize = (event: IClickhouseEvent) => ({
      projectId: event.project_id,
      name: event.name,
      created_at: event.created_at,
      reqId: event.properties?.__reqId,
      sessionId: event.session_id,
      profileId: event.profile_id,
      deviceId: event.device_id,
    });

    this.logger.warn('Found duplicate events', {
      duplicateCount: duplicates.length,
      duplicates: duplicates.map(({ original, duplicate }) => ({
        original: summarize(original),
        duplicate: summarize(duplicate),
      })),
    });
  }

  return deduplicated;
}
|
||||
|
||||
/**
|
||||
* Process a session's events.
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user