feat: backfill profile id on events

This commit is contained in:
Carl-Gerhard Lindesvärd
2026-02-18 17:42:17 +01:00
parent 7e2d93db45
commit ee27568824
7 changed files with 160 additions and 5 deletions

View File

@@ -1,5 +1,6 @@
import { BotBuffer as BotBufferRedis } from './bot-buffer';
import { EventBuffer as EventBufferRedis } from './event-buffer';
import { ProfileBackfillBuffer } from './profile-backfill-buffer';
import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer';
import { SessionBuffer } from './session-buffer';
@@ -7,3 +8,6 @@ export const eventBuffer = new EventBufferRedis();
// Shared singleton buffer instances used across the ingestion pipeline.
export const profileBuffer = new ProfileBufferRedis();
export const botBuffer = new BotBufferRedis();
export const sessionBuffer = new SessionBuffer();
// Backfills profile ids onto events that were written before the profile was identified.
export const profileBackfillBuffer = new ProfileBackfillBuffer();
export type { ProfileBackfillEntry } from './profile-backfill-buffer';

View File

@@ -0,0 +1,117 @@
import { getSafeJson } from '@openpanel/json';
import { type Redis, getRedisCache } from '@openpanel/redis';
import sqlstring from 'sqlstring';
import { TABLE_NAMES, ch, getReplicatedTableName } from '../clickhouse/client';
import { BaseBuffer } from './base-buffer';
// One pending backfill: apply `profileId` to recent events matching
// (`projectId`, `sessionId`) in the ClickHouse events table.
export interface ProfileBackfillEntry {
projectId: string;
sessionId: string;
profileId: string;
}
// Max session IDs per IN clause before we split into another query
const CHUNK_SIZE = 500;
/**
 * Buffers profile backfill entries in a Redis list and periodically applies
 * them to ClickHouse, setting `profile_id` on events that were ingested
 * before the profile was identified for the session.
 *
 * NOTE(review): relies on ClickHouse lightweight updates
 * (`allow_experimental_lightweight_update`) — confirm the target cluster
 * version supports them.
 */
export class ProfileBackfillBuffer extends BaseBuffer {
  // How many Redis list entries to drain per flush cycle.
  private batchSize = process.env.PROFILE_BACKFILL_BUFFER_BATCH_SIZE
    ? Number.parseInt(process.env.PROFILE_BACKFILL_BUFFER_BATCH_SIZE, 10)
    : 1000;

  private readonly redisKey = 'profile-backfill-buffer';
  private redis: Redis;

  constructor() {
    super({
      name: 'profile-backfill',
      onFlush: async () => {
        await this.processBuffer();
      },
    });
    this.redis = getRedisCache();
  }

  /**
   * Queue one backfill entry. Errors are logged and swallowed so a Redis
   * hiccup never breaks the caller's ingestion path.
   */
  async add(entry: ProfileBackfillEntry) {
    try {
      this.logger.info('Adding profile backfill entry', entry);
      await this.redis
        .multi()
        .rpush(this.redisKey, JSON.stringify(entry))
        .incr(this.bufferCounterKey)
        .exec();
    } catch (error) {
      this.logger.error('Failed to add profile backfill entry', { error });
    }
  }

  /**
   * Drain up to `batchSize` entries and apply them to ClickHouse in chunks.
   * The Redis list is only trimmed after every chunk succeeded, so a failed
   * flush is retried on the next cycle.
   */
  async processBuffer() {
    try {
      const raw = await this.redis.lrange(this.redisKey, 0, this.batchSize - 1);
      if (raw.length === 0) return;

      // Deduplicate by (projectId, sessionId) — last write wins, i.e. the
      // most recent profileId. Keying by sessionId alone would silently drop
      // an entry whenever two projects happen to share a session id.
      const seen = new Map<string, ProfileBackfillEntry>();
      for (const r of raw) {
        const parsed = getSafeJson<ProfileBackfillEntry>(r);
        if (parsed) {
          seen.set(`${parsed.projectId}:${parsed.sessionId}`, parsed);
        }
      }

      const entries = Array.from(seen.values());
      const table = getReplicatedTableName(TABLE_NAMES.events);
      // NOTE(review): `this.chunks` is assumed to come from BaseBuffer —
      // verify it splits an array into arrays of at most CHUNK_SIZE items.
      const chunks = this.chunks(entries, CHUNK_SIZE);
      let processedChunks = 0;

      for (const chunk of chunks) {
        // Each CASE arm matches the full (project_id, session_id) pair —
        // matching on session_id alone could assign the wrong profile id
        // when the same session id exists in two projects within one chunk.
        const caseClause = chunk
          .map(
            ({ projectId, sessionId, profileId }) =>
              `WHEN project_id = ${sqlstring.escape(projectId)} AND session_id = ${sqlstring.escape(sessionId)} THEN ${sqlstring.escape(profileId)}`,
          )
          .join('\n');
        const tupleList = chunk
          .map(
            ({ projectId, sessionId }) =>
              `(${sqlstring.escape(projectId)}, ${sqlstring.escape(sessionId)})`,
          )
          .join(',');
        // ELSE keeps the existing value as a safety net; the WHERE clause
        // should already restrict updates to rows covered by a CASE arm.
        const query = `
          UPDATE ${table}
          SET profile_id = CASE
          ${caseClause}
          ELSE profile_id
          END
          WHERE (project_id, session_id) IN (${tupleList})
          AND created_at > now() - INTERVAL 6 HOURS`;
        await ch.command({
          query,
          clickhouse_settings: {
            // Fire-and-forget mutation; do not wait for it to finish.
            mutations_sync: '0',
            allow_experimental_lightweight_update: '1',
          },
        });
        processedChunks++;
        this.logger.info('Profile backfill chunk applied', {
          count: chunk.length,
        });
      }

      // Trim exactly what we read, and only when every chunk applied, so
      // entries are never lost on partial failure. Entries RPUSHed
      // concurrently sit past index raw.length and survive the LTRIM.
      if (processedChunks === chunks.length) {
        await this.redis
          .multi()
          .ltrim(this.redisKey, raw.length, -1)
          .decrby(this.bufferCounterKey, raw.length)
          .exec();
        this.logger.info('Profile backfill buffer processed', {
          total: entries.length,
        });
      }
    } catch (error) {
      this.logger.error('Failed to process profile backfill buffer', { error });
    }
  }

  async getBufferSize() {
    return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey));
  }
}

View File

@@ -67,6 +67,10 @@ export const TABLE_NAMES = {
* Non-clustered mode = self-hosted environments
*/
export function isClickhouseClustered(): boolean {
// Explicit override: treat the deployment as clustered whenever
// CLICKHOUSE_CLUSTER is set truthy, regardless of SELF_HOSTED.
if (process.env.CLICKHOUSE_CLUSTER === 'true' || process.env.CLICKHOUSE_CLUSTER === '1') {
return true
}
// Default heuristic: non-self-hosted (cloud) deployments are clustered.
return !(
process.env.SELF_HOSTED === 'true' || process.env.SELF_HOSTED === '1'
);

View File

@@ -119,11 +119,16 @@ export type CronQueuePayloadOnboarding = {
type: 'onboarding';
payload: undefined;
};
// Cron queue message that triggers a flush of the profile backfill buffer.
export type CronQueuePayloadFlushProfileBackfill = {
type: 'flushProfileBackfill';
payload: undefined;
};
export type CronQueuePayload =
| CronQueuePayloadSalt
| CronQueuePayloadFlushEvents
| CronQueuePayloadFlushSessions
| CronQueuePayloadFlushProfiles
| CronQueuePayloadFlushProfileBackfill
| CronQueuePayloadPing
| CronQueuePayloadProject
| CronQueuePayloadInsightsDaily