diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index 42e427a8..f2da9968 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -44,7 +44,7 @@ async function getCompleteSessionWithSessionStart({ sessionId: string; logger: ILogger; }): Promise> { - const intervals = [6, 12, 24, 72]; + const intervals = [1, 6, 12, 24, 72]; let intervalIndex = 0; for (const hoursInterval of intervals) { const events = await getCompleteSession({ @@ -76,6 +76,7 @@ export async function createSessionEnd( const payload = job.data.payload; + // TODO: Get complete session from buffer to offload clickhouse const [lastScreenView, eventsInDb] = await Promise.all([ eventBuffer.getLastScreenView({ projectId: payload.projectId, @@ -96,19 +97,6 @@ export async function createSessionEnd( new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(), ); - events.map((event, index) => { - job.log( - [ - `Index: ${index}`, - `Event: ${event.name}`, - `Created: ${event.createdAt.toISOString()}`, - `DeviceId: ${event.deviceId}`, - `Profile: ${event.profileId}`, - `Path: ${event.path}`, - ].join('\n'), - ); - }); - const sessionDuration = events.reduce((acc, event) => { return acc + event.duration; }, 0); diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index a5810f79..76ef3989 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -70,7 +70,7 @@ export async function incomingEvent(job: Job) { projectId, properties: omit(GLOBAL_PROPERTIES, { ...properties, - user_agent: userAgent, + __user_agent: userAgent, __hash: hash, __query: query, }), diff --git a/packages/db/src/buffers/base-buffer.ts b/packages/db/src/buffers/base-buffer.ts new file mode 100644 index 00000000..01974f5b --- /dev/null +++ b/packages/db/src/buffers/base-buffer.ts @@ -0,0 +1,74 @@ 
/**
 * Shared base class for buffers whose flush must run on one worker at a time.
 *
 * A flush is guarded by a Redis key (`lock:<name>`) that is set with `NX` and
 * an `EX` expiry of `lockTimeout`. Subclasses pass their flush routine as
 * `onFlush` and trigger it through `tryFlush()`.
 */
export class BaseBuffer {
  name: string;
  logger: ILogger;
  lockKey: string;
  /** Expiry handed to Redis `SET ... EX` when the lock is taken (seconds). */
  lockTimeout = 60;
  onFlush: () => Promise<void>;

  /**
   * @param options.name    Used as the logger name and to derive the lock key.
   * @param options.onFlush Work to run while the lock is held.
   */
  constructor(options: {
    name: string;
    onFlush: () => Promise<void>;
  }) {
    this.logger = createLogger({ name: options.name });
    this.name = options.name;
    this.lockKey = `lock:${this.name}`;
    this.onFlush = options.onFlush;
  }

  /**
   * Splits `items` into consecutive slices of at most `size` elements.
   *
   * @param items Values to split; the array is not modified.
   * @param size  Maximum slice length, must be >= 1.
   * @returns The slices in order; an empty array when `items` is empty.
   * @throws RangeError when `size` is NaN or smaller than 1. Without this
   *         guard a NaN size yields a single empty slice (every item is
   *         silently dropped) and a size <= 0 never terminates.
   */
  protected chunks<T>(items: T[], size: number): T[][] {
    // Written as a negated comparison so that NaN is rejected as well.
    if (!(size >= 1)) {
      throw new RangeError(`Chunk size must be >= 1, received ${size}`);
    }

    const result: T[][] = [];
    for (let start = 0; start < items.length; start += size) {
      result.push(items.slice(start, start + size));
    }
    return result;
  }

  /**
   * Deletes the lock key, but only if it still holds `lockId`.
   *
   * The compare-and-delete runs as one Lua script so a key that has expired
   * and been re-acquired with a different id is left untouched.
   */
  private async releaseLock(lockId: string): Promise<void> {
    this.logger.debug('Releasing lock...');
    const script = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `;
    await getRedisCache().eval(script, 1, this.lockKey, lockId);
  }

  /**
   * Attempts to take the lock and, when successful, runs `onFlush`.
   *
   * Errors thrown by `onFlush` are logged and not rethrown. A failure while
   * releasing the lock is logged as well instead of rejecting the call: the
   * key carries an expiry, so it disappears on its own. When the lock is
   * already held the flush is skipped with a warning.
   */
  async tryFlush(): Promise<void> {
    const startedAt = performance.now();
    const lockId = generateSecureId('lock');
    const acquired = await getRedisCache().set(
      this.lockKey,
      lockId,
      'EX',
      this.lockTimeout,
      'NX',
    );

    if (acquired !== 'OK') {
      this.logger.warn('Failed to acquire lock. Skipping flush.', { lockId });
      return;
    }

    try {
      this.logger.info('Acquired lock. Processing buffer...', {
        lockId,
      });
      await this.onFlush();
    } catch (error) {
      this.logger.error('Failed to process buffer', {
        error,
        lockId,
      });
    } finally {
      try {
        await this.releaseLock(lockId);
      } catch (error) {
        // Keep the flush result intact; the lock will expire via its TTL.
        this.logger.error('Failed to release lock', {
          error,
          lockId,
        });
      }
      this.logger.info('Flush completed', {
        elapsed: performance.now() - startedAt,
        lockId,
      });
    }
  }
}
Skipping flush.'); - } - } - async processBuffer() { const eventsToProcess = await db.botEventBuffer.findMany({ where: { diff --git a/packages/db/src/buffers/event-buffer-psql.ts b/packages/db/src/buffers/event-buffer-psql.ts index 7bde8661..0a94a409 100644 --- a/packages/db/src/buffers/event-buffer-psql.ts +++ b/packages/db/src/buffers/event-buffer-psql.ts @@ -10,17 +10,25 @@ import { type IServiceEvent, transformEvent, } from '../services/event.service'; +import { BaseBuffer } from './base-buffer'; -export class EventBuffer { - private name = 'event'; - private logger: Logger; - private lockKey = `lock:${this.name}`; - private lockTimeout = 60; - private daysToKeep = 2; - private batchSize = 1000; +export class EventBuffer extends BaseBuffer { + private daysToKeep = 3; + private batchSize = process.env.EVENT_BUFFER_CHUNK_SIZE + ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) + : 2000; + private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE + ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) + : 1000; constructor() { - this.logger = createLogger({ name: this.name }); + super({ + name: 'event', + onFlush: async () => { + await this.processBuffer(); + await this.cleanup(); + }, + }); } async add(event: IClickhouseEvent) { @@ -30,22 +38,23 @@ export class EventBuffer { projectId: event.project_id, eventId: event.id, name: event.name, - profileId: event.profile_id, - sessionId: event.session_id, + profileId: event.profile_id || null, + sessionId: event.session_id || null, payload: event, }, }); - // TODO: UNCOMMENT THIS!!! 
- // this.publishEvent('event:received', event); - // if (event.profile_id) { - // getRedisCache().set( - // `live:event:${event.project_id}:${event.profile_id}`, - // '', - // 'EX', - // 60 * 5, - // ); - // } + if (!process.env.TEST_NEW_BUFFER) { + this.publishEvent('event:received', event); + if (event.profile_id) { + getRedisCache().set( + `live:event:${event.project_id}:${event.profile_id}`, + '', + 'EX', + 60 * 5, + ); + } + } } catch (error) { if (error instanceof Prisma.PrismaClientKnownRequestError) { if (error.code === 'P2002') { @@ -70,49 +79,19 @@ export class EventBuffer { } } - private async releaseLock(lockId: string): Promise { - this.logger.debug('Releasing lock...'); - const script = ` - if redis.call("get", KEYS[1]) == ARGV[1] then - return redis.call("del", KEYS[1]) - else - return 0 - end - `; - await getRedisCache().eval(script, 1, this.lockKey, lockId); - } - - async tryFlush() { - const lockId = generateSecureId('lock'); - const acquired = await getRedisCache().set( - this.lockKey, - lockId, - 'EX', - this.lockTimeout, - 'NX', - ); - if (acquired === 'OK') { - try { - this.logger.info('Acquired lock. Processing buffer...'); - await this.processBuffer(); - await this.tryCleanup(); - } catch (error) { - this.logger.error('Failed to process buffer', { error }); - } finally { - await this.releaseLock(lockId); - } - } else { - this.logger.warn('Failed to acquire lock. 
Skipping flush.'); - } - } - async processBuffer() { + let now = performance.now(); + const timer: Record = { + fetchUnprocessedEvents: undefined, + transformEvents: undefined, + insertToClickhouse: undefined, + markAsProcessed: undefined, + }; const eventsToProcess = await db.$queryRaw` - WITH has_2_special AS ( + WITH has_more_than_2_events AS ( SELECT "sessionId" FROM event_buffer WHERE "processedAt" IS NULL - AND name IN ('screen_view', 'session_start', 'session_end') GROUP BY "sessionId" HAVING COUNT(*) >= 2 ) @@ -120,30 +99,32 @@ export class EventBuffer { FROM event_buffer e WHERE e."processedAt" IS NULL AND ( - -- 1) if the event name is NOT in the special set - e.name NOT IN ('screen_view', 'session_start', 'session_end') + -- 1) all events except screen_view + e.name != 'screen_view' OR - -- 2) if the event name IS in the special set AND - -- the session has >= 2 such unprocessed events - ( - e.name IN ('screen_view', 'session_start', 'session_end') - AND e."sessionId" IN (SELECT "sessionId" FROM has_2_special) - ) + -- 2) if the session has >= 2 such unprocessed events + e."sessionId" IN (SELECT "sessionId" FROM has_more_than_2_events) ) - ORDER BY e."createdAt" ASC -- or e.id, whichever "oldest first" logic you use + ORDER BY e."createdAt" ASC LIMIT ${this.batchSize} `; + timer.fetchUnprocessedEvents = performance.now() - now; + now = performance.now(); + const toInsert = eventsToProcess.reduce( (acc, event, index, list) => { + // SCREEN VIEW if (event.name === 'screen_view') { - const nextScreenView = list.find( - (e, eIndex) => - (e.name === 'screen_view' || e.name === 'session_end') && - e.sessionId === event.sessionId && - eIndex > index, - ); + const nextScreenView = list + .slice(index + 1) + .find( + (e) => + (e.name === 'screen_view' || e.name === 'session_end') && + e.sessionId === event.sessionId, + ); + // Calculate duration if (nextScreenView && nextScreenView.name === 'screen_view') { event.payload.duration = new 
Date(nextScreenView.createdAt).getTime() - @@ -155,6 +136,20 @@ export class EventBuffer { if (!nextScreenView) { return acc; } + } else { + // OTHER EVENTS + const currentScreenView = list + .slice(0, index) + .findLast( + (e) => + e.name === 'screen_view' && e.sessionId === event.sessionId, + ); + + if (currentScreenView) { + // Get path related info from the current screen view + event.payload.path = currentScreenView.payload.path; + event.payload.origin = currentScreenView.payload.origin; + } } acc.push(event); @@ -164,17 +159,29 @@ export class EventBuffer { [], ); + timer.transformEvents = performance.now() - now; + now = performance.now(); + if (toInsert.length > 0) { - await ch.insert({ - table: 'events', - values: toInsert.map((e) => e.payload), - format: 'JSONEachRow', - }); + const events = toInsert.map((e) => e.payload); + for (const chunk of this.chunks(events, this.chunkSize)) { + await ch.insert({ + table: 'events', + values: chunk, + format: 'JSONEachRow', + }); + } + + timer.insertToClickhouse = performance.now() - now; + now = performance.now(); for (const event of toInsert) { this.publishEvent('event:saved', event.payload); } + timer.markAsProcessed = performance.now() - now; + now = performance.now(); + await db.eventBuffer.updateMany({ where: { id: { @@ -186,8 +193,11 @@ export class EventBuffer { }, }); + timer.markAsProcessed = performance.now() - now; + this.logger.info('Processed events', { count: toInsert.length, + timer, }); } } diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts index 69909f09..ee4924f4 100644 --- a/packages/db/src/buffers/index.ts +++ b/packages/db/src/buffers/index.ts @@ -1,7 +1,16 @@ import { BotBuffer } from './bot-buffer'; +import { BotBuffer as NewBotBuffer } from './bot-buffer-psql'; import { EventBuffer } from './event-buffer'; +import { EventBuffer as NewEventBuffer } from './event-buffer-psql'; import { ProfileBuffer } from './profile-buffer'; +import { ProfileBuffer as 
NewProfileBuffer } from './profile-buffer-psql'; -export const eventBuffer = new EventBuffer(); -export const profileBuffer = new ProfileBuffer(); -export const botBuffer = new BotBuffer(); +export const eventBuffer = process.env.USE_NEW_BUFFER + ? new NewEventBuffer() + : new EventBuffer(); +export const profileBuffer = process.env.USE_NEW_BUFFER + ? new NewProfileBuffer() + : new ProfileBuffer(); +export const botBuffer = process.env.USE_NEW_BUFFER + ? new NewBotBuffer() + : new BotBuffer(); diff --git a/packages/db/src/buffers/profile-buffer-psql.ts b/packages/db/src/buffers/profile-buffer-psql.ts index 3046fdba..0747d9ea 100644 --- a/packages/db/src/buffers/profile-buffer-psql.ts +++ b/packages/db/src/buffers/profile-buffer-psql.ts @@ -7,16 +7,25 @@ import { mergeDeepRight } from 'ramda'; import { TABLE_NAMES, ch, chQuery } from '../clickhouse-client'; import { db } from '../prisma-client'; import type { IClickhouseProfile } from '../services/profile.service'; +import { BaseBuffer } from './base-buffer'; -export class ProfileBuffer { - private name = 'profile'; - private logger: Logger; - private lockKey = `lock:${this.name}`; - private lockTimeout = 60; +export class ProfileBuffer extends BaseBuffer { private daysToKeep = 30; + private batchSize = process.env.EVENT_BUFFER_CHUNK_SIZE + ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) + : 2000; + private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE + ? 
Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) + : 1000; constructor() { - this.logger = createLogger({ name: this.name }); + super({ + name: 'profile', + onFlush: async () => { + await this.processBuffer(); + await this.tryCleanup(); + }, + }); } private generateChecksum(profile: IClickhouseProfile): string { @@ -27,7 +36,6 @@ export class ProfileBuffer { async add(profile: IClickhouseProfile) { try { const checksum = this.generateChecksum(profile); - // Check if we have this exact profile in buffer const existingProfile = await db.profileBuffer.findFirst({ where: { @@ -75,8 +83,10 @@ export class ProfileBuffer { id: existingProfile.id, }, data: { + checksum: this.generateChecksum(mergedProfile), payload: mergedProfile, updatedAt: new Date(), + processedAt: null, // unsure this will get processed (race condition) }, }); } else { @@ -110,43 +120,6 @@ export class ProfileBuffer { return result[0] || null; } - private async releaseLock(lockId: string): Promise { - this.logger.debug('Releasing lock...'); - const script = ` - if redis.call("get", KEYS[1]) == ARGV[1] then - return redis.call("del", KEYS[1]) - else - return 0 - end - `; - await getRedisCache().eval(script, 1, this.lockKey, lockId); - } - - async tryFlush() { - const lockId = generateSecureId('lock'); - const acquired = await getRedisCache().set( - this.lockKey, - lockId, - 'EX', - this.lockTimeout, - 'NX', - ); - - if (acquired === 'OK') { - try { - this.logger.info('Acquired lock. Processing buffer...'); - await this.processBuffer(); - await this.tryCleanup(); - } catch (error) { - this.logger.error('Failed to process buffer', { error }); - } finally { - await this.releaseLock(lockId); - } - } else { - this.logger.warn('Failed to acquire lock. 
Skipping flush.'); - } - } - async processBuffer() { const profilesToProcess = await db.profileBuffer.findMany({ where: { @@ -155,6 +128,7 @@ export class ProfileBuffer { orderBy: { createdAt: 'asc', }, + take: this.batchSize, }); if (profilesToProcess.length > 0) { @@ -163,11 +137,13 @@ export class ProfileBuffer { return profile; }); - await ch.insert({ - table: TABLE_NAMES.profiles, - values: toInsert, - format: 'JSONEachRow', - }); + for (const chunk of this.chunks(profilesToProcess, this.chunkSize)) { + await ch.insert({ + table: TABLE_NAMES.profiles, + values: chunk, + format: 'JSONEachRow', + }); + } await db.profileBuffer.updateMany({ where: { diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index 62e0e53b..4a14b0ec 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -273,11 +273,11 @@ export async function getEvents( } export async function createEvent(payload: IServiceCreateEventPayload) { - if (!payload.profileId) { + if (!payload.profileId && payload.deviceId) { payload.profileId = payload.deviceId; } - if (payload.profileId !== '') { + if (payload.profileId) { await upsertProfile({ id: String(payload.profileId), isExternal: payload.profileId !== payload.deviceId, @@ -310,7 +310,7 @@ export async function createEvent(payload: IServiceCreateEventPayload) { profile_id: payload.profileId ? String(payload.profileId) : '', project_id: payload.projectId, session_id: payload.sessionId, - properties: toDots(omit(['_path'], payload.properties)), + properties: toDots(payload.properties), path: payload.path ?? '', origin: payload.origin ?? '', created_at: formatClickhouseDate(payload.createdAt),