From 1242f0e9c9723551b810d076eeafcbaf1c9b94b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 20 Sep 2024 10:35:21 +0200 Subject: [PATCH] chore(buffer): clean up old files --- packages/db/src/buffers/trash/bot-buffer.ts | 39 --- packages/db/src/buffers/trash/buffer.ts | 181 -------------- packages/db/src/buffers/trash/event-buffer.ts | 211 ---------------- .../db/src/buffers/trash/profile-buffer.ts | 225 ------------------ 4 files changed, 656 deletions(-) delete mode 100644 packages/db/src/buffers/trash/bot-buffer.ts delete mode 100644 packages/db/src/buffers/trash/buffer.ts delete mode 100644 packages/db/src/buffers/trash/event-buffer.ts delete mode 100644 packages/db/src/buffers/trash/profile-buffer.ts diff --git a/packages/db/src/buffers/trash/bot-buffer.ts b/packages/db/src/buffers/trash/bot-buffer.ts deleted file mode 100644 index d3028987..00000000 --- a/packages/db/src/buffers/trash/bot-buffer.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { TABLE_NAMES, ch } from '../../clickhouse-client'; -import type { IClickhouseBotEvent } from '../../services/event.service'; -import type { - Find, - FindMany, - OnCompleted, - OnInsert, - ProcessQueue, -} from './buffer'; -import { RedisBuffer } from './buffer'; - -export class BotBuffer extends RedisBuffer { - constructor() { - super({ - table: TABLE_NAMES.events_bots, - batchSize: 100, - }); - } - - public onInsert?: OnInsert | undefined; - public onCompleted?: OnCompleted | undefined; - - public processQueue: ProcessQueue = async (queue) => { - await ch.insert({ - table: TABLE_NAMES.events_bots, - values: queue.map((item) => item.event), - format: 'JSONEachRow', - }); - return queue.map((item) => item.index); - }; - - public findMany: FindMany = () => { - return Promise.resolve([]); - }; - - public find: Find = () => { - return Promise.resolve(null); - }; -} diff --git a/packages/db/src/buffers/trash/buffer.ts b/packages/db/src/buffers/trash/buffer.ts deleted file mode 100644 index 55dbc8c0..00000000 --- a/packages/db/src/buffers/trash/buffer.ts +++ /dev/null @@ -1,181 +0,0 @@ -import { createLogger } from '@openpanel/logger'; -import { getRedisCache } from '@openpanel/redis'; - -export const DELETE = '__DELETE__'; - -export type QueueItem = { - event: T; - index: number; -}; - -export type OnInsert = (data: T) => unknown; - -export type OnCompleted = - | ((data: T[]) => Promise) - | ((data: T[]) => unknown[]); - -export type ProcessQueue = (data: QueueItem[]) => Promise; - -export type Find = ( - callback: (item: QueueItem) => boolean, -) => Promise; - -export type FindMany = ( - callback: (item: QueueItem) => boolean, -) => Promise; - -const getError = (e: unknown) => { - if (e instanceof Error) { - return [ - `Name: ${e.name}`, - `Message: ${e.message}`, - `Stack: ${e.stack}`, - `Cause: ${e.cause ? String(e.cause) : ''}`, - ].join('\n'); - } - return 'Unknown error'; -}; - -export abstract class RedisBuffer { - // constructor - public prefix = 'op:buffer'; - public table: string; - public batchSize?: number; - public logger: ReturnType; - public disableAutoFlush?: boolean; - - // abstract methods - public abstract onInsert?: OnInsert; - public abstract onCompleted?: OnCompleted; - public abstract processQueue: ProcessQueue; - public abstract find: Find; - public abstract findMany: FindMany; - - constructor(options: { - table: string; - batchSize?: number; - disableAutoFlush?: boolean; - }) { - this.table = options.table; - this.batchSize = options.batchSize; - this.disableAutoFlush = options.disableAutoFlush; - this.logger = createLogger({ name: 'buffer' }).child({ - table: this.table, - }); - } - - public getKey(name?: string) { - const key = `${this.prefix}:${this.table}`; - if (name) { - return `${key}:${name}`; - } - return key; - } - - public async insert(value: T) { - this.onInsert?.(value); - await getRedisCache().rpush(this.getKey(), JSON.stringify(value)); - - const length = await getRedisCache().llen(this.getKey()); - this.logger.debug( - `Inserted item into buffer ${this.table}. Current length: ${length}`, - ); - - if (!this.disableAutoFlush && this.batchSize && length >= this.batchSize) { - this.logger.info( - `Buffer ${this.table} reached batch size (${this.batchSize}). Flushing...`, - ); - this.flush(); - } - } - - public async flush() { - try { - const queue = await this.getQueue(this.batchSize || -1); - - if (queue.length === 0) { - this.logger.debug(`Flush called on empty buffer ${this.table}`); - return { count: 0, data: [] }; - } - - this.logger.info( - `Flushing ${queue.length} items from buffer ${this.table}`, - ); - - try { - const indexes = await this.processQueue(queue); - await this.deleteIndexes(indexes); - const data = indexes - .map((index) => queue[index]?.event) - .filter((event): event is T => event !== null); - - if (this.onCompleted) { - const res = await this.onCompleted(data); - this.logger.info( - `Completed processing ${res.length} items from buffer ${this.table}`, - ); - return { count: res.length, data: res }; - } - - this.logger.info( - `Processed ${indexes.length} items from buffer ${this.table}`, - ); - return { count: indexes.length, data: indexes }; - } catch (e) { - this.logger.error( - `Failed to process queue while flushing buffer ${this.table}:`, - e, - ); - const timestamp = new Date().getTime(); - await getRedisCache().hset(this.getKey(`failed:${timestamp}`), { - error: getError(e), - data: JSON.stringify(queue.map((item) => item.event)), - retries: 0, - }); - this.logger.warn( - `Stored ${queue.length} failed items in ${this.getKey(`failed:${timestamp}`)}`, - ); - } - } catch (e) { - this.logger.error( - `Failed to get queue while flushing buffer ${this.table}:`, - e, - ); - } - } - - public async deleteIndexes(indexes: number[]) { - const multi = getRedisCache().multi(); - indexes.forEach((index) => { - multi.lset(this.getKey(), index, DELETE); - }); - multi.lrem(this.getKey(), 0, DELETE); - await multi.exec(); - this.logger.debug( - `Deleted ${indexes.length} items from buffer ${this.table}`, - ); - } - - public async getQueue(limit: number): Promise[]> { - const queue = await getRedisCache().lrange(this.getKey(), 0, limit); - const result = queue - .map((item, index) => ({ - event: this.transformQueueItem(item), - index, - })) - .filter((item): item is QueueItem => item.event !== null); - this.logger.debug( - `Retrieved ${result.length} items from buffer ${this.table}`, - ); - return result; - } - - private transformQueueItem(item: string): T | null { - try { - return JSON.parse(item); - } catch (e) { - this.logger.warn(`Failed to parse item in buffer ${this.table}:`, e); - return null; - } - } -} diff --git a/packages/db/src/buffers/trash/event-buffer.ts b/packages/db/src/buffers/trash/event-buffer.ts deleted file mode 100644 index 4616747b..00000000 --- a/packages/db/src/buffers/trash/event-buffer.ts +++ /dev/null @@ -1,211 +0,0 @@ -import { groupBy, omit } from 'ramda'; -import SuperJSON from 'superjson'; - -import { deepMergeObjects } from '@openpanel/common'; -import { getRedisCache, getRedisPub } from '@openpanel/redis'; - -import { TABLE_NAMES, ch } from '../../clickhouse-client'; -import { transformEvent } from '../../services/event.service'; -import type { - IClickhouseEvent, - IServiceEvent, -} from '../../services/event.service'; -import type { - Find, - FindMany, - OnCompleted, - OnInsert, - ProcessQueue, - QueueItem, -} from './buffer'; -import { RedisBuffer } from './buffer'; - -const sortOldestFirst = ( - a: QueueItem, - b: QueueItem, -) => - new Date(a.event.created_at).getTime() - - new Date(b.event.created_at).getTime(); - -export class EventBuffer extends RedisBuffer { - constructor() { - super({ - table: TABLE_NAMES.events, - }); - } - - public onInsert?: OnInsert | undefined = (event) => { - getRedisPub().publish( - 'event:received', - SuperJSON.stringify(transformEvent(event)), - ); - if (event.profile_id) { - getRedisCache().set( - `live:event:${event.project_id}:${event.profile_id}`, - '', - 'EX', - 60 * 5, - ); - } - }; - - public onCompleted?: OnCompleted | undefined = ( - savedEvents, - ) => { - for (const event of savedEvents) { - getRedisPub().publish( - 'event:saved', - SuperJSON.stringify(transformEvent(event)), - ); - } - - return savedEvents.map((event) => event.id); - }; - - public processQueue: ProcessQueue = async (queue) => { - const itemsToClickhouse = new Set>(); - const itemsToStalled = new Set>(); - - // Sort data by created_at - // oldest first - queue.sort(sortOldestFirst); - - // All events thats not a screen_view can be sent to clickhouse - // We only need screen_views since we want to calculate the duration of each screen - // To do this we need a minimum of 2 screen_views - queue - .filter( - (item) => - item.event.name !== 'screen_view' || item.event.device === 'server', - ) - .forEach((item) => { - // Find the last event with data and merge it with the current event - // We use profile_id here since this property can be set from backend as well - const lastEventWithData = queue - .slice(0, item.index) - .findLast((lastEvent) => { - return ( - lastEvent.event.project_id === item.event.project_id && - lastEvent.event.profile_id === item.event.profile_id && - lastEvent.event.path !== '' - ); - }); - - const event = deepMergeObjects( - omit(['properties', 'duration'], lastEventWithData?.event || {}), - item.event, - ); - - if (lastEventWithData) { - event.properties.__properties_from = lastEventWithData.event.id; - } - - return itemsToClickhouse.add({ - ...item, - event, - }); - }); - - // Group screen_view events by session_id - const grouped = groupBy( - (item) => item.event.session_id, - queue.filter( - (item) => - item.event.name === 'screen_view' && item.event.device !== 'server', - ), - ); - - // Iterate over each group - for (const [sessionId, screenViews] of Object.entries(grouped)) { - if (sessionId === '' || !sessionId) { - continue; - } - - // If there is only one screen_view event we can send it back to redis since we can't calculate the duration - const hasSessionEnd = queue.find( - (item) => - item.event.name === 'session_end' && - item.event.session_id === sessionId, - ); - - screenViews - ?.slice() - .sort(sortOldestFirst) - .forEach((item, index) => { - const nextScreenView = screenViews[index + 1]; - // if nextScreenView does not exists we can't calculate the duration (last event in session) - if (nextScreenView) { - const duration = - new Date(nextScreenView.event.created_at).getTime() - - new Date(item.event.created_at).getTime(); - const event = { - ...item.event, - duration, - }; - event.properties.__duration_from = nextScreenView.event.id; - itemsToClickhouse.add({ - ...item, - event, - }); - // push last event in session if we have a session_end event - } else if (hasSessionEnd) { - itemsToClickhouse.add(item); - } - }); - } // for of end - - // Check if we have any events that has been in the queue for more than 24 hour - // This should not theoretically happen but if it does we should move them to stalled - queue.forEach((item) => { - if ( - !itemsToClickhouse.has(item) && - new Date(item.event.created_at).getTime() < - new Date().getTime() - 1000 * 60 * 60 * 24 - ) { - itemsToStalled.add(item); - } - }); - - if (itemsToStalled.size > 0) { - const multi = getRedisCache().multi(); - for (const item of itemsToStalled) { - multi.rpush(this.getKey('stalled'), JSON.stringify(item.event)); - } - await multi.exec(); - } - - await ch.insert({ - table: TABLE_NAMES.events, - values: Array.from(itemsToClickhouse).map((item) => item.event), - format: 'JSONEachRow', - }); - - return [ - ...Array.from(itemsToClickhouse).map((item) => item.index), - ...Array.from(itemsToStalled).map((item) => item.index), - ]; - }; - - public findMany: FindMany = async ( - callback, - ) => { - return this.getQueue(-1) - .then((queue) => { - return queue.filter(callback).map((item) => transformEvent(item.event)); - }) - .catch(() => { - return []; - }); - }; - - public find: Find = async (callback) => { - return this.getQueue(-1) - .then((queue) => { - const match = queue.find(callback); - return match ? transformEvent(match.event) : null; - }) - .catch(() => { - return null; - }); - }; -} diff --git a/packages/db/src/buffers/trash/profile-buffer.ts b/packages/db/src/buffers/trash/profile-buffer.ts deleted file mode 100644 index a4fe91b9..00000000 --- a/packages/db/src/buffers/trash/profile-buffer.ts +++ /dev/null @@ -1,225 +0,0 @@ -import { mergeDeepRight } from 'ramda'; - -import { toDots } from '@openpanel/common'; -import { getRedisCache } from '@openpanel/redis'; - -import { TABLE_NAMES, ch, chQuery } from '../../clickhouse-client'; -import type { - IClickhouseProfile, - IServiceProfile, -} from '../../services/profile.service'; -import { transformProfile } from '../../services/profile.service'; -import type { - Find, - FindMany, - OnCompleted, - OnInsert, - ProcessQueue, - QueueItem, -} from './buffer'; -import { RedisBuffer } from './buffer'; - -export class ProfileBuffer extends RedisBuffer { - constructor() { - super({ - table: TABLE_NAMES.profiles, - batchSize: 100, - disableAutoFlush: true, - }); - } - - public onInsert?: OnInsert | undefined; - public onCompleted?: OnCompleted | undefined; - - public processQueue: ProcessQueue = async (queue) => { - const cleanedQueue = this.combineQueueItems(queue); - const redisProfiles = await this.getCachedProfiles(cleanedQueue); - const dbProfiles = await this.fetchDbProfiles( - cleanedQueue.filter((_, index) => !redisProfiles[index]), - ); - - const values = this.createProfileValues( - cleanedQueue, - redisProfiles, - dbProfiles, - ); - - if (values.length > 0) { - await this.updateRedisCache(values); - await this.insertIntoClickhouse(values); - } - - return queue.map((item) => item.index); - }; - - private matchPartialObject( - full: any, - partial: any, - options: { ignore: string[] }, - ): boolean { - if (typeof partial !== 'object' || partial === null) { - return partial === full; - } - - for (const key in partial) { - if (options.ignore.includes(key)) { - continue; - } - - if ( - !(key in full) || - !this.matchPartialObject(full[key], partial[key], options) - ) { - return false; - } - } - - return true; - } - - private combineQueueItems( - queue: QueueItem[], - ): QueueItem[] { - const itemsToClickhouse = new Map>(); - - queue.forEach((item) => { - const key = item.event.project_id + item.event.id; - const existing = itemsToClickhouse.get(key); - itemsToClickhouse.set(key, mergeDeepRight(existing ?? {}, item)); - }); - - return Array.from(itemsToClickhouse.values()); - } - - private async getCachedProfiles( - cleanedQueue: QueueItem[], - ): Promise<(IClickhouseProfile | null)[]> { - const redisCache = getRedisCache(); - const keys = cleanedQueue.map( - (item) => `profile:${item.event.project_id}:${item.event.id}`, - ); - const cachedProfiles = await redisCache.mget(...keys); - return cachedProfiles.map((profile) => { - try { - return profile ? JSON.parse(profile) : null; - } catch (error) { - return null; - } - }); - } - - private async fetchDbProfiles( - cleanedQueue: QueueItem[], - ): Promise { - if (cleanedQueue.length === 0) { - return []; - } - - return await chQuery( - `SELECT - * - FROM ${TABLE_NAMES.profiles} - WHERE - (id, project_id) IN (${cleanedQueue.map((item) => `('${item.event.id}', '${item.event.project_id}')`).join(',')}) - ORDER BY - created_at DESC`, - ); - } - - private createProfileValues( - cleanedQueue: QueueItem[], - redisProfiles: (IClickhouseProfile | null)[], - dbProfiles: IClickhouseProfile[], - ): IClickhouseProfile[] { - return cleanedQueue - .map((item, index) => { - const cachedProfile = redisProfiles[index]; - const dbProfile = dbProfiles.find( - (p) => - p.id === item.event.id && p.project_id === item.event.project_id, - ); - const profile = cachedProfile || dbProfile; - - if ( - profile && - this.matchPartialObject( - profile, - { - ...item.event, - properties: toDots(item.event.properties), - }, - { - ignore: ['created_at'], - }, - ) - ) { - console.log('Ignoring profile', item.event.id); - return null; - } - - return { - id: item.event.id, - first_name: item.event.first_name ?? profile?.first_name ?? '', - last_name: item.event.last_name ?? profile?.last_name ?? '', - email: item.event.email ?? profile?.email ?? '', - avatar: item.event.avatar ?? profile?.avatar ?? '', - properties: toDots({ - ...(profile?.properties ?? {}), - ...(item.event.properties ?? {}), - }), - project_id: item.event.project_id ?? profile?.project_id ?? '', - created_at: item.event.created_at ?? profile?.created_at ?? '', - is_external: item.event.is_external, - }; - }) - .flatMap((item) => (item ? [item] : [])); - } - - private async updateRedisCache(values: IClickhouseProfile[]): Promise { - const redisCache = getRedisCache(); - const multi = redisCache.multi(); - values.forEach((value) => { - multi.setex( - `profile:${value.project_id}:${value.id}`, - 60 * 30, // 30 minutes - JSON.stringify(value), - ); - }); - await multi.exec(); - } - - private async insertIntoClickhouse( - values: IClickhouseProfile[], - ): Promise { - await ch.insert({ - table: TABLE_NAMES.profiles, - values, - format: 'JSONEachRow', - }); - } - - public findMany: FindMany = async ( - callback, - ) => { - return this.getQueue(-1) - .then((queue) => { - return queue - .filter(callback) - .map((item) => transformProfile(item.event)); - }) - .catch(() => { - return []; - }); - }; - - public find: Find = async (callback) => { - return this.getQueue(-1) - .then((queue) => { - const match = queue.find(callback); - return match ? transformProfile(match.event) : null; - }) - .catch(() => { - return null; - }); - }; -}