diff --git a/apps/worker/src/boot-cron.ts b/apps/worker/src/boot-cron.ts index b29a6a7c..4a564e0d 100644 --- a/apps/worker/src/boot-cron.ts +++ b/apps/worker/src/boot-cron.ts @@ -58,6 +58,11 @@ export async function bootCron() { type: 'flushSessions', pattern: 1000 * 10, }, + { + name: 'flush', + type: 'flushProfileBackfill', + pattern: 1000 * 30, + }, { name: 'insightsDaily', type: 'insightsDaily', diff --git a/apps/worker/src/jobs/cron.ts b/apps/worker/src/jobs/cron.ts index 135e174d..2735f565 100644 --- a/apps/worker/src/jobs/cron.ts +++ b/apps/worker/src/jobs/cron.ts @@ -1,6 +1,6 @@ import type { Job } from 'bullmq'; -import { eventBuffer, profileBuffer, sessionBuffer } from '@openpanel/db'; +import { eventBuffer, profileBackfillBuffer, profileBuffer, sessionBuffer } from '@openpanel/db'; import type { CronQueuePayload } from '@openpanel/queue'; import { jobdeleteProjects } from './cron.delete-projects'; @@ -23,6 +23,9 @@ export async function cronJob(job: Job) { case 'flushSessions': { return await sessionBuffer.tryFlush(); } + case 'flushProfileBackfill': { + return await profileBackfillBuffer.tryFlush(); + } case 'ping': { return await ping(); } diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index d24e554e..ea0eb2f0 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -14,6 +14,7 @@ import { getEvents, getHasFunnelRules, getNotificationRulesByProjectId, + profileBackfillBuffer, sessionBuffer, transformSessionToEvent, } from '@openpanel/db'; @@ -34,9 +35,9 @@ async function getSessionEvents({ endAt: Date; }): Promise { const sql = ` - SELECT * FROM ${TABLE_NAMES.events} - WHERE - session_id = '${sessionId}' + SELECT * FROM ${TABLE_NAMES.events} + WHERE + session_id = '${sessionId}' AND project_id = '${projectId}' AND created_at BETWEEN '${formatClickhouseDate(startAt)}' AND '${formatClickhouseDate(endAt)}' ORDER BY created_at DESC LIMIT ${MAX_SESSION_EVENTS}; @@ -91,6 +92,22 @@ export async function createSessionEnd( }); } + const profileId = session.profile_id || payload.profileId + + if ( + profileId !== session.device_id + && process.env.EXPERIMENTAL_PROFILE_BACKFILL === '1' + ) { + const runOnProjects = process.env.EXPERIMENTAL_PROFILE_BACKFILL_PROJECTS?.split(',').filter(Boolean) ?? [] + if(runOnProjects.length === 0 || runOnProjects.includes(payload.projectId)) { + await profileBackfillBuffer.add({ + projectId: payload.projectId, + sessionId: payload.sessionId, + profileId: profileId, + }); + } + } + // Create session end event return createEvent({ ...payload, @@ -104,7 +121,7 @@ export async function createSessionEnd( createdAt: new Date( convertClickhouseDateToJs(session.ended_at).getTime() + 1000, ), - profileId: session.profile_id || payload.profileId, + profileId: profileId, }); } diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts index 17932ab9..5bafb88f 100644 --- a/packages/db/src/buffers/index.ts +++ b/packages/db/src/buffers/index.ts @@ -1,5 +1,6 @@ import { BotBuffer as BotBufferRedis } from './bot-buffer'; import { EventBuffer as EventBufferRedis } from './event-buffer'; +import { ProfileBackfillBuffer } from './profile-backfill-buffer'; import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer'; import { SessionBuffer } from './session-buffer'; @@ -7,3 +8,6 @@ export const eventBuffer = new EventBufferRedis(); export const profileBuffer = new ProfileBufferRedis(); export const botBuffer = new BotBufferRedis(); export const sessionBuffer = new SessionBuffer(); +export const profileBackfillBuffer = new ProfileBackfillBuffer(); + +export type { ProfileBackfillEntry } from './profile-backfill-buffer'; diff --git a/packages/db/src/buffers/profile-backfill-buffer.ts b/packages/db/src/buffers/profile-backfill-buffer.ts new file mode 100644 index 00000000..175d38b6 --- /dev/null +++ b/packages/db/src/buffers/profile-backfill-buffer.ts @@ -0,0 +1,117 @@ +import { getSafeJson } from '@openpanel/json'; +import { type Redis, getRedisCache } from '@openpanel/redis'; +import sqlstring from 'sqlstring'; +import { TABLE_NAMES, ch, getReplicatedTableName } from '../clickhouse/client'; +import { BaseBuffer } from './base-buffer'; + +export interface ProfileBackfillEntry { + projectId: string; + sessionId: string; + profileId: string; +} + +// Max session IDs per IN clause before we split into another query +const CHUNK_SIZE = 500; + +export class ProfileBackfillBuffer extends BaseBuffer { + private batchSize = process.env.PROFILE_BACKFILL_BUFFER_BATCH_SIZE + ? Number.parseInt(process.env.PROFILE_BACKFILL_BUFFER_BATCH_SIZE, 10) + : 1000; + + private readonly redisKey = 'profile-backfill-buffer'; + private redis: Redis; + + constructor() { + super({ + name: 'profile-backfill', + onFlush: async () => { + await this.processBuffer(); + }, + }); + this.redis = getRedisCache(); + } + + async add(entry: ProfileBackfillEntry) { + try { + this.logger.info('Adding profile backfill entry', entry); + await this.redis + .multi() + .rpush(this.redisKey, JSON.stringify(entry)) + .incr(this.bufferCounterKey) + .exec(); + } catch (error) { + this.logger.error('Failed to add profile backfill entry', { error }); + } + } + + async processBuffer() { + try { + const raw = await this.redis.lrange(this.redisKey, 0, this.batchSize - 1); + + if (raw.length === 0) return; + + // Deduplicate by sessionId — last write wins (most recent profileId) + const seen = new Map(); + for (const r of raw) { + const parsed = getSafeJson(r); + if (parsed) { + seen.set(parsed.sessionId, parsed); + } + } + const entries = Array.from(seen.values()); + + const table = getReplicatedTableName(TABLE_NAMES.events); + + const chunks = this.chunks(entries, CHUNK_SIZE); + let processedChunks = 0; + + for (const chunk of chunks) { + const caseClause = chunk + .map(({ sessionId, profileId }) => `WHEN ${sqlstring.escape(sessionId)} THEN ${sqlstring.escape(profileId)}`) + .join('\n'); + const tupleList = chunk + .map(({ projectId, sessionId }) => `(${sqlstring.escape(projectId)}, ${sqlstring.escape(sessionId)})`) + .join(','); + + const query = ` + UPDATE ${table} + SET profile_id = CASE session_id + ${caseClause} + END + WHERE (project_id, session_id) IN (${tupleList}) + AND created_at > now() - INTERVAL 6 HOURS`; + + await ch.command({ + query, + clickhouse_settings: { + mutations_sync: '0', + allow_experimental_lightweight_update: '1' + }, + }); + + processedChunks++; + this.logger.info('Profile backfill chunk applied', { + count: chunk.length, + }); + } + + if (processedChunks === chunks.length) { + await this.redis + .multi() + .ltrim(this.redisKey, raw.length, -1) + .decrby(this.bufferCounterKey, raw.length) + .exec(); + + this.logger.info('Profile backfill buffer processed', { + total: entries.length, + }); + } + } catch (error) { + this.logger.error('Failed to process profile backfill buffer', { error }); + } + } + + async getBufferSize() { + return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey)); + } +} diff --git a/packages/db/src/clickhouse/client.ts b/packages/db/src/clickhouse/client.ts index 0260a342..244d15a2 100644 --- a/packages/db/src/clickhouse/client.ts +++ b/packages/db/src/clickhouse/client.ts @@ -67,6 +67,10 @@ export const TABLE_NAMES = { * Non-clustered mode = self-hosted environments */ export function isClickhouseClustered(): boolean { + if (process.env.CLICKHOUSE_CLUSTER === 'true' || process.env.CLICKHOUSE_CLUSTER === '1') { + return true + } + return !( process.env.SELF_HOSTED === 'true' || process.env.SELF_HOSTED === '1' ); diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 32b21a4c..37ee7024 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -119,11 +119,16 @@ export type CronQueuePayloadOnboarding = { type: 'onboarding'; payload: undefined; }; +export type CronQueuePayloadFlushProfileBackfill = { + type: 'flushProfileBackfill'; + payload: undefined; +}; export type CronQueuePayload = | CronQueuePayloadSalt | CronQueuePayloadFlushEvents | CronQueuePayloadFlushSessions | CronQueuePayloadFlushProfiles + | CronQueuePayloadFlushProfileBackfill | CronQueuePayloadPing | CronQueuePayloadProject | CronQueuePayloadInsightsDaily