From 0cf73e299a7fb669b172cefbc243cc3315d33413 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Mon, 10 Feb 2025 20:24:26 +0100 Subject: [PATCH] chore(root): clean up unused stuff --- apps/api/src/controllers/event.controller.ts | 49 ++- apps/api/src/controllers/track.controller.ts | 49 ++- .../src/jobs/events.create-session-end.ts | 1 - .../20250210192312_clean_up/migration.sql | 40 +++ packages/db/prisma/schema.prisma | 94 ------ packages/db/src/buffers/bot-buffer-psql.ts | 116 ------- packages/db/src/buffers/event-buffer-psql.ts | 307 ------------------ packages/db/src/buffers/index.ts | 12 +- .../db/src/buffers/profile-buffer-psql.ts | 291 ----------------- 9 files changed, 89 insertions(+), 870 deletions(-) create mode 100644 packages/db/prisma/migrations/20250210192312_clean_up/migration.sql delete mode 100644 packages/db/src/buffers/bot-buffer-psql.ts delete mode 100644 packages/db/src/buffers/event-buffer-psql.ts delete mode 100644 packages/db/src/buffers/profile-buffer-psql.ts diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index 189c66a5..24e9eb1e 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -49,34 +49,31 @@ export async function postEvent( 'NX', ); - // TODO: remove this - if (process.env.DISABLE_ADD_JOBS === undefined) { - eventsQueue.add( - 'event', - { - type: 'incomingEvent', - payload: { - projectId, - headers: getStringHeaders(request.headers), - event: { - ...request.body, - timestamp: timestamp.timestamp, - isTimestampFromThePast: timestamp.isTimestampFromThePast, - }, - geo, - currentDeviceId, - previousDeviceId, - priority: locked === 'OK', + eventsQueue.add( + 'event', + { + type: 'incomingEvent', + payload: { + projectId, + headers: getStringHeaders(request.headers), + event: { + ...request.body, + timestamp: timestamp.timestamp, + isTimestampFromThePast: timestamp.isTimestampFromThePast, }, + geo, + currentDeviceId, + previousDeviceId, + priority: locked === 'OK', }, - { - // Prioritize 'screen_view' events by setting no delay - // This ensures that session starts are created from 'screen_view' events - // rather than other events, maintaining accurate session tracking - delay: request.body.name === 'screen_view' ? undefined : 1000, - }, - ); - } + }, + { + // Prioritize 'screen_view' events by setting no delay + // This ensures that session starts are created from 'screen_view' events + // rather than other events, maintaining accurate session tracking + delay: request.body.name === 'screen_view' ? undefined : 1000, + }, + ); reply.status(202).send('ok'); } diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index a8bd4afe..3b5acad1 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -247,34 +247,31 @@ async function track({ 'NX', ); - // TODO: remove this - if (process.env.DISABLE_ADD_JOBS === undefined) { - eventsQueue.add( - 'event', - { - type: 'incomingEvent', - payload: { - projectId, - headers, - event: { - ...payload, - timestamp, - isTimestampFromThePast, - }, - geo, - currentDeviceId, - previousDeviceId, - priority: locked === 'OK', + eventsQueue.add( + 'event', + { + type: 'incomingEvent', + payload: { + projectId, + headers, + event: { + ...payload, + timestamp, + isTimestampFromThePast, }, + geo, + currentDeviceId, + previousDeviceId, + priority: locked === 'OK', }, - { - // Prioritize 'screen_view' events by setting no delay - // This ensures that session starts are created from 'screen_view' events - // rather than other events, maintaining accurate session tracking - delay: isScreenView ? undefined : 1000, - }, - ); - } + }, + { + // Prioritize 'screen_view' events by setting no delay + // This ensures that session starts are created from 'screen_view' events + // rather than other events, maintaining accurate session tracking + delay: isScreenView ? undefined : 1000, + }, + ); } async function identify({ diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index f2da9968..bb01b6f5 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -76,7 +76,6 @@ export async function createSessionEnd( const payload = job.data.payload; - // TODO: Get complete session from buffer to offload clickhouse const [lastScreenView, eventsInDb] = await Promise.all([ eventBuffer.getLastScreenView({ projectId: payload.projectId, diff --git a/packages/db/prisma/migrations/20250210192312_clean_up/migration.sql b/packages/db/prisma/migrations/20250210192312_clean_up/migration.sql new file mode 100644 index 00000000..1cd24ed8 --- /dev/null +++ b/packages/db/prisma/migrations/20250210192312_clean_up/migration.sql @@ -0,0 +1,40 @@ +/* + Warnings: + + - You are about to drop the column `cors` on the `clients` table. All the data in the column will be lost. + - You are about to drop the column `crossDomain` on the `clients` table. All the data in the column will be lost. + - You are about to drop the `bot_event_buffer` table. If the table is not empty, all the data it contains will be lost. + - You are about to drop the `event_buffer` table. If the table is not empty, all the data it contains will be lost. + - You are about to drop the `events` table. If the table is not empty, all the data it contains will be lost. + - You are about to drop the `profile_buffer` table. If the table is not empty, all the data it contains will be lost. + - You are about to drop the `profiles` table. If the table is not empty, all the data it contains will be lost. + - You are about to drop the `waitlist` table. If the table is not empty, all the data it contains will be lost. + +*/ +-- DropForeignKey +ALTER TABLE "events" DROP CONSTRAINT "events_projectId_fkey"; + +-- DropForeignKey +ALTER TABLE "profiles" DROP CONSTRAINT "profiles_projectId_fkey"; + +-- AlterTable +ALTER TABLE "clients" DROP COLUMN "cors", +DROP COLUMN "crossDomain"; + +-- DropTable +DROP TABLE "bot_event_buffer"; + +-- DropTable +DROP TABLE "event_buffer"; + +-- DropTable +DROP TABLE "events"; + +-- DropTable +DROP TABLE "profile_buffer"; + +-- DropTable +DROP TABLE "profiles"; + +-- DropTable +DROP TABLE "waitlist"; diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index a21f842c..e893ac98 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -146,8 +146,6 @@ model Project { /// [IPrismaProjectFilters] filters Json @default("[]") - events Event[] - profiles Profile[] clients Client[] reports Report[] dashboards Dashboard[] @@ -186,21 +184,6 @@ model ProjectAccess { @@map("project_access") } -model Event { - id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid - name String - properties Json - projectId String - project Project @relation(fields: [projectId], references: [id]) - - profileId String? - - createdAt DateTime @default(now()) - updatedAt DateTime @default(now()) @updatedAt - - @@map("events") -} - model Salt { salt String @id createdAt DateTime @default(now()) @@ -209,22 +192,6 @@ model Salt { @@map("salts") } -model Profile { - id String @id - externalId String? - firstName String? - lastName String? - email String? - avatar String? - properties Json - projectId String - project Project @relation(fields: [projectId], references: [id]) - createdAt DateTime @default(now()) - updatedAt DateTime @default(now()) @updatedAt - - @@map("profiles") -} - enum ClientType { read write @@ -240,8 +207,6 @@ model Client { project Project? @relation(fields: [projectId], references: [id]) organization Organization @relation(fields: [organizationId], references: [id]) organizationId String - cors String? - crossDomain Boolean @default(false) createdAt DateTime @default(now()) updatedAt DateTime @default(now()) @updatedAt @@ -319,16 +284,6 @@ model Report { @@map("reports") } -model Waitlist { - id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid - email String @unique - createdAt DateTime @default(now()) - updatedAt DateTime @default(now()) @updatedAt - accepted Boolean @default(false) - - @@map("waitlist") -} - model ShareOverview { id String @unique projectId String @unique @@ -443,52 +398,3 @@ model ResetPassword { @@map("reset_password") } - -model EventBuffer { - id String @id @default(cuid()) - projectId String - eventId String @unique - name String - profileId String? - sessionId String? - /// [IPrismaClickhouseEvent] - payload Json - processedAt DateTime? - createdAt DateTime @default(now()) - updatedAt DateTime @default(now()) @updatedAt - - @@index([projectId, processedAt, createdAt]) - @@index([projectId, profileId, sessionId, createdAt]) - @@map("event_buffer") -} - -model ProfileBuffer { - id String @id @default(cuid()) - projectId String - profileId String - checksum String - /// [IPrismaClickhouseProfile] - payload Json - processedAt DateTime? - createdAt DateTime @default(now()) - updatedAt DateTime @default(now()) @updatedAt - - @@index([projectId, profileId]) - @@index([projectId, processedAt]) - @@index([checksum]) - @@map("profile_buffer") -} - -model BotEventBuffer { - id String @id @default(cuid()) - projectId String - eventId String - /// [IPrismaClickhouseBotEvent] - payload Json - createdAt DateTime @default(now()) - processedAt DateTime? - - @@index([processedAt]) - @@index([projectId, eventId]) - @@map("bot_event_buffer") -} diff --git a/packages/db/src/buffers/bot-buffer-psql.ts b/packages/db/src/buffers/bot-buffer-psql.ts deleted file mode 100644 index 40a5e135..00000000 --- a/packages/db/src/buffers/bot-buffer-psql.ts +++ /dev/null @@ -1,116 +0,0 @@ -import { runEvery } from '@openpanel/redis'; - -import { TABLE_NAMES, ch } from '../clickhouse-client'; -import { db } from '../prisma-client'; -import type { IClickhouseBotEvent } from '../services/event.service'; -import { BaseBuffer } from './base-buffer'; - -export class BotBuffer extends BaseBuffer { - private batchSize = process.env.BOT_BUFFER_BATCH_SIZE - ? Number.parseInt(process.env.BOT_BUFFER_BATCH_SIZE, 10) - : 1000; - - constructor() { - super({ - name: 'bot', - onFlush: async () => { - await this.processBuffer(); - await this.tryCleanup(); - }, - }); - } - - async add(event: IClickhouseBotEvent) { - try { - await db.botEventBuffer.create({ - data: { - projectId: event.project_id, - eventId: event.id, - payload: event, - }, - }); - - // Check if we have enough unprocessed events to trigger a flush - const unprocessedCount = await db.botEventBuffer.count({ - where: { - processedAt: null, - }, - }); - - if (unprocessedCount >= this.batchSize && !process.env.TEST_NEW_BUFFER) { - await this.tryFlush(); - } - } catch (error) { - this.logger.error('Failed to add bot event', { error }); - } - } - - async processBuffer() { - const eventsToProcess = await db.botEventBuffer.findMany({ - where: { - processedAt: null, - }, - orderBy: { - createdAt: 'asc', - }, - take: this.batchSize, - }); - - if (eventsToProcess.length > 0) { - const toInsert = eventsToProcess.map((e) => e.payload); - - await ch.insert({ - table: TABLE_NAMES.events_bots, - values: toInsert, - format: 'JSONEachRow', - }); - - await db.botEventBuffer.updateMany({ - where: { - id: { - in: eventsToProcess.map((e) => e.id), - }, - }, - data: { - processedAt: new Date(), - }, - }); - - this.logger.info('Processed bot events', { - count: toInsert.length, - }); - } - } - - async tryCleanup() { - try { - await runEvery({ - interval: 60 * 5, // 5 minutes - fn: this.cleanup.bind(this), - key: `${this.name}-cleanup`, - }); - } catch (error) { - this.logger.error('Failed to run cleanup', { error }); - } - } - - async cleanup() { - const deleted = await db.botEventBuffer.deleteMany({ - where: { - processedAt: { - not: null, - }, - }, - }); - - this.logger.info('Cleaned up old bot events', { deleted: deleted.count }); - } - - public async getBufferSize() { - return db.botEventBuffer.count({ - where: { - processedAt: null, - }, - }); - } -} diff --git a/packages/db/src/buffers/event-buffer-psql.ts b/packages/db/src/buffers/event-buffer-psql.ts deleted file mode 100644 index a1b9f69b..00000000 --- a/packages/db/src/buffers/event-buffer-psql.ts +++ /dev/null @@ -1,307 +0,0 @@ -import { getSafeJson, setSuperJson } from '@openpanel/common'; -import { getRedisCache, getRedisPub, runEvery } from '@openpanel/redis'; -import { Prisma } from '@prisma/client'; -import { ch } from '../clickhouse-client'; -import { type EventBuffer as IPrismaEventBuffer, db } from '../prisma-client'; -import { - type IClickhouseEvent, - type IServiceEvent, - transformEvent, -} from '../services/event.service'; -import { BaseBuffer } from './base-buffer'; - -export class EventBuffer extends BaseBuffer { - private daysToKeep = process.env.EVENT_BUFFER_DAYS_TO_KEEP - ? Number.parseInt(process.env.EVENT_BUFFER_DAYS_TO_KEEP, 10) - : 3; - private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE - ? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10) - : 2000; - private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE - ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) - : 1000; - - constructor() { - super({ - name: 'event', - onFlush: async () => { - await this.processBuffer(); - await this.tryCleanup(); - }, - }); - } - - async add(event: IClickhouseEvent) { - try { - await db.eventBuffer.create({ - data: { - projectId: event.project_id, - eventId: event.id, - name: event.name, - profileId: event.profile_id || null, - sessionId: event.session_id || null, - payload: event, - }, - }); - - if (event.name === 'screen_view') { - await getRedisCache().set( - this.getLastEventKey({ - projectId: event.project_id, - profileId: event.profile_id, - }), - JSON.stringify(event), - 'EX', - 60 * 31, - ); - } - - if (!process.env.TEST_NEW_BUFFER) { - this.publishEvent('event:received', event); - if (event.profile_id) { - getRedisCache().set( - `live:event:${event.project_id}:${event.profile_id}`, - '', - 'EX', - 60 * 5, - ); - } - } - } catch (error) { - if (error instanceof Prisma.PrismaClientKnownRequestError) { - if (error.code === 'P2002') { - this.logger.warn('Duplicate event ignored', { eventId: event.id }); - return; - } - } - this.logger.error('Failed to add event', { error }); - } - } - - private async publishEvent(channel: string, event: IClickhouseEvent) { - try { - await getRedisPub().publish( - channel, - setSuperJson( - transformEvent(event) as unknown as Record, - ), - ); - } catch (error) { - this.logger.warn('Failed to publish event', { error }); - } - } - - async processBuffer() { - let now = performance.now(); - const timer: Record = { - fetchUnprocessedEvents: undefined, - transformEvents: undefined, - insertToClickhouse: undefined, - markAsProcessed: undefined, - }; - const eventsToProcess = await db.$queryRaw` - WITH has_more_than_2_events AS ( - SELECT "sessionId" - FROM event_buffer - WHERE "processedAt" IS NULL - GROUP BY "sessionId" - HAVING COUNT(*) >= 2 - ) - SELECT * - FROM event_buffer e - WHERE e."processedAt" IS NULL - AND ( - -- 1) all events except screen_view - e.name != 'screen_view' - OR - -- 2) if the session has >= 2 such unprocessed events - e."sessionId" IN (SELECT "sessionId" FROM has_more_than_2_events) - ) - ORDER BY e."createdAt" ASC - LIMIT ${this.batchSize} - `; - - timer.fetchUnprocessedEvents = performance.now() - now; - now = performance.now(); - - const toInsert = eventsToProcess.reduce( - (acc, event, index, list) => { - // SCREEN VIEW - if (event.name === 'screen_view') { - const nextScreenView = list - .slice(index + 1) - .find( - (e) => - (e.name === 'screen_view' || e.name === 'session_end') && - e.sessionId === event.sessionId, - ); - - // Calculate duration - if (nextScreenView && nextScreenView.name === 'screen_view') { - event.payload.duration = - new Date(nextScreenView.createdAt).getTime() - - new Date(event.createdAt).getTime(); - } - - // if there is no more screen views nor session_end, - // we don't want to insert this event into clickhouse - if (!nextScreenView) { - return acc; - } - } else { - // OTHER EVENTS - const currentScreenView = list - .slice(0, index) - .findLast( - (e) => - e.name === 'screen_view' && e.sessionId === event.sessionId, - ); - - if (currentScreenView) { - // Get path related info from the current screen view - event.payload.path = currentScreenView.payload.path; - event.payload.origin = currentScreenView.payload.origin; - } - } - - acc.push(event); - - return acc; - }, - [], - ); - - timer.transformEvents = performance.now() - now; - now = performance.now(); - - if (toInsert.length > 0) { - const events = toInsert.map((e) => e.payload); - for (const chunk of this.chunks(events, this.chunkSize)) { - await ch.insert({ - table: 'events', - values: chunk, - format: 'JSONEachRow', - }); - } - - timer.insertToClickhouse = performance.now() - now; - now = performance.now(); - - for (const event of toInsert) { - this.publishEvent('event:saved', event.payload); - } - - timer.markAsProcessed = performance.now() - now; - now = performance.now(); - - await db.eventBuffer.updateMany({ - where: { - id: { - in: toInsert.map((e) => e.id), - }, - }, - data: { - processedAt: new Date(), - }, - }); - - timer.markAsProcessed = performance.now() - now; - - this.logger.info('Processed events', { - inserted: toInsert.length, - processed: eventsToProcess.length, - timer, - }); - } - } - - async tryCleanup() { - try { - await runEvery({ - interval: 60 * 5, // 5 minutes - fn: this.cleanup.bind(this), - key: `${this.name}-cleanup`, - }); - } catch (error) { - this.logger.error('Failed to run cleanup', { error }); - } - } - - async cleanup() { - const olderThan = new Date(); - olderThan.setDate(olderThan.getDate() - this.daysToKeep); - - const deleted = await db.$executeRaw` - DELETE FROM event_buffer - WHERE - -- 1) if the event has been processed - -- and session is completed or has no session - ( - "processedAt" IS NOT NULL - AND ( - "sessionId" IN (SELECT "sessionId" FROM event_buffer WHERE name = 'session_end') - OR "sessionId" IS NULL - ) - ) - -- 2) if the event is stalled for X days - OR "createdAt" < ${olderThan} - `; - - this.logger.info('Cleaned up old events', { deleted }); - } - - public async getLastScreenView({ - projectId, - profileId, - }: { - projectId: string; - profileId: string; - }): Promise { - // const event = await db.$primary().eventBuffer.findFirst({ - // where: { - // projectId, - // profileId, - // name: 'screen_view', - // }, - // orderBy: { createdAt: 'desc' }, - // select: { - // payload: true, - // }, - // }); - - // if (event) { - // return transformEvent(event.payload); - // } - - // return null; - const event = await getRedisCache().get( - this.getLastEventKey({ projectId, profileId }), - ); - - if (event) { - const parsed = getSafeJson(event); - if (parsed) { - return transformEvent(parsed); - } - } - return null; - } - - getLastEventKey({ - projectId, - profileId, - }: { - projectId: string; - profileId: string; - }) { - return `session:last_screen_view:${projectId}:${profileId}`; - } - - async getBufferSize() { - return db.eventBuffer.count({ - where: { - processedAt: null, - }, - }); - } -} diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts index 41536cce..88f30de7 100644 --- a/packages/db/src/buffers/index.ts +++ b/packages/db/src/buffers/index.ts @@ -5,12 +5,6 @@ import { EventBuffer as EventBufferRedis } from './event-buffer-redis'; import { ProfileBuffer as ProfileBufferPsql } from './profile-buffer-psql'; import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer-redis'; -export const eventBuffer = process.env.USE_NEW_BUFFER - ? new EventBufferRedis() - : new EventBufferPsql(); -export const profileBuffer = process.env.USE_NEW_BUFFER - ? new ProfileBufferRedis() - : new ProfileBufferPsql(); -export const botBuffer = process.env.USE_NEW_BUFFER - ? new BotBufferRedis() - : new BotBufferPsql(); +export const eventBuffer = new EventBufferRedis(); +export const profileBuffer = new ProfileBufferRedis(); +export const botBuffer = new BotBufferRedis(); diff --git a/packages/db/src/buffers/profile-buffer-psql.ts b/packages/db/src/buffers/profile-buffer-psql.ts deleted file mode 100644 index 3163bd89..00000000 --- a/packages/db/src/buffers/profile-buffer-psql.ts +++ /dev/null @@ -1,291 +0,0 @@ -import { createHash } from 'node:crypto'; -import { runEvery } from '@openpanel/redis'; -import { assocPath, dissocPath, mergeDeepRight } from 'ramda'; - -import { TABLE_NAMES, ch, chQuery } from '../clickhouse-client'; -import { db } from '../prisma-client'; -import type { IClickhouseProfile } from '../services/profile.service'; -import { BaseBuffer } from './base-buffer'; - -export class ProfileBuffer extends BaseBuffer { - private daysToKeep = process.env.PROFILE_BUFFER_DAYS_TO_KEEP - ? Number.parseInt(process.env.PROFILE_BUFFER_DAYS_TO_KEEP, 10) - : 7; - private batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE - ? Number.parseInt(process.env.PROFILE_BUFFER_BATCH_SIZE, 10) - : 2000; - private chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE - ? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10) - : 1000; - - constructor() { - super({ - name: 'profile', - onFlush: async () => { - await this.processBuffer(); - await this.tryCleanup(); - }, - }); - } - - private sortObjectKeys(obj: any, exclude: string[][] = []): any { - // Cache typeof check result - const type = typeof obj; - - // Fast-path for primitives - if (obj === null || type !== 'object') { - return String(obj); - } - - // Fast-path for arrays - process values only - if (Array.isArray(obj)) { - // Only map if contains objects - return obj.some((item) => item && typeof item === 'object') - ? obj.map((item) => this.sortObjectKeys(item)) - : obj; - } - - // Get and sort keys once - const sortedKeys = Object.keys(obj).sort(); - const len = sortedKeys.length; - - // Pre-allocate result object - const result: any = {}; - - // Single loop with cached length - for (let i = 0; i < len; i++) { - const key = sortedKeys[i]!; - const value = obj[key]; - result[key] = - value && typeof value === 'object' - ? this.sortObjectKeys(value) - : String(value); - } - - return result; - } - - private excludeKeys( - profile: IClickhouseProfile, - exclude: string[][], - ): IClickhouseProfile { - let filtered = profile; - for (const path of exclude) { - filtered = dissocPath(path, filtered); - } - return filtered; - } - - private stringify(profile: IClickhouseProfile): string { - const exclude = [ - ['created_at'], - ['properties', 'brand'], - ['properties', 'browser_version'], - ['properties', 'browserVersion'], - ['properties', 'browser'], - ['properties', 'city'], - ['properties', 'country'], - ['properties', 'device'], - ['properties', 'latitude'], - ['properties', 'longitude'], - ['properties', 'model'], - ['properties', 'os_version'], - ['properties', 'osVersion'], - ['properties', 'os'], - ['properties', 'path'], - ['properties', 'referrer_name'], - ['properties', 'referrerName'], - ['properties', 'referrer_type'], - ['properties', 'referrerType'], - ['properties', 'referrer'], - ['properties', 'region'], - ]; - const excluded = this.excludeKeys(profile, exclude); - const sorted = this.sortObjectKeys(excluded); - return JSON.stringify(sorted); - } - - private generateChecksum(profile: IClickhouseProfile): string { - const json = this.stringify(profile); - return createHash('sha256').update(json).digest('hex'); - } - - async add(profile: IClickhouseProfile) { - try { - const checksum = this.generateChecksum(profile); - // Check if we have this exact profile in buffer - const existingProfile = await db.profileBuffer.findFirst({ - where: { - projectId: profile.project_id, - profileId: profile.id, - }, - orderBy: { - createdAt: 'desc', - }, - select: { - checksum: true, - payload: true, - id: true, - }, - }); - - // Last item in buffer is the same as the new profile - if (existingProfile?.checksum === checksum) { - this.logger.debug('Duplicate profile ignored', { - profileId: profile.id, - }); - return; - } - - let mergedProfile = profile; - - if (!existingProfile) { - this.logger.debug('No profile in buffer, checking Clickhouse', { - profileId: profile.id, - }); - // If not in buffer, check Clickhouse - const clickhouseProfile = await this.fetchFromClickhouse(profile); - if (clickhouseProfile) { - this.logger.debug('Clickhouse profile found, merging', { - profileId: profile.id, - }); - mergedProfile = mergeDeepRight(clickhouseProfile, profile); - } - } else if (existingProfile.payload) { - this.logger.debug('Profile in buffer is different, merging', { - profileId: profile.id, - existingProfile: existingProfile.payload, - existingProfileChecksum: existingProfile.checksum, - incomingProfile: profile, - incomingProfileChecksum: checksum, - }); - mergedProfile = mergeDeepRight(existingProfile.payload, profile); - } - - // Update existing profile if its not processed yet - if (existingProfile) { - await db.profileBuffer.update({ - where: { - id: existingProfile.id, - }, - data: { - checksum: this.generateChecksum(mergedProfile), - payload: mergedProfile, - updatedAt: new Date(), - processedAt: null, - }, - }); - } else { - // Create new profile - await db.profileBuffer.create({ - data: { - projectId: profile.project_id, - profileId: profile.id, - checksum, - payload: mergedProfile, - }, - }); - } - } catch (error) { - this.logger.error('Failed to add profile', { error }); - } - } - - private async fetchFromClickhouse( - profile: IClickhouseProfile, - ): Promise { - const result = await chQuery( - `SELECT * - FROM ${TABLE_NAMES.profiles} - WHERE project_id = '${profile.project_id}' - AND id = '${profile.id}' - ${ - // If profile is not external, we know its not older than 2 day - profile.is_external === false - ? 'AND created_at > now() - INTERVAL 2 DAY' - : '' - } - ORDER BY created_at DESC - LIMIT 1`, - ); - - return result[0] || null; - } - - async processBuffer() { - const profilesToProcess = await db.profileBuffer.findMany({ - where: { - processedAt: null, - }, - orderBy: { - createdAt: 'asc', - }, - take: this.batchSize, - }); - - if (profilesToProcess.length > 0) { - const toInsert = profilesToProcess.map((p) => { - const profile = p.payload; - return profile; - }); - - for (const chunk of this.chunks(profilesToProcess, this.chunkSize)) { - await ch.insert({ - table: TABLE_NAMES.profiles, - values: chunk, - format: 'JSONEachRow', - }); - } - - await db.profileBuffer.updateMany({ - where: { - id: { - in: profilesToProcess.map((p) => p.id), - }, - }, - data: { - processedAt: new Date(), - }, - }); - - this.logger.info('Processed profiles', { - count: toInsert.length, - }); - } - } - - async tryCleanup() { - try { - await runEvery({ - interval: 60 * 60, // 1 hour - fn: this.cleanup.bind(this), - key: `${this.name}-cleanup`, - }); - } catch (error) { - this.logger.error('Failed to run cleanup', { error }); - } - } - - async cleanup() { - const olderThan = new Date(); - olderThan.setDate(olderThan.getDate() - this.daysToKeep); - - const deleted = await db.profileBuffer.deleteMany({ - where: { - processedAt: { - lt: olderThan, - }, - }, - }); - - this.logger.info('Cleaned up old profiles', { deleted: deleted.count }); - } - - async getBufferSize() { - return db.profileBuffer.count({ - where: { - processedAt: null, - }, - }); - } -}