diff --git a/packages/db/src/buffers/bot-buffer.ts b/packages/db/src/buffers/bot-buffer.ts index 11c17b28..a96e421c 100644 --- a/packages/db/src/buffers/bot-buffer.ts +++ b/packages/db/src/buffers/bot-buffer.ts @@ -1,13 +1,23 @@ import { TABLE_NAMES, ch } from '../clickhouse-client'; import type { IClickhouseBotEvent } from '../services/event.service'; +import { BotBuffer as NewBotBuffer } from './bot-buffer-psql'; import { RedisBuffer } from './buffer'; +const testNewBotBuffer = new NewBotBuffer(); + type BufferType = IClickhouseBotEvent; export class BotBuffer extends RedisBuffer { constructor() { super('events_bots', 500); } + async add(event: BufferType) { + await super.add(event); + if (process.env.TEST_NEW_BUFFER) { + await testNewBotBuffer.add(event); + } + } + protected async insertIntoDB(items: BufferType[]): Promise { await ch.insert({ table: TABLE_NAMES.events_bots, diff --git a/packages/db/src/buffers/event-buffer-psql.ts b/packages/db/src/buffers/event-buffer-psql.ts index ee98bbfb..7bde8661 100644 --- a/packages/db/src/buffers/event-buffer-psql.ts +++ b/packages/db/src/buffers/event-buffer-psql.ts @@ -4,7 +4,7 @@ import { type ILogger as Logger, createLogger } from '@openpanel/logger'; import { getRedisCache, getRedisPub, runEvery } from '@openpanel/redis'; import { Prisma } from '@prisma/client'; import { ch } from '../clickhouse-client'; -import { db } from '../prisma-client'; +import { type EventBuffer as IPrismaEventBuffer, db } from '../prisma-client'; import { type IClickhouseEvent, type IServiceEvent, @@ -17,6 +17,7 @@ export class EventBuffer { private lockKey = `lock:${this.name}`; private lockTimeout = 60; private daysToKeep = 2; + private batchSize = 1000; constructor() { this.logger = createLogger({ name: this.name }); @@ -35,16 +36,16 @@ export class EventBuffer { }, }); - this.publishEvent('event:received', event); - - if (event.profile_id) { - getRedisCache().set( - `live:event:${event.project_id}:${event.profile_id}`, - '', - 'EX', - 60 * 5, - ); - } + // TODO: UNCOMMENT THIS!!! + // this.publishEvent('event:received', event); + // if (event.profile_id) { + // getRedisCache().set( + // `live:event:${event.project_id}:${event.profile_id}`, + // '', + // 'EX', + // 60 * 5, + // ); + // } } catch (error) { if (error instanceof Prisma.PrismaClientKnownRequestError) { if (error.code === 'P2002') { @@ -106,109 +107,78 @@ export class EventBuffer { } async processBuffer() { - const eventsToProcess = await db.$transaction(async (trx) => { - // Process all screen_views that have a next event - const processableViews = await trx.$queryRaw< - Array<{ - id: string; - payload: IClickhouseEvent; - next_event_time: Date; - }> - >` - WITH NextEvents AS ( - SELECT - id, - payload, - LEAD("createdAt") OVER ( - PARTITION BY "sessionId" - ORDER BY "createdAt" - ) as next_event_time - FROM event_buffer - WHERE "name" = 'screen_view' - AND "processedAt" IS NULL + const eventsToProcess = await db.$queryRaw` + WITH has_2_special AS ( + SELECT "sessionId" + FROM event_buffer + WHERE "processedAt" IS NULL + AND name IN ('screen_view', 'session_start', 'session_end') + GROUP BY "sessionId" + HAVING COUNT(*) >= 2 + ) + SELECT * + FROM event_buffer e + WHERE e."processedAt" IS NULL + AND ( + -- 1) if the event name is NOT in the special set + e.name NOT IN ('screen_view', 'session_start', 'session_end') + OR + -- 2) if the event name IS in the special set AND + -- the session has >= 2 such unprocessed events + ( + e.name IN ('screen_view', 'session_start', 'session_end') + AND e."sessionId" IN (SELECT "sessionId" FROM has_2_special) ) - SELECT * - FROM NextEvents - WHERE next_event_time IS NOT NULL - `; + ) + ORDER BY e."createdAt" ASC -- or e.id, whichever "oldest first" logic you use + LIMIT ${this.batchSize} + `; - // Find screen_views that are last in their session with session_end - const lastViews = await trx.$queryRaw< - Array<{ - id: string; - payload: IClickhouseEvent; - }> - >` - WITH LastViews AS ( - SELECT e.id, e.payload, - EXISTS ( - SELECT 1 - FROM event_buffer se - WHERE se."name" = 'session_end' - AND se."sessionId" = e."sessionId" - AND se."createdAt" > e."createdAt" - ) as has_session_end - FROM event_buffer e - WHERE e."name" = 'screen_view' - AND e."processedAt" IS NULL - AND NOT EXISTS ( - SELECT 1 - FROM event_buffer next - WHERE next."sessionId" = e."sessionId" - AND next."name" = 'screen_view' - AND next."createdAt" > e."createdAt" - ) - ) - SELECT * FROM LastViews - WHERE has_session_end = true - `; + const toInsert = eventsToProcess.reduce( + (acc, event, index, list) => { + if (event.name === 'screen_view') { + const nextScreenView = list.find( + (e, eIndex) => + (e.name === 'screen_view' || e.name === 'session_end') && + e.sessionId === event.sessionId && + eIndex > index, + ); - // Get all other events - const regularEvents = await trx.eventBuffer.findMany({ - where: { - processedAt: null, - name: { not: 'screen_view' }, - }, - orderBy: { createdAt: 'asc' }, - }); + if (nextScreenView && nextScreenView.name === 'screen_view') { + event.payload.duration = + new Date(nextScreenView.createdAt).getTime() - + new Date(event.createdAt).getTime(); + } - return { - processableViews, - lastViews, - regularEvents, - }; - }); + // if there is no more screen views nor session_end, + // we don't want to insert this event into clickhouse + if (!nextScreenView) { + return acc; + } + } - const toInsert = [ - ...eventsToProcess.processableViews.map((view) => ({ - ...view.payload, - duration: - new Date(view.next_event_time).getTime() - - new Date(view.payload.created_at).getTime(), - })), - ...eventsToProcess.lastViews.map((v) => v.payload), - ...eventsToProcess.regularEvents.map((e) => e.payload), - ]; + acc.push(event); + + return acc; + }, + [], + ); if (toInsert.length > 0) { await ch.insert({ table: 'events', - values: toInsert, + values: toInsert.map((e) => e.payload), format: 'JSONEachRow', }); for (const event of toInsert) { - this.publishEvent('event:saved', event); + this.publishEvent('event:saved', event.payload); } await db.eventBuffer.updateMany({ where: { id: { - in: [ - ...eventsToProcess.processableViews.map((v) => v.id), - ...eventsToProcess.lastViews.map((v) => v.id), - ...eventsToProcess.regularEvents.map((e) => e.id), - ], + in: toInsert.map((e) => e.id), }, }, data: { @@ -218,10 +188,6 @@ export class EventBuffer { this.logger.info('Processed events', { count: toInsert.length, - screenViews: - eventsToProcess.processableViews.length + - eventsToProcess.lastViews.length, - regularEvents: eventsToProcess.regularEvents.length, }); } } diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 31fc9031..dcadb356 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -16,9 +16,12 @@ import type { } from '../services/event.service'; import type { Find, FindMany } from './buffer'; import { RedisBuffer } from './buffer'; +import { EventBuffer as NewEventBuffer } from './event-buffer-psql'; const STALLED_QUEUE_TIMEOUT = 1000 * 60 * 60 * 24; +const testNewEventBuffer = new NewEventBuffer(); + type BufferType = IClickhouseEvent; export class EventBuffer extends RedisBuffer { constructor() { @@ -57,6 +60,9 @@ export class EventBuffer extends RedisBuffer { public async add(event: BufferType) { await super.add(event); + if (process.env.TEST_NEW_BUFFER) { + await testNewEventBuffer.add(event); + } if (event.name === 'screen_view') { await getRedisCache().set( this.getLastEventKey({ diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts index a8606e63..69909f09 100644 --- a/packages/db/src/buffers/index.ts +++ b/packages/db/src/buffers/index.ts @@ -1,6 +1,6 @@ -import { BotBuffer } from './bot-buffer-psql'; -import { EventBuffer } from './event-buffer-psql'; -import { ProfileBuffer } from './profile-buffer-psql'; +import { BotBuffer } from './bot-buffer'; +import { EventBuffer } from './event-buffer'; +import { ProfileBuffer } from './profile-buffer'; export const eventBuffer = new EventBuffer(); export const profileBuffer = new ProfileBuffer(); diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index 6aa8d494..deb2fe61 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -17,17 +17,26 @@ import type { } from '../services/profile.service'; import type { Find, FindMany } from './buffer'; import { RedisBuffer } from './buffer'; - +import { ProfileBuffer as NewProfileBuffer } from './profile-buffer-psql'; const BATCH_SIZE = process.env.BATCH_SIZE_PROFILES ? Number.parseInt(process.env.BATCH_SIZE_PROFILES, 10) : 50; +const testNewProfileBuffer = new NewProfileBuffer(); + type BufferType = IClickhouseProfile; export class ProfileBuffer extends RedisBuffer { constructor() { super('profiles', BATCH_SIZE); } + async add(profile: BufferType) { + await super.add(profile); + if (process.env.TEST_NEW_BUFFER) { + await testNewProfileBuffer.add(profile); + } + } + // this will do a couple of things: // - we slice the queue to maxBufferSize since this queries have a limit on character count // - check redis cache for profiles