diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index 167941fe..53893315 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -6,6 +6,7 @@ import { Worker } from 'bullmq'; import express from 'express'; import { createInitialSalts } from '@openpanel/db'; +import type { CronQueueType } from '@openpanel/queue'; import { cronQueue, eventsQueue, sessionsQueue } from '@openpanel/queue'; import { getRedisQueue } from '@openpanel/redis'; @@ -137,70 +138,75 @@ async function start() { }), ); - await cronQueue.add( - 'salt', + const jobs: { + name: string; + type: CronQueueType; + pattern: string | number; + }[] = [ { + name: 'salt', type: 'salt', - payload: undefined, + pattern: '0 0 * * *', }, { - jobId: 'salt', - repeat: { - utc: true, - pattern: '0 0 * * *', - }, - }, - ); - - await cronQueue.add( - 'flush', - { + name: 'flush', type: 'flushEvents', - payload: undefined, + pattern: 1000 * 10, }, { - jobId: 'flushEvents', - repeat: { - every: process.env.BATCH_INTERVAL - ? Number.parseInt(process.env.BATCH_INTERVAL, 10) - : 1000 * 10, - }, - }, - ); - - await cronQueue.add( - 'flush', - { + name: 'flush', type: 'flushProfiles', - payload: undefined, + pattern: 1000 * 60 * 30, }, - { - jobId: 'flushProfiles', - repeat: { - every: 2 * 1000, - }, - }, - ); + ]; if (process.env.SELF_HOSTED && process.env.NODE_ENV === 'production') { + jobs.push({ + name: 'ping', + type: 'ping', + pattern: '0 0 * * *', + }); + } + + // Add repeatable jobs + for (const job of jobs) { await cronQueue.add( - 'ping', + job.name, { - type: 'ping', + type: job.type, payload: undefined, }, { - jobId: 'ping', - repeat: { - pattern: '0 0 * * *', - }, + jobId: job.type, + repeat: + typeof job.pattern === 'number' + ? { + every: job.pattern, + } + : { + pattern: job.pattern, + }, }, ); } + // Remove outdated repeatable jobs const repeatableJobs = await cronQueue.getRepeatableJobs(); - - logger.info('Repeatable jobs', { repeatableJobs }); + for (const repeatableJob of repeatableJobs) { + const match = jobs.find( + (job) => `${job.name}:${job.type}:::${job.pattern}` === repeatableJob.key, + ); + if (match) { + logger.info('Repeatable job exists', { + key: repeatableJob.key, + }); + } else { + logger.info('Removing repeatable job', { + key: repeatableJob.key, + }); + cronQueue.removeRepeatableByKey(repeatableJob.key); + } + } await createInitialSalts(); } diff --git a/apps/worker/src/jobs/cron.ts b/apps/worker/src/jobs/cron.ts index a12e0f3d..5c0057cc 100644 --- a/apps/worker/src/jobs/cron.ts +++ b/apps/worker/src/jobs/cron.ts @@ -12,10 +12,10 @@ export async function cronJob(job: Job) { return await salt(); } case 'flushEvents': { - return await eventBuffer.flush(); + return await eventBuffer.tryFlush(); } case 'flushProfiles': { - return await profileBuffer.flush(); + return await profileBuffer.tryFlush(); } case 'ping': { return await ping(); diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index 6dc2b372..a095a387 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -15,7 +15,7 @@ export async function createSessionEnd( ) { const payload = job.data.payload; const eventsInBuffer = await eventBuffer.findMany( - (item) => item.event.session_id === payload.sessionId, + (item) => item.session_id === payload.sessionId, ); const sql = ` diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index ab030382..93a8093b 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -9,12 +9,14 @@ import type { IServiceCreateEventPayload } from '@openpanel/db'; import { createEvent } from '@openpanel/db'; import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service'; import { findJobByPrefix, sessionsQueue } from '@openpanel/queue'; -import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; +import type { + EventsQueuePayloadCreateSessionEnd, + EventsQueuePayloadIncomingEvent, +} from '@openpanel/queue'; import { getRedisQueue } from '@openpanel/redis'; const GLOBAL_PROPERTIES = ['__path', '__referrer']; -const SESSION_TIMEOUT = 1000 * 60 * 30; -const SESSION_END_TIMEOUT = SESSION_TIMEOUT + 1000; +export const SESSION_TIMEOUT = 1000 * 60 * 30; export async function incomingEvent(job: Job) { const { @@ -100,37 +102,14 @@ export async function incomingEvent(job: Job) { previousDeviceId, }); - const sessionEndPayload = sessionEnd?.job.data.payload || { - sessionId: uuid(), - deviceId: currentDeviceId, - profileId, - projectId, - }; - - const sessionEndJobId = - sessionEnd?.job.id ?? - `sessionEnd:${projectId}:${sessionEndPayload.deviceId}:${Date.now()}`; - - if (sessionEnd) { - // If for some reason we have a session end job that is not a createSessionEnd job - if (sessionEnd.job.data.type !== 'createSessionEnd') { - throw new Error('Invalid session end job'); - } - - await sessionEnd.job.changeDelay(SESSION_TIMEOUT); - } else { - await sessionsQueue.add( - 'session', - { - type: 'createSessionEnd', - payload: sessionEndPayload, - }, - { - delay: SESSION_END_TIMEOUT, - jobId: sessionEndJobId, - }, - ); - } + const sessionEndPayload = + sessionEnd?.job.data.payload || + ({ + sessionId: uuid(), + deviceId: currentDeviceId, + profileId, + projectId, + } satisfies EventsQueuePayloadCreateSessionEnd['payload']); const payload: IServiceCreateEventPayload = { name: body.name, @@ -158,13 +137,46 @@ export async function incomingEvent(job: Job) { duration: 0, path: path, origin: origin, - referrer: referrer?.url, - referrerName: referrer?.name || utmReferrer?.name || '', - referrerType: referrer?.type || utmReferrer?.type || '', + referrer: sessionEndPayload.referrer || referrer?.url, + referrerName: + sessionEndPayload.referrerName || + referrer?.name || + utmReferrer?.name || + '', + referrerType: + sessionEndPayload.referrerType || + referrer?.type || + utmReferrer?.type || + '', sdkName, sdkVersion, }; + const sessionEndJobId = + sessionEnd?.job.id ?? + `sessionEnd:${projectId}:${sessionEndPayload.deviceId}:${getTime(createdAt)}`; + + if (sessionEnd) { + // If for some reason we have a session end job that is not a createSessionEnd job + if (sessionEnd.job.data.type !== 'createSessionEnd') { + throw new Error('Invalid session end job'); + } + + await sessionEnd.job.changeDelay(SESSION_TIMEOUT); + } else { + await sessionsQueue.add( + 'session', + { + type: 'createSessionEnd', + payload, + }, + { + delay: SESSION_TIMEOUT, + jobId: sessionEndJobId, + }, + ); + } + if (!sessionEnd) { await createEvent({ ...payload, diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts new file mode 100644 index 00000000..aab2733c --- /dev/null +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -0,0 +1,351 @@ +import { type Mock, beforeEach, describe, expect, it, mock } from 'bun:test'; +import { getTime, toISOString } from '@openpanel/common'; +import type { Job } from 'bullmq'; +import { SESSION_TIMEOUT, incomingEvent } from './events.incoming-event'; + +const projectId = 'test-project'; +const currentDeviceId = 'device-123'; +const previousDeviceId = 'device-456'; +const geo = { + country: 'US', + city: 'New York', + region: 'NY', + longitude: 0, + latitude: 0, +}; + +const createEvent = mock(() => {}); +const getLastScreenViewFromProfileId = mock(); +// // Mock dependencies +mock.module('@openpanel/db', () => ({ + createEvent, + getLastScreenViewFromProfileId, +})); + +const sessionsQueue = { add: mock(() => Promise.resolve({})) }; + +const findJobByPrefix = mock(); + +mock.module('@openpanel/queue', () => ({ + sessionsQueue, + findJobByPrefix, +})); + +const getRedisQueue = mock(() => ({ + keys: mock(() => Promise.resolve([])), +})); + +mock.module('@openpanel/redis', () => ({ + getRedisQueue, +})); + +describe('incomingEvent', () => { + beforeEach(() => { + createEvent.mockClear(); + findJobByPrefix.mockClear(); + sessionsQueue.add.mockClear(); + getLastScreenViewFromProfileId.mockClear(); + }); + + it('should create a session start and an event', async () => { + const timestamp = new Date(); + // Mock job data + const jobData = { + payload: { + geo, + event: { + name: 'test_event', + timestamp: timestamp.toISOString(), + properties: { __path: 'https://example.com/test' }, + }, + headers: { + 'user-agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'openpanel-sdk-name': 'web', + 'openpanel-sdk-version': '1.0.0', + }, + projectId, + currentDeviceId, + previousDeviceId, + priority: true, + }, + }; + + const job = { data: jobData } as Job; + + // Execute the job + await incomingEvent(job); + + const event = { + name: 'test_event', + deviceId: currentDeviceId, + // @ts-expect-error + sessionId: createEvent.mock.calls[1][0].sessionId, + profileId: '', + projectId, + properties: { + __hash: undefined, + __query: undefined, + }, + createdAt: timestamp, + country: 'US', + city: 'New York', + region: 'NY', + longitude: 0, + latitude: 0, + os: 'Windows', + osVersion: '10', + browser: 'Chrome', + browserVersion: '91.0.4472.124', + device: 'desktop', + brand: '', + model: '', + duration: 0, + path: '/test', + origin: 'https://example.com', + referrer: '', + referrerName: '', + referrerType: 'unknown', + sdkName: 'web', + sdkVersion: '1.0.0', + }; + + expect(sessionsQueue.add.mock.calls[0]).toMatchObject([ + 'session', + { + type: 'createSessionEnd', + payload: event, + }, + { + delay: SESSION_TIMEOUT, + jobId: `sessionEnd:${projectId}:${event.deviceId}:${timestamp.getTime()}`, + }, + ]); + + // Assertions + // Issue: https://github.com/oven-sh/bun/issues/10380 + // expect(createEvent).toHaveBeenCalledWith(...) + expect(createEvent.mock.calls[0]).toMatchObject([ + { + name: 'session_start', + deviceId: currentDeviceId, + sessionId: expect.stringMatching( + /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i, + ), + profileId: '', + projectId, + properties: { + __hash: undefined, + __query: undefined, + }, + createdAt: new Date(timestamp.getTime() - 100), + country: 'US', + city: 'New York', + region: 'NY', + longitude: 0, + latitude: 0, + os: 'Windows', + osVersion: '10', + browser: 'Chrome', + browserVersion: '91.0.4472.124', + device: 'desktop', + brand: '', + model: '', + duration: 0, + path: '/test', + origin: 'https://example.com', + referrer: '', + referrerName: '', + referrerType: 'unknown', + sdkName: 'web', + sdkVersion: '1.0.0', + }, + ]); + expect(createEvent.mock.calls[1]).toMatchObject([event]); + + // Add more specific assertions based on the expected behavior + }); + + it('should reuse existing session', async () => { + // Mock job data + const jobData = { + payload: { + geo, + event: { + name: 'test_event', + timestamp: new Date().toISOString(), + properties: { __path: 'https://example.com/test' }, + }, + headers: { + 'user-agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'openpanel-sdk-name': 'web', + 'openpanel-sdk-version': '1.0.0', + }, + projectId, + currentDeviceId, + previousDeviceId, + priority: false, + }, + }; + const changeDelay = mock(); + findJobByPrefix.mockReturnValueOnce({ + changeDelay, + data: { + type: 'createSessionEnd', + payload: { + sessionId: 'session-123', + deviceId: currentDeviceId, + profileId: currentDeviceId, + projectId, + }, + }, + }); + + const job = { data: jobData } as Job; + + // Execute the job + await incomingEvent(job); + + expect(changeDelay.mock.calls[0]).toMatchObject([SESSION_TIMEOUT]); + + // Assertions + // Issue: https://github.com/oven-sh/bun/issues/10380 + // expect(createEvent).toHaveBeenCalledWith(...) + expect(createEvent.mock.calls[0]).toMatchObject([ + { + name: 'test_event', + deviceId: currentDeviceId, + profileId: '', + sessionId: 'session-123', + projectId, + properties: { + __hash: undefined, + __query: undefined, + }, + createdAt: expect.any(Date), + country: 'US', + city: 'New York', + region: 'NY', + longitude: 0, + latitude: 0, + os: 'Windows', + osVersion: '10', + browser: 'Chrome', + browserVersion: '91.0.4472.124', + device: 'desktop', + brand: '', + model: '', + duration: 0, + path: '/test', + origin: 'https://example.com', + referrer: '', + referrerName: '', + referrerType: 'unknown', + sdkName: 'web', + sdkVersion: '1.0.0', + }, + ]); + + // Add more specific assertions based on the expected behavior + }); + + it('should handle server events', async () => { + const timestamp = new Date(); + const jobData = { + payload: { + geo, + event: { + name: 'server_event', + timestamp: timestamp.toISOString(), + properties: { custom_property: 'test_value' }, + profileId: 'profile-123', + }, + headers: { + 'user-agent': 'OpenPanel Server/1.0', + 'openpanel-sdk-name': 'server', + 'openpanel-sdk-version': '1.0.0', + }, + projectId, + currentDeviceId: '', + previousDeviceId: '', + priority: true, + }, + }; + + const job = { data: jobData } as Job; + + const mockLastScreenView = { + deviceId: 'last-device-123', + sessionId: 'last-session-456', + country: 'CA', + city: 'Toronto', + region: 'ON', + os: 'iOS', + osVersion: '15.0', + browser: 'Safari', + browserVersion: '15.0', + device: 'mobile', + brand: 'Apple', + model: 'iPhone', + path: '/last-path', + origin: 'https://example.com', + referrer: 'https://google.com', + referrerName: 'Google', + referrerType: 'search', + }; + + getLastScreenViewFromProfileId.mockReturnValueOnce(mockLastScreenView); + + await incomingEvent(job); + + // expect(getLastScreenViewFromProfileId).toHaveBeenCalledWith({ + // profileId: 'profile-123', + // projectId, + // }); + + expect(createEvent.mock.calls[0]).toMatchObject([ + { + name: 'server_event', + deviceId: 'last-device-123', + sessionId: 'last-session-456', + profileId: 'profile-123', + projectId, + properties: { + custom_property: 'test_value', + user_agent: 'OpenPanel Server/1.0', + }, + createdAt: timestamp, + country: 'CA', + city: 'Toronto', + region: 'ON', + longitude: 0, + latitude: 0, + os: 'iOS', + osVersion: '15.0', + browser: 'Safari', + browserVersion: '15.0', + device: 'mobile', + brand: 'Apple', + model: 'iPhone', + duration: 0, + path: '/last-path', + origin: 'https://example.com', + referrer: 'https://google.com', + referrerName: 'Google', + referrerType: 'search', + sdkName: 'server', + sdkVersion: '1.0.0', + }, + ]); + + expect(sessionsQueue.add).not.toHaveBeenCalled(); + expect(findJobByPrefix).not.toHaveBeenCalled(); + }); + + // Add more test cases for different scenarios: + // - Server events + // - Existing sessions + // - Different priorities + // - Error cases +}); diff --git a/apps/worker/src/utils/parse-user-agent.ts b/apps/worker/src/utils/parse-user-agent.ts index b7e8666d..42ddc8bb 100644 --- a/apps/worker/src/utils/parse-user-agent.ts +++ b/apps/worker/src/utils/parse-user-agent.ts @@ -9,7 +9,9 @@ export function parseUserAgent(ua?: string | null) { if (!ua) return parsedServerUa; const res = new UAParser(ua).getResult(); - if (isServer(ua)) return parsedServerUa; + if (isServer(ua)) { + return parsedServerUa; + } return { os: res.os.name, @@ -77,7 +79,9 @@ function isServer(userAgent: string) { return true; } - return !!userAgent.match(/^([^\s]+\/[\d.]+\s*)+$/); + // Matches user agents like "Go-http-client/1.0" or "Go Http Client/1.0" + // It should just match the first name (with optional spaces) and version + return !!userAgent.match(/^[^\/]+\/[\d.]+$/); } export function getDevice(ua: string) { diff --git a/packages/common/src/url.ts b/packages/common/src/url.ts index 43d31ef0..1ae1358d 100644 --- a/packages/common/src/url.ts +++ b/packages/common/src/url.ts @@ -22,12 +22,13 @@ export function parsePath(path?: string): { } try { - const url = new URL(path); + const hasOrigin = path.startsWith('http'); + const url = new URL(path, hasOrigin ? undefined : 'http://localhost'); return { query: parseSearchParams(url.searchParams), path: url.pathname, hash: url.hash || undefined, - origin: url.origin, + origin: hasOrigin ? url.origin : '', }; } catch (error) { return { diff --git a/packages/db/src/buffers/bot-buffer.ts b/packages/db/src/buffers/bot-buffer.ts index 57646468..0d820b36 100644 --- a/packages/db/src/buffers/bot-buffer.ts +++ b/packages/db/src/buffers/bot-buffer.ts @@ -1,39 +1,18 @@ import { TABLE_NAMES, ch } from '../clickhouse-client'; import type { IClickhouseBotEvent } from '../services/event.service'; -import type { - Find, - FindMany, - OnCompleted, - OnInsert, - ProcessQueue, -} from './buffer'; import { RedisBuffer } from './buffer'; -export class BotBuffer extends RedisBuffer { +type BufferType = IClickhouseBotEvent; +export class BotBuffer extends RedisBuffer { constructor() { - super({ - table: TABLE_NAMES.events_bots, - batchSize: 100, - }); + super(TABLE_NAMES.events, 500); } - public onInsert?: OnInsert | undefined; - public onCompleted?: OnCompleted | undefined; - - public processQueue: ProcessQueue = async (queue) => { + protected async insertIntoDB(items: BufferType[]): Promise { await ch.insert({ table: TABLE_NAMES.events_bots, - values: queue.map((item) => item.event), + values: items, format: 'JSONEachRow', }); - return queue.map((item) => item.index); - }; - - public findMany: FindMany = () => { - return Promise.resolve([]); - }; - - public find: Find = () => { - return Promise.resolve(null); - }; + } } diff --git a/packages/db/src/buffers/buffer.ts b/packages/db/src/buffers/buffer.ts index 55dbc8c0..908c3284 100644 --- a/packages/db/src/buffers/buffer.ts +++ b/packages/db/src/buffers/buffer.ts @@ -1,181 +1,219 @@ +import { v4 as uuidv4 } from 'uuid'; + +import type { ILogger } from '@openpanel/logger'; import { createLogger } from '@openpanel/logger'; import { getRedisCache } from '@openpanel/redis'; -export const DELETE = '__DELETE__'; - -export type QueueItem = { - event: T; - index: number; -}; - -export type OnInsert = (data: T) => unknown; - -export type OnCompleted = - | ((data: T[]) => Promise) - | ((data: T[]) => unknown[]); - -export type ProcessQueue = (data: QueueItem[]) => Promise; - export type Find = ( - callback: (item: QueueItem) => boolean, + callback: (item: T) => boolean, ) => Promise; export type FindMany = ( - callback: (item: QueueItem) => boolean, + callback: (item: T) => boolean, ) => Promise; -const getError = (e: unknown) => { - if (e instanceof Error) { - return [ - `Name: ${e.name}`, - `Message: ${e.message}`, - `Stack: ${e.stack}`, - `Cause: ${e.cause ? String(e.cause) : ''}`, - ].join('\n'); - } - return 'Unknown error'; -}; +export class RedisBuffer { + protected prefix = 'op:buffer'; + protected bufferKey: string; + private lockKey: string; + protected maxBufferSize: number | null; + protected logger: ILogger; -export abstract class RedisBuffer { - // constructor - public prefix = 'op:buffer'; - public table: string; - public batchSize?: number; - public logger: ReturnType; - public disableAutoFlush?: boolean; - - // abstract methods - public abstract onInsert?: OnInsert; - public abstract onCompleted?: OnCompleted; - public abstract processQueue: ProcessQueue; - public abstract find: Find; - public abstract findMany: FindMany; - - constructor(options: { - table: string; - batchSize?: number; - disableAutoFlush?: boolean; - }) { - this.table = options.table; - this.batchSize = options.batchSize; - this.disableAutoFlush = options.disableAutoFlush; + constructor(bufferName: string, maxBufferSize: number | null) { + this.bufferKey = bufferName; + this.lockKey = `lock:${bufferName}`; + this.maxBufferSize = maxBufferSize; this.logger = createLogger({ name: 'buffer' }).child({ - table: this.table, + buffer: bufferName, }); } - public getKey(name?: string) { - const key = `${this.prefix}:${this.table}`; + protected getKey(name?: string) { + const key = `${this.prefix}:${this.bufferKey}`; if (name) { return `${key}:${name}`; } return key; } - public async insert(value: T) { - this.onInsert?.(value); - await getRedisCache().rpush(this.getKey(), JSON.stringify(value)); + async add(item: T): Promise { + try { + this.onAdd(item); + await getRedisCache().rpush(this.getKey(), JSON.stringify(item)); + const bufferSize = await getRedisCache().llen(this.getKey()); - const length = await getRedisCache().llen(this.getKey()); - this.logger.debug( - `Inserted item into buffer ${this.table}. Current length: ${length}`, - ); + this.logger.debug(`Item added. Current size: ${bufferSize}`); - if (!this.disableAutoFlush && this.batchSize && length >= this.batchSize) { - this.logger.info( - `Buffer ${this.table} reached batch size (${this.batchSize}). Flushing...`, - ); - this.flush(); + if (this.maxBufferSize && bufferSize >= this.maxBufferSize) { + await this.tryFlush(); + } + } catch (error) { + this.logger.error('Failed to add item to buffer', { error, item }); } } - public async flush() { - try { - const queue = await this.getQueue(this.batchSize || -1); - - if (queue.length === 0) { - this.logger.debug(`Flush called on empty buffer ${this.table}`); - return { count: 0, data: [] }; - } - - this.logger.info( - `Flushing ${queue.length} items from buffer ${this.table}`, - ); + public async tryFlush(): Promise { + const lockId = uuidv4(); + const acquired = await getRedisCache().set( + this.lockKey, + lockId, + 'EX', + 8, + 'NX', + ); + if (acquired === 'OK') { + this.logger.debug('Lock acquired. Attempting to flush.'); try { - const indexes = await this.processQueue(queue); - await this.deleteIndexes(indexes); - const data = indexes - .map((index) => queue[index]?.event) - .filter((event): event is T => event !== null); + await this.flush(); + } finally { + await this.releaseLock(lockId); + } + } else { + this.logger.debug('Failed to acquire lock for. Skipping flush.'); + } + } - if (this.onCompleted) { - const res = await this.onCompleted(data); - this.logger.info( - `Completed processing ${res.length} items from buffer ${this.table}`, - ); - return { count: res.length, data: res }; - } + protected async waitForReleasedLock( + maxWaitTime = 8000, + checkInterval = 500, + ): Promise { + const startTime = performance.now(); - this.logger.info( - `Processed ${indexes.length} items from buffer ${this.table}`, - ); - return { count: indexes.length, data: indexes }; + while (performance.now() - startTime < maxWaitTime) { + const lock = await getRedisCache().get(this.lockKey); + if (!lock) { + return true; + } + + await new Promise((resolve) => setTimeout(resolve, checkInterval)); + } + + this.logger.warn('Timeout waiting for lock release'); + return false; + } + + private async retryOnce(cb: () => Promise) { + try { + await cb(); + } catch (e) { + this.logger.error(`#1 Failed to execute callback: ${cb.name}`, e); + await new Promise((resolve) => setTimeout(resolve, 1000)); + try { + await cb(); } catch (e) { - this.logger.error( - `Failed to process queue while flushing buffer ${this.table}:`, - e, - ); - const timestamp = new Date().getTime(); - await getRedisCache().hset(this.getKey(`failed:${timestamp}`), { - error: getError(e), - data: JSON.stringify(queue.map((item) => item.event)), - retries: 0, - }); - this.logger.warn( - `Stored ${queue.length} failed items in ${this.getKey(`failed:${timestamp}`)}`, + this.logger.error(`#2 Failed to execute callback: ${cb.name}`, e); + } + } + } + + private async flush(): Promise { + // Use a transaction to ensure atomicity + const result = await getRedisCache() + .multi() + .lrange(this.getKey(), 0, -1) + .del(this.getKey()) + .exec(); + + if (!result) { + throw new Error('Redis transaction failed'); + } + + const lrange = result[0]; + + if (!lrange || lrange[0] instanceof Error) { + throw new Error('Redis transaction failed'); + } + + const items = lrange[1] as string[]; + + const parsedItems = items.map((item) => JSON.parse(item) as T); + + if (parsedItems.length === 0) { + this.logger.debug('No items to flush'); + return; + } + + this.logger.info(`Flushing ${parsedItems.length} items`); + + try { + const { toInsert, toKeep } = await this.processItems(parsedItems); + + if (toInsert.length) { + await this.retryOnce(() => this.insertIntoDB(toInsert)); + this.onInsert(toInsert); + } + + // Add back items to keep + if (toKeep.length > 0) { + await getRedisCache().lpush( + this.getKey(), + ...toKeep.map((item) => JSON.stringify(item)), ); } - } catch (e) { - this.logger.error( - `Failed to get queue while flushing buffer ${this.table}:`, - e, + + this.logger.info( + `Inserted ${toInsert.length} items into DB, kept ${toKeep.length} items in buffer`, + { + toInsert: toInsert.length, + toKeep: toKeep.length, + }, ); + } catch (error) { + this.logger.error('Failed to process queue while flushing buffer}:', { + error, + queueSize: items.length, + }); + + if (items.length > 0) { + // Add back items to keep + this.logger.debug('Adding all items back to buffer'); + await getRedisCache().lpush( + this.getKey(), + ...items.map((item) => JSON.stringify(item)), + ); + } } } - public async deleteIndexes(indexes: number[]) { - const multi = getRedisCache().multi(); - indexes.forEach((index) => { - multi.lset(this.getKey(), index, DELETE); - }); - multi.lrem(this.getKey(), 0, DELETE); - await multi.exec(); - this.logger.debug( - `Deleted ${indexes.length} items from buffer ${this.table}`, - ); + private async releaseLock(lockId: string): Promise { + this.logger.debug(`Released lock for ${this.getKey()}`); + const script = ` + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + `; + await getRedisCache().eval(script, 1, this.lockKey, lockId); } - public async getQueue(limit: number): Promise[]> { - const queue = await getRedisCache().lrange(this.getKey(), 0, limit); - const result = queue - .map((item, index) => ({ - event: this.transformQueueItem(item), - index, - })) - .filter((item): item is QueueItem => item.event !== null); - this.logger.debug( - `Retrieved ${result.length} items from buffer ${this.table}`, - ); - return result; + protected async getQueue(count?: number): Promise { + const items = await getRedisCache().lrange(this.getKey(), 0, count ?? -1); + return items.map((item) => JSON.parse(item) as T); } - private transformQueueItem(item: string): T | null { - try { - return JSON.parse(item); - } catch (e) { - this.logger.warn(`Failed to parse item in buffer ${this.table}:`, e); - return null; - } + protected processItems(items: T[]): Promise<{ toInsert: T[]; toKeep: T[] }> { + return Promise.resolve({ toInsert: items, toKeep: [] }); } + + protected insertIntoDB(_items: T[]): Promise { + throw new Error('Not implemented'); + } + + protected onAdd(_item: T): void { + // Override in subclass + } + + protected onInsert(_item: T[]): void { + // Override in subclass + } + + public findMany: FindMany = () => { + return Promise.resolve([]); + }; + + public find: Find = () => { + return Promise.resolve(null); + }; } diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 590050b6..1253d5d8 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -10,31 +10,16 @@ import type { IClickhouseEvent, IServiceEvent, } from '../services/event.service'; -import type { - Find, - FindMany, - OnCompleted, - OnInsert, - ProcessQueue, - QueueItem, -} from './buffer'; +import type { Find, FindMany } from './buffer'; import { RedisBuffer } from './buffer'; -const sortOldestFirst = ( - a: QueueItem, - b: QueueItem, -) => - new Date(a.event.created_at).getTime() - - new Date(b.event.created_at).getTime(); - -export class EventBuffer extends RedisBuffer { +type BufferType = IClickhouseEvent; +export class EventBuffer extends RedisBuffer { constructor() { - super({ - table: TABLE_NAMES.events, - }); + super(TABLE_NAMES.events, null); } - public onInsert?: OnInsert | undefined = (event) => { + public onAdd(event: BufferType) { getRedisPub().publish( 'event:received', SuperJSON.stringify(transformEvent(event)), @@ -47,24 +32,22 @@ export class EventBuffer extends RedisBuffer { 60 * 5, ); } - }; + } - public onCompleted?: OnCompleted | undefined = ( - savedEvents, - ) => { - for (const event of savedEvents) { + public onInsert(items: BufferType[]) { + for (const event of items) { getRedisPub().publish( 'event:saved', SuperJSON.stringify(transformEvent(event)), ); } + } - return savedEvents.map((event) => event.id); - }; - - public processQueue: ProcessQueue = async (queue) => { - const itemsToClickhouse = new Set>(); - const itemsToStalled = new Set>(); + protected async processItems( + queue: BufferType[], + ): Promise<{ toInsert: BufferType[]; toKeep: BufferType[] }> { + const toInsert = new Set(); + const itemsToStalled = new Set(); // Sort data by created_at // oldest first @@ -74,44 +57,37 @@ export class EventBuffer extends RedisBuffer { // We only need screen_views since we want to calculate the duration of each screen // To do this we need a minimum of 2 screen_views queue - .filter( - (item) => - item.event.name !== 'screen_view' || item.event.device === 'server', - ) - .forEach((item) => { + .filter((item) => item.name !== 'screen_view' || item.device === 'server') + .forEach((item, index) => { // Find the last event with data and merge it with the current event // We use profile_id here since this property can be set from backend as well const lastEventWithData = queue - .slice(0, item.index) + .slice(0, index) .findLast((lastEvent) => { return ( - lastEvent.event.project_id === item.event.project_id && - lastEvent.event.profile_id === item.event.profile_id && - lastEvent.event.path !== '' + lastEvent.project_id === item.project_id && + lastEvent.profile_id === item.profile_id && + lastEvent.path !== '' ); }); - const event = deepMergeObjects( - omit(['properties', 'duration'], lastEventWithData?.event || {}), - item.event, + const event = deepMergeObjects( + omit(['properties', 'duration'], lastEventWithData || {}), + item, ); if (lastEventWithData) { - // event.properties.__properties_from = lastEventWithData.event.id; + event.properties.__properties_from = lastEventWithData.id; } - return itemsToClickhouse.add({ - ...item, - event, - }); + return toInsert.add(event); }); // Group screen_view events by session_id const grouped = groupBy( - (item) => item.event.session_id, + (item) => item.session_id, queue.filter( - (item) => - item.event.name === 'screen_view' && item.event.device !== 'server', + (item) => item.name === 'screen_view' && item.device !== 'server', ), ); @@ -123,9 +99,7 @@ export class EventBuffer extends RedisBuffer { // If there is only one screen_view event we can send it back to redis since we can't calculate the duration const hasSessionEnd = queue.find( - (item) => - item.event.name === 'session_end' && - item.event.session_id === sessionId, + (item) => item.name === 'session_end' && item.session_id === sessionId, ); screenViews @@ -136,20 +110,17 @@ export class EventBuffer extends RedisBuffer { // if nextScreenView does not exists we can't calculate the duration (last event in session) if (nextScreenView) { const duration = - new Date(nextScreenView.event.created_at).getTime() - - new Date(item.event.created_at).getTime(); + new Date(nextScreenView.created_at).getTime() - + new Date(item.created_at).getTime(); const event = { - ...item.event, + ...item, duration, }; - event.properties.__duration_from = nextScreenView.event.id; - itemsToClickhouse.add({ - ...item, - event, - }); - // push last event in session if we have a session_end event + event.properties.__duration_from = nextScreenView.id; + toInsert.add(event); } else if (hasSessionEnd) { - itemsToClickhouse.add(item); + // push last event in session if we have a session_end event + toInsert.add(item); } }); } // for of end @@ -158,8 +129,8 @@ export class EventBuffer extends RedisBuffer { // This should not theoretically happen but if it does we should move them to stalled queue.forEach((item) => { if ( - !itemsToClickhouse.has(item) && - new Date(item.event.created_at).getTime() < + !toInsert.has(item) && + new Date(item.created_at).getTime() < new Date().getTime() - 1000 * 60 * 60 * 24 ) { itemsToStalled.add(item); @@ -169,43 +140,57 @@ export class EventBuffer extends RedisBuffer { if (itemsToStalled.size > 0) { const multi = getRedisCache().multi(); for (const item of itemsToStalled) { - multi.rpush(this.getKey('stalled'), JSON.stringify(item.event)); + multi.rpush(this.getKey('stalled'), JSON.stringify(item)); } await multi.exec(); } + const toInsertArray = Array.from(toInsert); + return { + toInsert: toInsertArray, + toKeep: queue.filter( + (item) => !toInsertArray.find((i) => i.id === item.id), + ), + }; + } + + protected async insertIntoDB(items: BufferType[]): Promise { await ch.insert({ table: TABLE_NAMES.events, - values: Array.from(itemsToClickhouse).map((item) => item.event), + values: items, format: 'JSONEachRow', }); - - return [ - ...Array.from(itemsToClickhouse).map((item) => item.index), - ...Array.from(itemsToStalled).map((item) => item.index), - ]; - }; + } public findMany: FindMany = async ( callback, ) => { - return this.getQueue(-1) - .then((queue) => { - return queue.filter(callback).map((item) => transformEvent(item.event)); - }) - .catch(() => { - return []; - }); + if (await this.waitForReleasedLock()) { + return this.getQueue() + .then((queue) => { + return queue.filter(callback).map(transformEvent); + }) + .catch(() => { + return []; + }); + } + return []; }; public find: Find = async (callback) => { - return this.getQueue(-1) - .then((queue) => { - const match = queue.find(callback); - return match ? transformEvent(match.event) : null; - }) - .catch(() => { - return null; - }); + if (await this.waitForReleasedLock()) { + return this.getQueue(-1) + .then((queue) => { + const match = queue.find(callback); + return match ? transformEvent(match) : null; + }) + .catch(() => { + return null; + }); + } + return null; }; } + +const sortOldestFirst = (a: IClickhouseEvent, b: IClickhouseEvent) => + new Date(a.created_at).getTime() - new Date(b.created_at).getTime(); diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index 208a1f0b..63250871 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -1,56 +1,80 @@ -import { mergeDeepRight } from 'ramda'; +import { groupBy, mergeDeepRight, prop } from 'ramda'; import { toDots } from '@openpanel/common'; import { getRedisCache } from '@openpanel/redis'; +import { escape } from 'sqlstring'; import { TABLE_NAMES, ch, chQuery } from '../clickhouse-client'; +import { transformProfile } from '../services/profile.service'; import type { IClickhouseProfile, IServiceProfile, } from '../services/profile.service'; -import { transformProfile } from '../services/profile.service'; -import type { - Find, - FindMany, - OnCompleted, - OnInsert, - ProcessQueue, - QueueItem, -} from './buffer'; +import type { Find, FindMany } from './buffer'; import { RedisBuffer } from './buffer'; -export class ProfileBuffer extends RedisBuffer { +const BATCH_SIZE = process.env.BATCH_SIZE_PROFILES + ? Number.parseInt(process.env.BATCH_SIZE_PROFILES, 10) + : 50; + +type BufferType = IClickhouseProfile; +export class ProfileBuffer extends RedisBuffer { constructor() { - super({ - table: TABLE_NAMES.profiles, - batchSize: 100, - disableAutoFlush: true, - }); + super(TABLE_NAMES.profiles, BATCH_SIZE); } - public onInsert?: OnInsert | undefined; - public onCompleted?: OnCompleted | undefined; - - public processQueue: ProcessQueue = async (queue) => { - const cleanedQueue = this.combineQueueItems(queue); - const redisProfiles = await this.getCachedProfiles(cleanedQueue); + // this will do a couple of things: + // - we slice the queue to maxBufferSize since this queries have a limit on character count + // - check redis cache for profiles + // - fetch missing profiles from clickhouse + // - merge the incoming profile with existing data + protected async processItems( + items: BufferType[], + ): Promise<{ toInsert: BufferType[]; toKeep: BufferType[] }> { + const queue = this.combineQueueItems(items); + const slicedQueue = this.maxBufferSize + ? queue.slice(0, this.maxBufferSize) + : queue; + const redisProfiles = await this.getCachedProfiles(slicedQueue); const dbProfiles = await this.fetchDbProfiles( - cleanedQueue.filter((_, index) => !redisProfiles[index]), + slicedQueue.filter((_, index) => !redisProfiles[index]), ); - const values = this.createProfileValues( - cleanedQueue, + const toInsert = this.createProfileValues( + slicedQueue, redisProfiles, dbProfiles, ); - if (values.length > 0) { - await this.updateRedisCache(values); - await this.insertIntoClickhouse(values); + if (toInsert.length > 0) { + await this.updateRedisCache(toInsert); } - return queue.map((item) => item.index); - }; + return Promise.resolve({ + toInsert, + toKeep: this.maxBufferSize ? queue.slice(this.maxBufferSize) : [], + }); + } + + private combineQueueItems(queue: BufferType[]): BufferType[] { + const itemsToClickhouse = new Map(); + + queue.forEach((item) => { + const key = item.project_id + item.id; + const existing = itemsToClickhouse.get(key); + itemsToClickhouse.set(key, mergeDeepRight(existing ?? {}, item)); + }); + + return Array.from(itemsToClickhouse.values()); + } + + protected async insertIntoDB(items: BufferType[]): Promise { + await ch.insert({ + table: TABLE_NAMES.profiles, + values: items, + format: 'JSONEachRow', + }); + } private matchPartialObject( full: any, @@ -77,27 +101,16 @@ export class ProfileBuffer extends RedisBuffer { return true; } - private combineQueueItems( - queue: QueueItem[], - ): QueueItem[] { - const itemsToClickhouse = new Map>(); - - queue.forEach((item) => { - const key = item.event.project_id + item.event.id; - const existing = itemsToClickhouse.get(key); - itemsToClickhouse.set(key, mergeDeepRight(existing ?? {}, item)); - }); - - return Array.from(itemsToClickhouse.values()); - } - private async getCachedProfiles( - cleanedQueue: QueueItem[], + queue: BufferType[], ): Promise<(IClickhouseProfile | null)[]> { const redisCache = getRedisCache(); - const keys = cleanedQueue.map( - (item) => `profile:${item.event.project_id}:${item.event.id}`, - ); + const keys = queue.map((item) => `profile:${item.project_id}:${item.id}`); + + if (keys.length === 0) { + return []; + } + const cachedProfiles = await redisCache.mget(...keys); return cachedProfiles.map((profile) => { try { @@ -109,34 +122,51 @@ export class ProfileBuffer extends RedisBuffer { } private async fetchDbProfiles( - cleanedQueue: QueueItem[], + queue: IClickhouseProfile[], ): Promise { - if (cleanedQueue.length === 0) { + if (queue.length === 0) { return []; } + // const grouped = groupBy(prop('project_id'), queue); + // const queries = Object.entries(grouped).map(([project_id, items]) => { + // if (!items) { + // return []; + // } + + // return chQuery( + // `SELECT + // * + // FROM ${TABLE_NAMES.profiles} + // WHERE + // id IN (${items.map((item) => escape(item.id)).join(',')}) + // AND created_at > INTERVAL 12 MONTH + // ORDER BY + // created_at DESC`, + // ); + // }); + return await chQuery( `SELECT * FROM ${TABLE_NAMES.profiles} WHERE - (id, project_id) IN (${cleanedQueue.map((item) => `('${item.event.id}', '${item.event.project_id}')`).join(',')}) + (project_id, id) IN (${queue.map((item) => `('${item.project_id}', '${item.id}')`).join(',')}) ORDER BY created_at DESC`, ); } private createProfileValues( - cleanedQueue: QueueItem[], + queue: IClickhouseProfile[], redisProfiles: (IClickhouseProfile | null)[], dbProfiles: IClickhouseProfile[], ): IClickhouseProfile[] { - return cleanedQueue + return queue .map((item, index) => { const cachedProfile = redisProfiles[index]; const dbProfile = dbProfiles.find( - (p) => - p.id === item.event.id && p.project_id === item.event.project_id, + (p) => p.id === item.id && p.project_id === item.project_id, ); const profile = cachedProfile || dbProfile; @@ -145,31 +175,33 @@ export class ProfileBuffer extends RedisBuffer { this.matchPartialObject( profile, { - ...item.event, - properties: toDots(item.event.properties), + ...item, + properties: toDots(item.properties), }, { ignore: ['created_at'], }, ) ) { - console.log('Ignoring profile', item.event.id); + this.logger.debug('No changes for profile', { + profile, + }); return null; } return { - id: item.event.id, - first_name: item.event.first_name ?? profile?.first_name ?? '', - last_name: item.event.last_name ?? profile?.last_name ?? '', - email: item.event.email ?? profile?.email ?? '', - avatar: item.event.avatar ?? profile?.avatar ?? '', + id: item.id, + first_name: item.first_name ?? profile?.first_name ?? '', + last_name: item.last_name ?? profile?.last_name ?? '', + email: item.email ?? profile?.email ?? '', + avatar: item.avatar ?? profile?.avatar ?? '', properties: toDots({ ...(profile?.properties ?? {}), - ...(item.event.properties ?? {}), + ...(item.properties ?? {}), }), - project_id: item.event.project_id ?? profile?.project_id ?? '', - created_at: item.event.created_at ?? profile?.created_at ?? '', - is_external: item.event.is_external, + project_id: item.project_id ?? profile?.project_id ?? '', + created_at: item.created_at ?? profile?.created_at ?? '', + is_external: item.is_external, }; }) .flatMap((item) => (item ? [item] : [])); @@ -188,24 +220,12 @@ export class ProfileBuffer extends RedisBuffer { await multi.exec(); } - private async insertIntoClickhouse( - values: IClickhouseProfile[], - ): Promise { - await ch.insert({ - table: TABLE_NAMES.profiles, - values, - format: 'JSONEachRow', - }); - } - public findMany: FindMany = async ( callback, ) => { return this.getQueue(-1) .then((queue) => { - return queue - .filter(callback) - .map((item) => transformProfile(item.event)); + return queue.filter(callback).map(transformProfile); }) .catch(() => { return []; @@ -216,7 +236,7 @@ export class ProfileBuffer extends RedisBuffer { return this.getQueue(-1) .then((queue) => { const match = queue.find(callback); - return match ? transformProfile(match.event) : null; + return match ? transformProfile(match) : null; }) .catch(() => { return null; diff --git a/packages/db/src/buffers/trash/bot-buffer.ts b/packages/db/src/buffers/trash/bot-buffer.ts new file mode 100644 index 00000000..d3028987 --- /dev/null +++ b/packages/db/src/buffers/trash/bot-buffer.ts @@ -0,0 +1,39 @@ +import { TABLE_NAMES, ch } from '../../clickhouse-client'; +import type { IClickhouseBotEvent } from '../../services/event.service'; +import type { + Find, + FindMany, + OnCompleted, + OnInsert, + ProcessQueue, +} from './buffer'; +import { RedisBuffer } from './buffer'; + +export class BotBuffer extends RedisBuffer { + constructor() { + super({ + table: TABLE_NAMES.events_bots, + batchSize: 100, + }); + } + + public onInsert?: OnInsert | undefined; + public onCompleted?: OnCompleted | undefined; + + public processQueue: ProcessQueue = async (queue) => { + await ch.insert({ + table: TABLE_NAMES.events_bots, + values: queue.map((item) => item.event), + format: 'JSONEachRow', + }); + return queue.map((item) => item.index); + }; + + public findMany: FindMany = () => { + return Promise.resolve([]); + }; + + public find: Find = () => { + return Promise.resolve(null); + }; +} diff --git a/packages/db/src/buffers/trash/buffer.ts b/packages/db/src/buffers/trash/buffer.ts new file mode 100644 index 00000000..55dbc8c0 --- /dev/null +++ b/packages/db/src/buffers/trash/buffer.ts @@ -0,0 +1,181 @@ +import { createLogger } from '@openpanel/logger'; +import { getRedisCache } from '@openpanel/redis'; + +export const DELETE = '__DELETE__'; + +export type QueueItem = { + event: T; + index: number; +}; + +export type OnInsert = (data: T) => unknown; + +export type OnCompleted = + | ((data: T[]) => Promise) + | ((data: T[]) => unknown[]); + +export type ProcessQueue = (data: QueueItem[]) => Promise; + +export type Find = ( + callback: (item: QueueItem) => boolean, +) => Promise; + +export type FindMany = ( + callback: (item: QueueItem) => boolean, +) => Promise; + +const getError = (e: unknown) => { + if (e instanceof Error) { + return [ + `Name: ${e.name}`, + `Message: ${e.message}`, + `Stack: ${e.stack}`, + `Cause: ${e.cause ? String(e.cause) : ''}`, + ].join('\n'); + } + return 'Unknown error'; +}; + +export abstract class RedisBuffer { + // constructor + public prefix = 'op:buffer'; + public table: string; + public batchSize?: number; + public logger: ReturnType; + public disableAutoFlush?: boolean; + + // abstract methods + public abstract onInsert?: OnInsert; + public abstract onCompleted?: OnCompleted; + public abstract processQueue: ProcessQueue; + public abstract find: Find; + public abstract findMany: FindMany; + + constructor(options: { + table: string; + batchSize?: number; + disableAutoFlush?: boolean; + }) { + this.table = options.table; + this.batchSize = options.batchSize; + this.disableAutoFlush = options.disableAutoFlush; + this.logger = createLogger({ name: 'buffer' }).child({ + table: this.table, + }); + } + + public getKey(name?: string) { + const key = `${this.prefix}:${this.table}`; + if (name) { + return `${key}:${name}`; + } + return key; + } + + public async insert(value: T) { + this.onInsert?.(value); + await getRedisCache().rpush(this.getKey(), JSON.stringify(value)); + + const length = await getRedisCache().llen(this.getKey()); + this.logger.debug( + `Inserted item into buffer ${this.table}. Current length: ${length}`, + ); + + if (!this.disableAutoFlush && this.batchSize && length >= this.batchSize) { + this.logger.info( + `Buffer ${this.table} reached batch size (${this.batchSize}). Flushing...`, + ); + this.flush(); + } + } + + public async flush() { + try { + const queue = await this.getQueue(this.batchSize || -1); + + if (queue.length === 0) { + this.logger.debug(`Flush called on empty buffer ${this.table}`); + return { count: 0, data: [] }; + } + + this.logger.info( + `Flushing ${queue.length} items from buffer ${this.table}`, + ); + + try { + const indexes = await this.processQueue(queue); + await this.deleteIndexes(indexes); + const data = indexes + .map((index) => queue[index]?.event) + .filter((event): event is T => event !== null); + + if (this.onCompleted) { + const res = await this.onCompleted(data); + this.logger.info( + `Completed processing ${res.length} items from buffer ${this.table}`, + ); + return { count: res.length, data: res }; + } + + this.logger.info( + `Processed ${indexes.length} items from buffer ${this.table}`, + ); + return { count: indexes.length, data: indexes }; + } catch (e) { + this.logger.error( + `Failed to process queue while flushing buffer ${this.table}:`, + e, + ); + const timestamp = new Date().getTime(); + await getRedisCache().hset(this.getKey(`failed:${timestamp}`), { + error: getError(e), + data: JSON.stringify(queue.map((item) => item.event)), + retries: 0, + }); + this.logger.warn( + `Stored ${queue.length} failed items in ${this.getKey(`failed:${timestamp}`)}`, + ); + } + } catch (e) { + this.logger.error( + `Failed to get queue while flushing buffer ${this.table}:`, + e, + ); + } + } + + public async deleteIndexes(indexes: number[]) { + const multi = getRedisCache().multi(); + indexes.forEach((index) => { + multi.lset(this.getKey(), index, DELETE); + }); + multi.lrem(this.getKey(), 0, DELETE); + await multi.exec(); + this.logger.debug( + `Deleted ${indexes.length} items from buffer ${this.table}`, + ); + } + + public async getQueue(limit: number): Promise[]> { + const queue = await getRedisCache().lrange(this.getKey(), 0, limit); + const result = queue + .map((item, index) => ({ + event: this.transformQueueItem(item), + index, + })) + .filter((item): item is QueueItem => item.event !== null); + this.logger.debug( + `Retrieved ${result.length} items from buffer ${this.table}`, + ); + return result; + } + + private transformQueueItem(item: string): T | null { + try { + return JSON.parse(item); + } catch (e) { + this.logger.warn(`Failed to parse item in buffer ${this.table}:`, e); + return null; + } + } +} diff --git a/packages/db/src/buffers/trash/event-buffer.ts b/packages/db/src/buffers/trash/event-buffer.ts new file mode 100644 index 00000000..4616747b --- /dev/null +++ b/packages/db/src/buffers/trash/event-buffer.ts @@ -0,0 +1,211 @@ +import { groupBy, omit } from 'ramda'; +import SuperJSON from 'superjson'; + +import { deepMergeObjects } from '@openpanel/common'; +import { getRedisCache, getRedisPub } from '@openpanel/redis'; + +import { TABLE_NAMES, ch } from '../../clickhouse-client'; +import { transformEvent } from '../../services/event.service'; +import type { + IClickhouseEvent, + IServiceEvent, +} from '../../services/event.service'; +import type { + Find, + FindMany, + OnCompleted, + OnInsert, + ProcessQueue, + QueueItem, +} from './buffer'; +import { RedisBuffer } from './buffer'; + +const sortOldestFirst = ( + a: QueueItem, + b: QueueItem, +) => + new Date(a.event.created_at).getTime() - + new Date(b.event.created_at).getTime(); + +export class EventBuffer extends RedisBuffer { + constructor() { + super({ + table: TABLE_NAMES.events, + }); + } + + public onInsert?: OnInsert | undefined = (event) => { + getRedisPub().publish( + 'event:received', + SuperJSON.stringify(transformEvent(event)), + ); + if (event.profile_id) { + getRedisCache().set( + `live:event:${event.project_id}:${event.profile_id}`, + '', + 'EX', + 60 * 5, + ); + } + }; + + public onCompleted?: OnCompleted | undefined = ( + savedEvents, + ) => { + for (const event of savedEvents) { + getRedisPub().publish( + 'event:saved', + SuperJSON.stringify(transformEvent(event)), + ); + } + + return savedEvents.map((event) => event.id); + }; + + public processQueue: ProcessQueue = async (queue) => { + const itemsToClickhouse = new Set>(); + const itemsToStalled = new Set>(); + + // Sort data by created_at + // oldest first + queue.sort(sortOldestFirst); + + // All events thats not a screen_view can be sent to clickhouse + // We only need screen_views since we want to calculate the duration of each screen + // To do this we need a minimum of 2 screen_views + queue + .filter( + (item) => + item.event.name !== 'screen_view' || item.event.device === 'server', + ) + .forEach((item) => { + // Find the last event with data and merge it with the current event + // We use profile_id here since this property can be set from backend as well + const lastEventWithData = queue + .slice(0, item.index) + .findLast((lastEvent) => { + return ( + lastEvent.event.project_id === item.event.project_id && + lastEvent.event.profile_id === item.event.profile_id && + lastEvent.event.path !== '' + ); + }); + + const event = deepMergeObjects( + omit(['properties', 'duration'], lastEventWithData?.event || {}), + item.event, + ); + + if (lastEventWithData) { + event.properties.__properties_from = lastEventWithData.event.id; + } + + return itemsToClickhouse.add({ + ...item, + event, + }); + }); + + // Group screen_view events by session_id + const grouped = groupBy( + (item) => item.event.session_id, + queue.filter( + (item) => + item.event.name === 'screen_view' && item.event.device !== 'server', + ), + ); + + // Iterate over each group + for (const [sessionId, screenViews] of Object.entries(grouped)) { + if (sessionId === '' || !sessionId) { + continue; + } + + // If there is only one screen_view event we can send it back to redis since we can't calculate the duration + const hasSessionEnd = queue.find( + (item) => + item.event.name === 'session_end' && + item.event.session_id === sessionId, + ); + + screenViews + ?.slice() + .sort(sortOldestFirst) + .forEach((item, index) => { + const nextScreenView = screenViews[index + 1]; + // if nextScreenView does not exists we can't calculate the duration (last event in session) + if (nextScreenView) { + const duration = + new Date(nextScreenView.event.created_at).getTime() - + new Date(item.event.created_at).getTime(); + const event = { + ...item.event, + duration, + }; + event.properties.__duration_from = nextScreenView.event.id; + itemsToClickhouse.add({ + ...item, + event, + }); + // push last event in session if we have a session_end event + } else if (hasSessionEnd) { + itemsToClickhouse.add(item); + } + }); + } // for of end + + // Check if we have any events that has been in the queue for more than 24 hour + // This should not theoretically happen but if it does we should move them to stalled + queue.forEach((item) => { + if ( + !itemsToClickhouse.has(item) && + new Date(item.event.created_at).getTime() < + new Date().getTime() - 1000 * 60 * 60 * 24 + ) { + itemsToStalled.add(item); + } + }); + + if (itemsToStalled.size > 0) { + const multi = getRedisCache().multi(); + for (const item of itemsToStalled) { + multi.rpush(this.getKey('stalled'), JSON.stringify(item.event)); + } + await multi.exec(); + } + + await ch.insert({ + table: TABLE_NAMES.events, + values: Array.from(itemsToClickhouse).map((item) => item.event), + format: 'JSONEachRow', + }); + + return [ + ...Array.from(itemsToClickhouse).map((item) => item.index), + ...Array.from(itemsToStalled).map((item) => item.index), + ]; + }; + + public findMany: FindMany = async ( + callback, + ) => { + return this.getQueue(-1) + .then((queue) => { + return queue.filter(callback).map((item) => transformEvent(item.event)); + }) + .catch(() => { + return []; + }); + }; + + public find: Find = async (callback) => { + return this.getQueue(-1) + .then((queue) => { + const match = queue.find(callback); + return match ? transformEvent(match.event) : null; + }) + .catch(() => { + return null; + }); + }; +} diff --git a/packages/db/src/buffers/trash/profile-buffer.ts b/packages/db/src/buffers/trash/profile-buffer.ts new file mode 100644 index 00000000..a4fe91b9 --- /dev/null +++ b/packages/db/src/buffers/trash/profile-buffer.ts @@ -0,0 +1,225 @@ +import { mergeDeepRight } from 'ramda'; + +import { toDots } from '@openpanel/common'; +import { getRedisCache } from '@openpanel/redis'; + +import { TABLE_NAMES, ch, chQuery } from '../../clickhouse-client'; +import type { + IClickhouseProfile, + IServiceProfile, +} from '../../services/profile.service'; +import { transformProfile } from '../../services/profile.service'; +import type { + Find, + FindMany, + OnCompleted, + OnInsert, + ProcessQueue, + QueueItem, +} from './buffer'; +import { RedisBuffer } from './buffer'; + +export class ProfileBuffer extends RedisBuffer { + constructor() { + super({ + table: TABLE_NAMES.profiles, + batchSize: 100, + disableAutoFlush: true, + }); + } + + public onInsert?: OnInsert | undefined; + public onCompleted?: OnCompleted | undefined; + + public processQueue: ProcessQueue = async (queue) => { + const cleanedQueue = this.combineQueueItems(queue); + const redisProfiles = await this.getCachedProfiles(cleanedQueue); + const dbProfiles = await this.fetchDbProfiles( + cleanedQueue.filter((_, index) => !redisProfiles[index]), + ); + + const values = this.createProfileValues( + cleanedQueue, + redisProfiles, + dbProfiles, + ); + + if (values.length > 0) { + await this.updateRedisCache(values); + await this.insertIntoClickhouse(values); + } + + return queue.map((item) => item.index); + }; + + private matchPartialObject( + full: any, + partial: any, + options: { ignore: string[] }, + ): boolean { + if (typeof partial !== 'object' || partial === null) { + return partial === full; + } + + for (const key in partial) { + if (options.ignore.includes(key)) { + continue; + } + + if ( + !(key in full) || + !this.matchPartialObject(full[key], partial[key], options) + ) { + return false; + } + } + + return true; + } + + private combineQueueItems( + queue: QueueItem[], + ): QueueItem[] { + const itemsToClickhouse = new Map>(); + + queue.forEach((item) => { + const key = item.event.project_id + item.event.id; + const existing = itemsToClickhouse.get(key); + itemsToClickhouse.set(key, mergeDeepRight(existing ?? {}, item)); + }); + + return Array.from(itemsToClickhouse.values()); + } + + private async getCachedProfiles( + cleanedQueue: QueueItem[], + ): Promise<(IClickhouseProfile | null)[]> { + const redisCache = getRedisCache(); + const keys = cleanedQueue.map( + (item) => `profile:${item.event.project_id}:${item.event.id}`, + ); + const cachedProfiles = await redisCache.mget(...keys); + return cachedProfiles.map((profile) => { + try { + return profile ? JSON.parse(profile) : null; + } catch (error) { + return null; + } + }); + } + + private async fetchDbProfiles( + cleanedQueue: QueueItem[], + ): Promise { + if (cleanedQueue.length === 0) { + return []; + } + + return await chQuery( + `SELECT + * + FROM ${TABLE_NAMES.profiles} + WHERE + (id, project_id) IN (${cleanedQueue.map((item) => `('${item.event.id}', '${item.event.project_id}')`).join(',')}) + ORDER BY + created_at DESC`, + ); + } + + private createProfileValues( + cleanedQueue: QueueItem[], + redisProfiles: (IClickhouseProfile | null)[], + dbProfiles: IClickhouseProfile[], + ): IClickhouseProfile[] { + return cleanedQueue + .map((item, index) => { + const cachedProfile = redisProfiles[index]; + const dbProfile = dbProfiles.find( + (p) => + p.id === item.event.id && p.project_id === item.event.project_id, + ); + const profile = cachedProfile || dbProfile; + + if ( + profile && + this.matchPartialObject( + profile, + { + ...item.event, + properties: toDots(item.event.properties), + }, + { + ignore: ['created_at'], + }, + ) + ) { + console.log('Ignoring profile', item.event.id); + return null; + } + + return { + id: item.event.id, + first_name: item.event.first_name ?? profile?.first_name ?? '', + last_name: item.event.last_name ?? profile?.last_name ?? '', + email: item.event.email ?? profile?.email ?? '', + avatar: item.event.avatar ?? profile?.avatar ?? '', + properties: toDots({ + ...(profile?.properties ?? {}), + ...(item.event.properties ?? {}), + }), + project_id: item.event.project_id ?? profile?.project_id ?? '', + created_at: item.event.created_at ?? profile?.created_at ?? '', + is_external: item.event.is_external, + }; + }) + .flatMap((item) => (item ? [item] : [])); + } + + private async updateRedisCache(values: IClickhouseProfile[]): Promise { + const redisCache = getRedisCache(); + const multi = redisCache.multi(); + values.forEach((value) => { + multi.setex( + `profile:${value.project_id}:${value.id}`, + 60 * 30, // 30 minutes + JSON.stringify(value), + ); + }); + await multi.exec(); + } + + private async insertIntoClickhouse( + values: IClickhouseProfile[], + ): Promise { + await ch.insert({ + table: TABLE_NAMES.profiles, + values, + format: 'JSONEachRow', + }); + } + + public findMany: FindMany = async ( + callback, + ) => { + return this.getQueue(-1) + .then((queue) => { + return queue + .filter(callback) + .map((item) => transformProfile(item.event)); + }) + .catch(() => { + return []; + }); + }; + + public find: Find = async (callback) => { + return this.getQueue(-1) + .then((queue) => { + const match = queue.find(callback); + return match ? transformProfile(match.event) : null; + }) + .catch(() => { + return null; + }); + }; +} diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index 077bd632..b0992b3f 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -9,7 +9,6 @@ import type { IChartEventFilter } from '@openpanel/validation'; import { botBuffer, eventBuffer } from '../buffers'; import { TABLE_NAMES, - ch, chQuery, convertClickhouseDateToJs, formatClickhouseDate, @@ -339,7 +338,7 @@ export async function createEvent(payload: IServiceCreateEventPayload) { sdk_version: payload.sdkVersion ?? '', }; - await eventBuffer.insert(event); + await eventBuffer.add(event); return { document: event, @@ -562,7 +561,7 @@ export function createBotEvent({ createdAt, path, }: IServiceCreateBotEventPayload) { - return botBuffer.insert({ + return botBuffer.add({ id: uuid(), name, type, @@ -593,7 +592,7 @@ export async function getLastScreenViewFromProfileId({ } const eventInBuffer = await eventBuffer.find( - (item) => item.event.profile_id === profileId, + (item) => item.profile_id === profileId, ); if (eventInBuffer) { diff --git a/packages/db/src/services/profile.service.ts b/packages/db/src/services/profile.service.ts index 0a7c8d21..8e548fa2 100644 --- a/packages/db/src/services/profile.service.ts +++ b/packages/db/src/services/profile.service.ts @@ -222,7 +222,7 @@ export async function upsertProfile({ projectId, isExternal, }: IServiceUpsertProfile) { - return profileBuffer.insert({ + return profileBuffer.add({ id, first_name: firstName!, last_name: lastName!, diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index e24e9776..a6f8a9ad 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -28,12 +28,11 @@ export interface EventsQueuePayloadCreateEvent { type: 'createEvent'; payload: Omit; } +type SessionEndRequired = 'sessionId' | 'deviceId' | 'profileId' | 'projectId'; export interface EventsQueuePayloadCreateSessionEnd { type: 'createSessionEnd'; - payload: Pick< - IServiceEvent, - 'deviceId' | 'sessionId' | 'profileId' | 'projectId' - >; + payload: Partial> & + Pick; } // TODO: Rename `EventsQueuePayloadCreateSessionEnd` @@ -66,6 +65,8 @@ export type CronQueuePayload = | CronQueuePayloadFlushProfiles | CronQueuePayloadPing; +export type CronQueueType = CronQueuePayload['type']; + export const eventsQueue = new Queue('events', { connection: getRedisQueue(), defaultJobOptions: {