diff --git a/apps/worker/src/metrics.ts b/apps/worker/src/metrics.ts index 9989c5c4..4497bebf 100644 --- a/apps/worker/src/metrics.ts +++ b/apps/worker/src/metrics.ts @@ -71,11 +71,7 @@ register.registerMetric( name: `buffer_${eventBuffer.name}_count`, help: 'Number of unprocessed events', async collect() { - const metric = await db.eventBuffer.count({ - where: { - processedAt: null, - }, - }); + const metric = await eventBuffer.getBufferSize(); this.set(metric); }, }), @@ -86,11 +82,7 @@ register.registerMetric( name: `buffer_${profileBuffer.name}_count`, help: 'Number of unprocessed profiles', async collect() { - const metric = await db.profileBuffer.count({ - where: { - processedAt: null, - }, - }); + const metric = await profileBuffer.getBufferSize(); this.set(metric); }, }), @@ -101,11 +93,7 @@ register.registerMetric( name: `buffer_${botBuffer.name}_count`, help: 'Number of unprocessed bot events', async collect() { - const metric = await db.botEventBuffer.count({ - where: { - processedAt: null, - }, - }); + const metric = await botBuffer.getBufferSize(); this.set(metric); }, }), diff --git a/packages/db/src/buffers/bot-buffer-psql.ts b/packages/db/src/buffers/bot-buffer-psql.ts index db0d9416..40a5e135 100644 --- a/packages/db/src/buffers/bot-buffer-psql.ts +++ b/packages/db/src/buffers/bot-buffer-psql.ts @@ -105,4 +105,12 @@ export class BotBuffer extends BaseBuffer { this.logger.info('Cleaned up old bot events', { deleted: deleted.count }); } + + public async getBufferSize() { + return db.botEventBuffer.count({ + where: { + processedAt: null, + }, + }); + } } diff --git a/packages/db/src/buffers/bot-buffer-redis.ts b/packages/db/src/buffers/bot-buffer-redis.ts new file mode 100644 index 00000000..fa8edd70 --- /dev/null +++ b/packages/db/src/buffers/bot-buffer-redis.ts @@ -0,0 +1,77 @@ +import { type Redis, getRedisCache, runEvery } from '@openpanel/redis'; + +import { getSafeJson } from '@openpanel/common'; +import { TABLE_NAMES, ch } from '../clickhouse-client'; +import type { IClickhouseBotEvent } from '../services/event.service'; +import { BaseBuffer } from './base-buffer'; + +export class BotBuffer extends BaseBuffer { + private batchSize = process.env.BOT_BUFFER_BATCH_SIZE + ? Number.parseInt(process.env.BOT_BUFFER_BATCH_SIZE, 10) + : 1000; + + private readonly redisKey = 'bot-events-buffer'; + private redis: Redis; + constructor() { + super({ + name: 'bot', + onFlush: async () => { + await this.processBuffer(); + }, + }); + this.redis = getRedisCache(); + } + + async add(event: IClickhouseBotEvent) { + try { + // Add event to Redis list + await this.redis.rpush(this.redisKey, JSON.stringify(event)); + + // Check buffer length + const bufferLength = await this.redis.llen(this.redisKey); + + if (bufferLength >= this.batchSize) { + await this.tryFlush(); + } + } catch (error) { + this.logger.error('Failed to add bot event', { error }); + } + } + + async processBuffer() { + try { + // Get events from the start without removing them + const events = await this.redis.lrange( + this.redisKey, + 0, + this.batchSize - 1, + ); + + if (events.length === 0) return; + + const parsedEvents = events.map((e) => + getSafeJson(e), + ); + + // Insert to ClickHouse + await ch.insert({ + table: TABLE_NAMES.events_bots, + values: parsedEvents, + format: 'JSONEachRow', + }); + + // Only remove events after successful insert + await this.redis.ltrim(this.redisKey, events.length, -1); + + this.logger.info('Processed bot events', { + count: events.length, + }); + } catch (error) { + this.logger.error('Failed to process buffer', { error }); + } + } + + async getBufferSize() { + return getRedisCache().llen(this.redisKey); + } +} diff --git a/packages/db/src/buffers/bot-buffer.ts b/packages/db/src/buffers/bot-buffer.ts deleted file mode 100644 index a96e421c..00000000 --- a/packages/db/src/buffers/bot-buffer.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { TABLE_NAMES, ch } from '../clickhouse-client'; -import type { IClickhouseBotEvent } from '../services/event.service'; -import { BotBuffer as NewBotBuffer } from './bot-buffer-psql'; -import { RedisBuffer } from './buffer'; - -const testNewBotBuffer = new NewBotBuffer(); - -type BufferType = IClickhouseBotEvent; -export class BotBuffer extends RedisBuffer { - constructor() { - super('events_bots', 500); - } - - async add(event: BufferType) { - await super.add(event); - if (process.env.TEST_NEW_BUFFER) { - await testNewBotBuffer.add(event); - } - } - - protected async insertIntoDB(items: BufferType[]): Promise { - await ch.insert({ - table: TABLE_NAMES.events_bots, - values: items, - format: 'JSONEachRow', - }); - } -} diff --git a/packages/db/src/buffers/event-buffer-psql.ts b/packages/db/src/buffers/event-buffer-psql.ts index 45bca8d0..a1b9f69b 100644 --- a/packages/db/src/buffers/event-buffer-psql.ts +++ b/packages/db/src/buffers/event-buffer-psql.ts @@ -296,4 +296,12 @@ export class EventBuffer extends BaseBuffer { }) { return `session:last_screen_view:${projectId}:${profileId}`; } + + async getBufferSize() { + return db.eventBuffer.count({ + where: { + processedAt: null, + }, + }); + } } diff --git a/packages/db/src/buffers/event-buffer-redis.ts b/packages/db/src/buffers/event-buffer-redis.ts new file mode 100644 index 00000000..1bbfc168 --- /dev/null +++ b/packages/db/src/buffers/event-buffer-redis.ts @@ -0,0 +1,674 @@ +import { getSafeJson, setSuperJson } from '@openpanel/common'; +import { + type Redis, + getRedisCache, + getRedisPub, + runEvery, +} from '@openpanel/redis'; +import { ch } from '../clickhouse-client'; +import { + type IClickhouseEvent, + type IServiceEvent, + transformEvent, +} from '../services/event.service'; +import { BaseBuffer } from './base-buffer'; + +/** + * + * Usuful redis commands: + * --------------------- + * + * Add empty session + * ZADD event_buffer:sessions_sorted 1710831600000 "test_empty_session" + * + * Get session events + * LRANGE event_buffer:session:test_empty_session 0 -1 + * + * Get session events count + * LLEN event_buffer:session:test_empty_session + * + * Get regular queue events + * LRANGE event_buffer:regular_queue 0 -1 + * + * Get regular queue count + * LLEN event_buffer:regular_queue + * + */ + +export class EventBuffer extends BaseBuffer { + // Configurable limits + private daysToKeep = process.env.EVENT_BUFFER_DAYS_TO_KEEP + ? Number.parseFloat(process.env.EVENT_BUFFER_DAYS_TO_KEEP) + : 3; + private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE + ? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10) + : 4000; + private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE + ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) + : 1000; + private updatePendingSessionsBatchSize = process.env + .EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE + ? Number.parseInt( + process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE, + 10, + ) + : 1000; + + private sessionEvents = ['screen_view', 'session_end']; + + // LIST - Stores events without sessions + private regularQueueKey = 'event_buffer:regular_queue'; + + // SORTED SET - Tracks all active session IDs with their timestamps + private sessionSortedKey = 'event_buffer:sessions_sorted'; // sorted set of session IDs + + private readonly sessionKeyPrefix = 'event_buffer:session:'; + // LIST - Stores events for a given session + private getSessionKey(sessionId: string) { + return `${this.sessionKeyPrefix}${sessionId}`; + } + /** + * Lua script that loops through sessions and returns a JSON-encoded list of + * session objects (sessionId and events). It stops once a total number of events + * >= batchSize is reached. It also cleans up any empty sessions. + */ + private readonly processSessionsScript = ` +local sessionSortedKey = KEYS[1] +local sessionPrefix = KEYS[2] +local batchSize = tonumber(ARGV[1]) +local minEvents = tonumber(ARGV[2]) -- New parameter for minimum events + +local result = {} +local sessionsToRemove = {} +local sessionIds = redis.call('ZRANGE', sessionSortedKey, 0, -1) +local resultIndex = 1 +local totalEvents = 0 + +for i, sessionId in ipairs(sessionIds) do + local sessionKey = sessionPrefix .. sessionId + local events = redis.call('LRANGE', sessionKey, 0, -1) + + if #events == 0 then + table.insert(sessionsToRemove, sessionId) + elseif #events >= minEvents then -- Use the parameter instead of hardcoded value + result[resultIndex] = { sessionId = sessionId, events = events } + resultIndex = resultIndex + 1 + totalEvents = totalEvents + #events + if totalEvents >= batchSize then + break + end + end +end + +if #sessionsToRemove > 0 then + redis.call('ZREM', sessionSortedKey, unpack(sessionsToRemove)) +end + +return cjson.encode(result) +`; + + /** + * New atomic Lua script to update a session's list with pending events. + * Instead of doing a separate DEL and RPUSH (which leaves a race condition), + * this script will: + * 1. Remove the first `snapshotCount` items from the session list. + * 2. Re-insert the pending events (provided as additional arguments) + * at the head (using LPUSH in reverse order to preserve order). + * + * KEYS[1] = session key + * ARGV[1] = snapshotCount (number of events that were present in our snapshot) + * ARGV[2] = pendingCount (number of pending events) + * ARGV[3..(2+pendingCount)] = the pending event strings + */ + private readonly updateSessionScript = ` +local snapshotCount = tonumber(ARGV[1]) +local pendingCount = tonumber(ARGV[2]) +local sessionKey = KEYS[1] + +-- Trim the list to remove the processed (snapshot) events. +redis.call("LTRIM", sessionKey, snapshotCount, -1) + +-- Re-insert the pending events at the head in their original order. +for i = pendingCount, 1, -1 do + redis.call("LPUSH", sessionKey, ARGV[i+2]) +end + +return redis.call("LLEN", sessionKey) +`; + + /** + * Lua script that processes a batch of session updates in a single call. + * Format of updates: [sessionKey1, snapshotCount1, pendingCount1, pending1...., sessionKey2, ...] + */ + private readonly batchUpdateSessionsScript = ` +local i = 1 +while i <= #ARGV do + local sessionKey = ARGV[i] + local snapshotCount = tonumber(ARGV[i + 1]) + local pendingCount = tonumber(ARGV[i + 2]) + + -- Trim the list to remove processed events + redis.call("LTRIM", sessionKey, snapshotCount, -1) + + -- Re-insert pending events at the head in original order + if pendingCount > 0 then + local pendingEvents = {} + for j = 1, pendingCount do + table.insert(pendingEvents, ARGV[i + 2 + j]) + end + redis.call("LPUSH", sessionKey, unpack(pendingEvents)) + end + + i = i + 3 + pendingCount +end +return "OK" +`; + + constructor() { + super({ + name: 'event', + onFlush: async () => { + await this.processBuffer(); + await this.tryCleanup(); + }, + }); + } + + bulkAdd(events: IClickhouseEvent[]) { + const redis = getRedisCache(); + const multi = redis.multi(); + for (const event of events) { + this.add(event, multi); + } + return multi.exec(); + } + + /** + * Add an event into Redis. + * Combines multiple Redis operations into a single MULTI command. + */ + async add(event: IClickhouseEvent, _multi?: ReturnType) { + try { + const redis = getRedisCache(); + const eventJson = JSON.stringify(event); + const multi = _multi || redis.multi(); + + if (event.session_id && this.sessionEvents.includes(event.name)) { + if (event.name === 'screen_view') { + multi.set( + this.getLastEventKey({ + projectId: event.project_id, + profileId: event.profile_id, + }), + eventJson, + 'EX', + 60 * 31, + ); + } else if (event.name === 'session_end') { + multi.del( + this.getLastEventKey({ + projectId: event.project_id, + profileId: event.profile_id, + }), + ); + } + + const sessionKey = this.getSessionKey(event.session_id); + const score = new Date(event.created_at || Date.now()).getTime(); + + multi + .rpush(sessionKey, eventJson) + .zadd(this.sessionSortedKey, 'NX', score, event.session_id); + } else { + // All other events go to regularQueue queue + multi.rpush(this.regularQueueKey, eventJson); + } + + if (event.profile_id) { + multi.set( + `live:event:${event.project_id}:${event.profile_id}`, + '', + 'EX', + 60 * 5, + ); + } + + if (!_multi) { + await multi.exec(); + } + await this.publishEvent('event:received', event); + } catch (error) { + this.logger.error('Failed to add event to Redis buffer', { error }); + } + } + + private async publishEvent( + channel: string, + event: IClickhouseEvent, + multi?: ReturnType, + ) { + try { + await (multi || getRedisPub()).publish( + channel, + setSuperJson( + transformEvent(event) as unknown as Record, + ), + ); + } catch (error) { + this.logger.warn('Failed to publish event', { error }); + } + } + + private async getEligableSessions({ minEventsInSession = 2 }) { + const sessionsSorted = await getRedisCache().eval( + this.processSessionsScript, + 2, // number of KEYS + this.sessionSortedKey, + this.sessionKeyPrefix, + (this.batchSize / 2).toString(), + minEventsInSession.toString(), + ); + + // (A) Process session events using the Lua script. + const parsed = getSafeJson< + Array<{ + sessionId: string; + events: string[]; + }> + >(sessionsSorted as string); + + const sessions: Record = {}; + if (!parsed) { + return sessions; + } + + for (const session of parsed) { + // Might be redundant check + if (session.events.length > 1) { + sessions[session.sessionId] = session.events + .map((e) => getSafeJson(e)) + .filter((e): e is IClickhouseEvent => e !== null); + } + } + + return sessions; + } + + /** + * Process the Redis buffer. + * + * 1. Fetch events from two sources in parallel: + * - Pick events from regular queue (batchSize / 2) + * - Pick events from sessions (batchSize / 2). + * This only have screen_view and session_end events + * + * 2. Process session events: + * - For screen_view events, calculate duration if next event exists + * - Last screen_view of each session remains pending + * - All other events are marked for flushing + * + * 3. Process regular queue events: + * - Inherit path/origin from last screen_view of same session if exists + * + * 4. Insert all flushable events into ClickHouse in chunks and publish notifications + * + * 5. Clean up processed events: + * - For regular queue: LTRIM processed events + * - For sessions: Update lists atomically via Lua script, preserving pending events + */ + async processBuffer() { + const redis = getRedisCache(); + const eventsToClickhouse: IClickhouseEvent[] = []; + const pendingUpdates: Array<{ + sessionId: string; + snapshotCount: number; + pending: IClickhouseEvent[]; + }> = []; + const timer = { + fetchUnprocessedEvents: 0, + processSessionEvents: 0, + processRegularQueueEvents: 0, + insertEvents: 0, + updatePendingSessions: 0, + }; + + try { + let now = performance.now(); + const [sessions, regularQueueEvents] = await Promise.all([ + // (A) Fetch session events + this.getEligableSessions({ minEventsInSession: 2 }), + // (B) Fetch no-session events + redis.lrange(this.regularQueueKey, 0, this.batchSize / 2 - 1), + ]); + + timer.fetchUnprocessedEvents = performance.now() - now; + now = performance.now(); + + for (const [sessionId, sessionEvents] of Object.entries(sessions)) { + const { flush, pending } = this.processSessionEvents(sessionEvents); + + if (flush.length > 0) { + eventsToClickhouse.push(...flush); + } + + pendingUpdates.push({ + sessionId, + snapshotCount: sessionEvents.length, + pending, + }); + } + + timer.processSessionEvents = performance.now() - now; + now = performance.now(); + + // (C) Sort events by creation time. + eventsToClickhouse.sort( + (a, b) => + new Date(a.created_at || 0).getTime() - + new Date(b.created_at || 0).getTime(), + ); + + // (B) Process no-session events + for (const eventStr of regularQueueEvents) { + const event = getSafeJson(eventStr); + if (event) { + const sessionEvents = sessions[event.session_id] || []; + const screenView = sessionEvents.findLast((sessionEvent) => { + const isScreenView = sessionEvent.name === 'screen_view'; + const isBeforeEvent = + new Date(sessionEvent.created_at).getTime() < + new Date(event.created_at).getTime(); + + return isScreenView && isBeforeEvent; + }); + + if (screenView) { + event.path = screenView.path; + event.origin = screenView.origin; + event.properties.__inherit_from = screenView.id; + } + + eventsToClickhouse.push(event); + } + } + + timer.processRegularQueueEvents = performance.now() - now; + now = performance.now(); + + if (eventsToClickhouse.length === 0) { + this.logger.debug('No events to process'); + return; + } + + // (C) Sort events by creation time. + eventsToClickhouse.sort( + (a, b) => + new Date(a.created_at || 0).getTime() - + new Date(b.created_at || 0).getTime(), + ); + + // (D) Insert events into ClickHouse in chunks + this.logger.info('Inserting events into ClickHouse', { + totalEvents: eventsToClickhouse.length, + chunks: Math.ceil(eventsToClickhouse.length / this.chunkSize), + }); + + for (const chunk of this.chunks(eventsToClickhouse, this.chunkSize)) { + await ch.insert({ + table: 'events', + values: chunk, + format: 'JSONEachRow', + }); + } + + timer.insertEvents = performance.now() - now; + now = performance.now(); + + // (E) Publish "saved" events. + const pubMulti = getRedisPub().multi(); + for (const event of eventsToClickhouse) { + await this.publishEvent('event:saved', event, pubMulti); + } + await pubMulti.exec(); + + // (F) Only after successful processing, update Redis + const multi = redis.multi(); + + // Clean up no-session events + if (regularQueueEvents.length > 0) { + multi.ltrim(this.regularQueueKey, regularQueueEvents.length, -1); + } + + await multi.exec(); + + // Process pending sessions in batches + await this.processPendingSessionsInBatches(redis, pendingUpdates); + + timer.updatePendingSessions = performance.now() - now; + + this.logger.info('Processed events from Redis buffer', { + batchSize: this.batchSize, + eventsToClickhouse: eventsToClickhouse.length, + pendingSessionUpdates: pendingUpdates.length, + sessionEvents: Object.entries(sessions).reduce( + (acc, [sId, events]) => acc + events.length, + 0, + ), + regularEvents: regularQueueEvents.length, + timer, + }); + } catch (error) { + this.logger.error('Error processing Redis buffer', { error }); + } + } + + /** + * Process a session's events. + * + * For each event in the session (in order): + * - If it is a screen_view, look for a subsequent event (screen_view or session_end) + * to calculate its duration. If found, flush it; if not, leave it pending. + * + * Returns an object with two arrays: + * flush: events to be sent to ClickHouse. + * pending: events that remain in the Redis session list. + */ + private processSessionEvents(events: IClickhouseEvent[]): { + flush: IClickhouseEvent[]; + pending: IClickhouseEvent[]; + } { + // Ensure events are sorted by created_at + events.sort( + (a, b) => + new Date(a.created_at || 0).getTime() - + new Date(b.created_at || 0).getTime(), + ); + + const flush: IClickhouseEvent[] = []; + const pending: IClickhouseEvent[] = []; + + for (let i = 0; i < events.length; i++) { + const event = events[i]!; + + if (event.name === 'session_end') { + flush.push(event); + } else { + // For screen_view events, look for next event + const next = events[i + 1]; + if (next) { + if (next.name === 'screen_view') { + event.duration = + new Date(next.created_at).getTime() - + new Date(event.created_at).getTime(); + } + flush.push(event); + } else { + pending.push(event); + } + } + } + + return { flush, pending }; + } + + async tryCleanup() { + try { + await runEvery({ + interval: 60 * 60 * 24, + fn: this.cleanup.bind(this), + key: `${this.name}-cleanup`, + }); + } catch (error) { + this.logger.error('Failed to run cleanup', { error }); + } + } + + /** + * Cleanup old events from Redis. + * For each key (no-session and per-session), remove events older than the cutoff date. + */ + async cleanup() { + const redis = getRedisCache(); + const cutoffTime = Date.now() - 1000 * 60 * 60 * 24 * this.daysToKeep; + + try { + const sessionIds = await redis.zrange(this.sessionSortedKey, 0, -1); + + for (const sessionId of sessionIds) { + const score = await redis.zscore(this.sessionSortedKey, sessionId); + + if (score) { + const scoreInt = Number.parseInt(score, 10); + if (scoreInt < cutoffTime) { + this.logger.warn('Stale session found', { + sessionId, + score, + createdAt: new Date(Number.parseInt(score, 10)), + eventsCount: await redis.llen(this.getSessionKey(sessionId)), + }); + } + } + } + } catch (error) { + this.logger.error('Failed to cleanup stale sessions', { error }); + } + } + + /** + * Retrieve the latest screen_view event for a given project/profile. + */ + public async getLastScreenView({ + projectId, + profileId, + }: { + projectId: string; + profileId: string; + }): Promise { + const redis = getRedisCache(); + const eventStr = await redis.get( + this.getLastEventKey({ projectId, profileId }), + ); + if (eventStr) { + const parsed = getSafeJson(eventStr); + if (parsed) { + return transformEvent(parsed); + } + } + return null; + } + + private getLastEventKey({ + projectId, + profileId, + }: { + projectId: string; + profileId: string; + }) { + return `session:last_screen_view:${projectId}:${profileId}`; + } + + private async processPendingSessionsInBatches( + redis: ReturnType, + pendingUpdates: Array<{ + sessionId: string; + snapshotCount: number; + pending: IClickhouseEvent[]; + }>, + ) { + for (const batch of this.chunks( + pendingUpdates, + this.updatePendingSessionsBatchSize, + )) { + const batchArgs: string[] = []; + + for (const { sessionId, snapshotCount, pending } of batch) { + const sessionKey = this.getSessionKey(sessionId); + batchArgs.push( + sessionKey, + snapshotCount.toString(), + pending.length.toString(), + ...pending.map((e) => JSON.stringify(e)), + ); + } + + await redis.eval( + this.batchUpdateSessionsScript, + 0, // no KEYS needed + ...batchArgs, + ); + } + } + + public async getBufferSizeHeavy() { + const redis = getRedisCache(); + const pipeline = redis.pipeline(); + + // Queue up commands in the pipeline + pipeline.llen(this.regularQueueKey); + pipeline.zcard(this.sessionSortedKey); + + // Execute pipeline to get initial counts + const [regularQueueCount, sessionCount] = (await pipeline.exec()) as [ + any, + any, + ]; + + if (sessionCount[1] === 0) { + return regularQueueCount[1]; + } + + // Get all session IDs and queue up LLEN commands for each session + const sessionIds = await redis.zrange(this.sessionSortedKey, 0, -1); + const sessionPipeline = redis.pipeline(); + + for (const sessionId of sessionIds) { + sessionPipeline.llen(this.getSessionKey(sessionId)); + } + + // Execute all LLEN commands in a single pipeline + const sessionCounts = (await sessionPipeline.exec()) as [any, any][]; + + // Sum up all counts + const totalSessionEvents = sessionCounts.reduce((sum, [err, count]) => { + if (err) return sum; + return sum + count; + }, 0); + + return regularQueueCount[1] + totalSessionEvents; + } + + public async getBufferSize() { + const cached = await getRedisCache().get('event_buffer:cached_count'); + if (cached) { + return Number.parseInt(cached, 10); + } + const count = await this.getBufferSizeHeavy(); + await getRedisCache().set( + 'event_buffer:cached_count', + count.toString(), + 'EX', + 15, // increase when we know it's stable + ); + return count; + } +} diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts deleted file mode 100644 index dcadb356..00000000 --- a/packages/db/src/buffers/event-buffer.ts +++ /dev/null @@ -1,281 +0,0 @@ -import { groupBy, omit } from 'ramda'; -import SuperJSON from 'superjson'; - -import { deepMergeObjects, getSafeJson } from '@openpanel/common'; -import { getRedisCache, getRedisPub } from '@openpanel/redis'; - -import { - TABLE_NAMES, - ch, - convertClickhouseDateToJs, -} from '../clickhouse-client'; -import { transformEvent } from '../services/event.service'; -import type { - IClickhouseEvent, - IServiceEvent, -} from '../services/event.service'; -import type { Find, FindMany } from './buffer'; -import { RedisBuffer } from './buffer'; -import { EventBuffer as NewEventBuffer } from './event-buffer-psql'; - -const STALLED_QUEUE_TIMEOUT = 1000 * 60 * 60 * 24; - -const testNewEventBuffer = new NewEventBuffer(); - -type BufferType = IClickhouseEvent; -export class EventBuffer extends RedisBuffer { - constructor() { - super('events_v2', null); - } - - getLastEventKey({ - projectId, - profileId, - }: { - projectId: string; - profileId: string; - }) { - return `session:last_screen_view:${projectId}:${profileId}`; - } - - public async getLastScreenView({ - projectId, - profileId, - }: { - projectId: string; - profileId: string; - }): Promise { - const event = await getRedisCache().get( - this.getLastEventKey({ projectId, profileId }), - ); - - if (event) { - const parsed = getSafeJson(event); - if (parsed) { - return transformEvent(parsed); - } - } - return null; - } - - public async add(event: BufferType) { - await super.add(event); - if (process.env.TEST_NEW_BUFFER) { - await testNewEventBuffer.add(event); - } - if (event.name === 'screen_view') { - await getRedisCache().set( - this.getLastEventKey({ - projectId: event.project_id, - profileId: event.profile_id, - }), - JSON.stringify(event), - 'EX', - 60 * 31, - ); - } - } - - public onAdd(event: BufferType) { - getRedisPub().publish( - 'event:received', - SuperJSON.stringify(transformEvent(event)), - ); - if (event.profile_id) { - getRedisCache().set( - `live:event:${event.project_id}:${event.profile_id}`, - '', - 'EX', - 60 * 5, - ); - } - } - - public onInsert(items: BufferType[]) { - for (const event of items) { - getRedisPub().publish( - 'event:saved', - SuperJSON.stringify(transformEvent(event)), - ); - } - } - - protected async processItems( - queue: BufferType[], - ): Promise<{ toInsert: BufferType[]; toKeep: BufferType[] }> { - const toInsert: BufferType[] = []; - const toStalled: BufferType[] = []; - - // Sort data by created_at - // oldest first - queue.sort(sortOldestFirst); - - // All events thats not a screen_view can be sent to clickhouse - // We only need screen_views since we want to calculate the duration of each screen - // To do this we need a minimum of 2 screen_views - queue - .filter((item) => item.name !== 'screen_view' || item.device === 'server') - .forEach((item, index) => { - // Find the last event with data and merge it with the current event - // We use profile_id here since this property can be set from backend as well - const lastEventWithData = queue - .slice(0, index) - .findLast((lastEvent) => { - return ( - lastEvent.project_id === item.project_id && - lastEvent.profile_id === item.profile_id && - lastEvent.path !== '' && - lastEvent.name === 'screen_view' - ); - }); - - const event = deepMergeObjects( - omit(['properties', 'duration'], lastEventWithData || {}), - item, - ); - - if (!event.properties) { - event.properties = {}; - } - - if (lastEventWithData) { - event.properties.__properties_from = lastEventWithData.id; - } - - return toInsert.push(event); - }); - - // Group screen_view events by session_id - const grouped = groupBy( - (item) => item.session_id, - queue.filter( - (item) => item.name === 'screen_view' && item.device !== 'server', - ), - ); - - // Iterate over each group - for (const [sessionId, screenViews] of Object.entries(grouped)) { - if (sessionId === '' || !sessionId) { - continue; - } - - // If there is only one screen_view event we can send it back to redis since we can't calculate the duration - const hasSessionEnd = queue.find( - (item) => item.name === 'session_end' && item.session_id === sessionId, - ); - - screenViews - ?.slice() - .sort(sortOldestFirst) - .forEach((item, index) => { - const nextScreenView = screenViews[index + 1]; - // if nextScreenView does not exists we can't calculate the duration (last event in session) - if (nextScreenView) { - const duration = - new Date(nextScreenView.created_at).getTime() - - new Date(item.created_at).getTime(); - const event = { - ...item, - properties: { - ...(item?.properties || {}), - __duration_from: nextScreenView.id, - }, - duration, - }; - toInsert.push(event); - } else if (hasSessionEnd) { - // push last event in session if we have a session_end event - toInsert.push(item); - } - }); - } // for of end - - // Check if we have any events that has been in the queue for more than 24 hour - // This should not theoretically happen but if it does we should move them to stalled - queue.forEach((item) => { - if ( - !toInsert.find((i) => i.id === item.id) && - convertClickhouseDateToJs(item.created_at).getTime() < - new Date().getTime() - STALLED_QUEUE_TIMEOUT - ) { - toStalled.push(item); - } - }); - - if (toStalled.length > 0) { - try { - this.logger.info(`Pushing to stalled queue (${toStalled.length})`, { - items: toStalled, - count: toStalled.length, - }); - await getRedisCache().rpush( - this.getKey('stalled'), - ...toStalled.map((item) => JSON.stringify(item)), - ); - } catch (error) { - toStalled.length = 0; - this.logger.error('Failed to push to stalled queue', { error }); - } - } - - return { - toInsert, - toKeep: queue.filter((item) => { - const willBeInserted = toInsert.find((i) => i.id === item.id); - const willBeStalled = toStalled.find((i) => i.id === item.id); - return willBeInserted === undefined && willBeStalled === undefined; - }), - }; - } - - private getChunks(items: BufferType[], size: number) { - const chunks = []; - for (let i = 0; i < items.length; i += size) { - chunks.push(items.slice(i, i + size)); - } - return chunks; - } - - protected async insertIntoDB(items: BufferType[]): Promise { - for (const chunk of this.getChunks(items, 500)) { - await ch.insert({ - table: TABLE_NAMES.events, - values: chunk, - format: 'JSONEachRow', - }); - await new Promise((resolve) => setTimeout(resolve, 50)); - } - } - - public findMany: FindMany = async ( - callback, - ) => { - if (await this.waitForReleasedLock()) { - return this.getQueue() - .then((queue) => { - return queue.filter(callback).map(transformEvent); - }) - .catch(() => { - return []; - }); - } - return []; - }; - - public find: Find = async (callback) => { - if (await this.waitForReleasedLock()) { - return this.getQueue(-1) - .then((queue) => { - const match = queue.find(callback); - return match ? transformEvent(match) : null; - }) - .catch(() => { - return null; - }); - } - return null; - }; -} - -const sortOldestFirst = (a: IClickhouseEvent, b: IClickhouseEvent) => - new Date(a.created_at).getTime() - new Date(b.created_at).getTime(); diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts index ee4924f4..41536cce 100644 --- a/packages/db/src/buffers/index.ts +++ b/packages/db/src/buffers/index.ts @@ -1,16 +1,16 @@ -import { BotBuffer } from './bot-buffer'; -import { BotBuffer as NewBotBuffer } from './bot-buffer-psql'; -import { EventBuffer } from './event-buffer'; -import { EventBuffer as NewEventBuffer } from './event-buffer-psql'; -import { ProfileBuffer } from './profile-buffer'; -import { ProfileBuffer as NewProfileBuffer } from './profile-buffer-psql'; +import { BotBuffer as BotBufferPsql } from './bot-buffer-psql'; +import { BotBuffer as BotBufferRedis } from './bot-buffer-redis'; +import { EventBuffer as EventBufferPsql } from './event-buffer-psql'; +import { EventBuffer as EventBufferRedis } from './event-buffer-redis'; +import { ProfileBuffer as ProfileBufferPsql } from './profile-buffer-psql'; +import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer-redis'; export const eventBuffer = process.env.USE_NEW_BUFFER - ? new NewEventBuffer() - : new EventBuffer(); + ? new EventBufferRedis() + : new EventBufferPsql(); export const profileBuffer = process.env.USE_NEW_BUFFER - ? new NewProfileBuffer() - : new ProfileBuffer(); + ? new ProfileBufferRedis() + : new ProfileBufferPsql(); export const botBuffer = process.env.USE_NEW_BUFFER - ? new NewBotBuffer() - : new BotBuffer(); + ? new BotBufferRedis() + : new BotBufferPsql(); diff --git a/packages/db/src/buffers/partial-json-match.ts b/packages/db/src/buffers/partial-json-match.ts new file mode 100644 index 00000000..84e0340a --- /dev/null +++ b/packages/db/src/buffers/partial-json-match.ts @@ -0,0 +1,42 @@ +/** + * Checks if an object partially matches another object, including nested properties + * @param source The object to check against + * @param partial The partial object to match + * @returns boolean indicating if the partial object matches the source + */ +export function isPartialMatch(source: any, partial: any): boolean { + // Handle null/undefined cases + if (partial === null || partial === undefined) { + return source === partial; + } + + // If partial is not an object, do direct comparison + if (typeof partial !== 'object') { + return source === partial; + } + + // If source is null/undefined but partial is an object, no match + if (source === null || source === undefined) { + return false; + } + + // Check each property in partial + for (const key in partial) { + if ( + Object.prototype.hasOwnProperty.call(partial, key) && + partial[key] !== undefined + ) { + // If property doesn't exist in source, no match + if (!(key in source)) { + return false; + } + + // Recursively check nested objects + if (!isPartialMatch(source[key], partial[key])) { + return false; + } + } + } + + return true; +} diff --git a/packages/db/src/buffers/profile-buffer-psql.ts b/packages/db/src/buffers/profile-buffer-psql.ts index 1ab4c575..3163bd89 100644 --- a/packages/db/src/buffers/profile-buffer-psql.ts +++ b/packages/db/src/buffers/profile-buffer-psql.ts @@ -280,4 +280,12 @@ export class ProfileBuffer extends BaseBuffer { this.logger.info('Cleaned up old profiles', { deleted: deleted.count }); } + + async getBufferSize() { + return db.profileBuffer.count({ + where: { + processedAt: null, + }, + }); + } } diff --git a/packages/db/src/buffers/profile-buffer-redis.ts b/packages/db/src/buffers/profile-buffer-redis.ts new file mode 100644 index 00000000..337d78ea --- /dev/null +++ b/packages/db/src/buffers/profile-buffer-redis.ts @@ -0,0 +1,224 @@ +import { createHash } from 'node:crypto'; +import { getSafeJson } from '@openpanel/common'; +import { type Redis, getRedisCache } from '@openpanel/redis'; +import { dissocPath, mergeDeepRight, omit, whereEq } from 'ramda'; + +import { TABLE_NAMES, ch, chQuery } from '../clickhouse-client'; +import type { IClickhouseProfile } from '../services/profile.service'; +import { BaseBuffer } from './base-buffer'; +import { isPartialMatch } from './partial-json-match'; + +export class ProfileBuffer extends BaseBuffer { + private batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE + ? Number.parseInt(process.env.PROFILE_BUFFER_BATCH_SIZE, 10) + : 200; + private daysToKeep = process.env.PROFILE_BUFFER_DAYS_TO_KEEP + ? Number.parseInt(process.env.PROFILE_BUFFER_DAYS_TO_KEEP, 10) + : 7; + private chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE + ? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10) + : 1000; + + private readonly redisBufferKey = 'profile-buffer'; + private readonly redisProfilePrefix = 'profile-cache:'; + + private redis: Redis; + + constructor() { + super({ + name: 'profile', + onFlush: async () => { + await this.processBuffer(); + }, + }); + this.redis = getRedisCache(); + } + + private excludeKeys( + profile: IClickhouseProfile, + exclude: string[][], + ): IClickhouseProfile { + let filtered = profile; + for (const path of exclude) { + filtered = dissocPath(path, filtered); + } + return filtered; + } + + private match(source: any, partial: any): boolean { + const exclude = [ + ['created_at'], + ['properties', 'browser_version'], + ['properties', 'browserVersion'], + ['properties', 'latitude'], + ['properties', 'longitude'], + ['properties', 'os_version'], + ['properties', 'osVersion'], + ['properties', 'path'], + ['properties', 'referrer_name'], + ['properties', 'referrerName'], + ['properties', 'referrer_type'], + ['properties', 'referrerType'], + ['properties', 'referrer'], + ]; + + return isPartialMatch(source, this.excludeKeys(partial, exclude)); + } + + async add(profile: IClickhouseProfile) { + try { + this.logger.debug('Adding profile', { + projectId: profile.project_id, + profileId: profile.id, + profile, + }); + const cacheKey = `${this.redisProfilePrefix}${profile.project_id}:${profile.id}`; + + // Check if we have this profile in Redis cache + const existingProfile = await this.redis.get(cacheKey); + let mergedProfile = profile; + + if (!existingProfile) { + this.logger.debug('Profile not found in cache, checking Clickhouse', { + projectId: profile.project_id, + profileId: profile.id, + }); + // If not in cache, check Clickhouse + const clickhouseProfile = await this.fetchFromClickhouse(profile); + if (clickhouseProfile) { + this.logger.debug('Found existing profile in Clickhouse, merging', { + projectId: profile.project_id, + profileId: profile.id, + }); + mergedProfile = mergeDeepRight(clickhouseProfile, profile); + } + } else { + const parsedProfile = getSafeJson(existingProfile); + + if (parsedProfile) { + // Only merge if checksums are different + if (this.match(parsedProfile, profile)) { + return; // Skip if checksums match + } + + this.logger.debug('Profile changed, merging with cached version', { + existingProfile: parsedProfile, + incomingProfile: profile, + }); + mergedProfile = mergeDeepRight(parsedProfile, profile); + } + } + + const result = await this.redis + .multi() + .set( + cacheKey, + JSON.stringify(mergedProfile), + 'EX', + 60 * 60 * 24 * this.daysToKeep, + ) + .rpush(this.redisBufferKey, JSON.stringify(mergedProfile)) + .llen(this.redisBufferKey) + .exec(); + if (!result) { + this.logger.error('Failed to add profile to Redis', { + profile, + cacheKey, + }); + return; + } + const bufferLength = (result?.[2]?.[1] as number) ?? 0; + + this.logger.debug('Current buffer length', { + bufferLength, + batchSize: this.batchSize, + }); + if (bufferLength >= this.batchSize) { + this.logger.info('Buffer full, initiating flush'); + await this.tryFlush(); + } + } catch (error) { + this.logger.error('Failed to add profile', { error, profile }); + } + } + + private async fetchFromClickhouse( + profile: IClickhouseProfile, + ): Promise { + this.logger.debug('Fetching profile from Clickhouse', { + projectId: profile.project_id, + profileId: profile.id, + }); + const result = await chQuery( + `SELECT * + FROM ${TABLE_NAMES.profiles} + WHERE project_id = '${profile.project_id}' + AND id = '${profile.id}' + ${ + profile.is_external === false + ? 'AND created_at > now() - INTERVAL 2 DAY' + : '' + } + ORDER BY created_at DESC + LIMIT 1`, + ); + + this.logger.debug('Clickhouse fetch result', { + found: !!result[0], + projectId: profile.project_id, + profileId: profile.id, + }); + return result[0] || null; + } + + async processBuffer() { + try { + this.logger.info('Starting profile buffer processing'); + const profiles = await this.redis.lrange( + this.redisBufferKey, + 0, + this.batchSize - 1, + ); + + if (profiles.length === 0) { + this.logger.debug('No profiles to process'); + return; + } + + this.logger.info(`Processing ${profiles.length} profiles in buffer`); + const parsedProfiles = profiles.map((p) => + getSafeJson(p), + ); + + let processedChunks = 0; + for (const chunk of this.chunks(parsedProfiles, this.chunkSize)) { + processedChunks++; + this.logger.debug(`Processing chunk ${processedChunks}`, { + size: chunk.length, + }); + this.logger.debug('Chunk data', { chunk }); + + await ch.insert({ + table: TABLE_NAMES.profiles, + values: chunk, + format: 'JSONEachRow', + }); + this.logger.debug(`Successfully inserted chunk ${processedChunks}`); + } + + // Only remove profiles after successful insert + await this.redis.ltrim(this.redisBufferKey, profiles.length, -1); + + this.logger.info('Successfully completed profile processing', { + totalProfiles: profiles.length, + totalChunks: processedChunks, + }); + } catch (error) { + this.logger.error('Failed to process buffer', { error }); + } + } + + async getBufferSize() { + return getRedisCache().llen(this.redisBufferKey); + } +} diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts deleted file mode 100644 index deb2fe61..00000000 --- a/packages/db/src/buffers/profile-buffer.ts +++ /dev/null @@ -1,241 +0,0 @@ -import { groupBy, mergeDeepRight, prop } from 'ramda'; - -import { toDots } from '@openpanel/common'; -import { getRedisCache } from '@openpanel/redis'; - -import { escape } from 'sqlstring'; -import { - TABLE_NAMES, - ch, - chQuery, - formatClickhouseDate, -} from '../clickhouse-client'; -import { transformProfile } from '../services/profile.service'; -import type { - IClickhouseProfile, - IServiceProfile, -} from '../services/profile.service'; -import type { Find, FindMany } from './buffer'; -import { RedisBuffer } from './buffer'; -import { ProfileBuffer as NewProfileBuffer } from './profile-buffer-psql'; -const BATCH_SIZE = process.env.BATCH_SIZE_PROFILES - ? Number.parseInt(process.env.BATCH_SIZE_PROFILES, 10) - : 50; - -const testNewProfileBuffer = new NewProfileBuffer(); - -type BufferType = IClickhouseProfile; -export class ProfileBuffer extends RedisBuffer { - constructor() { - super('profiles', BATCH_SIZE); - } - - async add(profile: BufferType) { - await super.add(profile); - if (process.env.TEST_NEW_BUFFER) { - await testNewProfileBuffer.add(profile); - } - } - - // this will do a couple of things: - // - we slice the queue to maxBufferSize since this queries have a limit on character count - // - check redis cache for profiles - // - fetch missing profiles from clickhouse - // - merge the incoming profile with existing data - protected async processItems( - items: BufferType[], - ): Promise<{ toInsert: BufferType[]; toKeep: BufferType[] }> { - const queue = this.combineQueueItems(items); - const slicedQueue = this.maxBufferSize - ? queue.slice(0, this.maxBufferSize) - : queue; - const redisProfiles = await this.getCachedProfiles(slicedQueue); - const dbProfiles = await this.fetchDbProfiles( - slicedQueue.filter((_, index) => !redisProfiles[index]), - ); - - const toInsert = this.createProfileValues( - slicedQueue, - redisProfiles, - dbProfiles, - ); - - if (toInsert.length > 0) { - await this.updateRedisCache(toInsert); - } - - return Promise.resolve({ - toInsert, - toKeep: this.maxBufferSize ? queue.slice(this.maxBufferSize) : [], - }); - } - - private combineQueueItems(queue: BufferType[]): BufferType[] { - const itemsToClickhouse = new Map(); - - queue.forEach((item) => { - const key = item.project_id + item.id; - const existing = itemsToClickhouse.get(key); - itemsToClickhouse.set(key, mergeDeepRight(existing ?? {}, item)); - }); - - return Array.from(itemsToClickhouse.values()); - } - - protected async insertIntoDB(items: BufferType[]): Promise { - await ch.insert({ - table: TABLE_NAMES.profiles, - values: items.map((item) => ({ - ...item, - created_at: item.created_at - ? formatClickhouseDate(item.created_at) - : '', - })), - format: 'JSONEachRow', - }); - } - - private matchPartialObject( - full: any, - partial: any, - options: { ignore: string[] }, - ): boolean { - if (typeof partial !== 'object' || partial === null) { - return partial === full; - } - - for (const key in partial) { - if (options.ignore.includes(key)) { - continue; - } - - if ( - !(key in full) || - !this.matchPartialObject(full[key], partial[key], options) - ) { - return false; - } - } - - return true; - } - - private async getCachedProfiles( - queue: BufferType[], - ): Promise<(IClickhouseProfile | null)[]> { - const redisCache = getRedisCache(); - const keys = queue.map((item) => `profile:${item.project_id}:${item.id}`); - - if (keys.length === 0) { - return []; - } - - const cachedProfiles = await redisCache.mget(...keys); - return cachedProfiles.map((profile) => { - try { - return profile ? JSON.parse(profile) : null; - } catch (error) { - return null; - } - }); - } - - private async fetchDbProfiles( - queue: IClickhouseProfile[], - ): Promise { - if (queue.length === 0) { - return []; - } - - // const grouped = groupBy(prop('project_id'), queue); - // const queries = Object.entries(grouped).map(([project_id, items]) => { - // if (!items) { - // return []; - // } - - // return chQuery( - // `SELECT - // * - // FROM ${TABLE_NAMES.profiles} - // WHERE - // id IN (${items.map((item) => escape(item.id)).join(',')}) - // AND created_at > INTERVAL 12 MONTH - // ORDER BY - // created_at DESC`, - // ); - // }); - - return await chQuery( - `SELECT - * - FROM ${TABLE_NAMES.profiles} - WHERE - (project_id, id) IN (${queue.map((item) => `('${item.project_id}', '${item.id}')`).join(',')}) - ORDER BY - created_at DESC`, - ); - } - - private createProfileValues( - queue: IClickhouseProfile[], - redisProfiles: (IClickhouseProfile | null)[], - dbProfiles: IClickhouseProfile[], - ): IClickhouseProfile[] { - return queue - .map((item, index) => { - const cachedProfile = redisProfiles[index]; - const dbProfile = dbProfiles.find( - (p) => p.id === item.id && p.project_id === item.project_id, - ); - const profile = cachedProfile || dbProfile; - - if ( - profile && - this.matchPartialObject( - profile, - { - ...item, - properties: toDots(item.properties), - }, - { - ignore: ['created_at'], - }, - ) - ) { - this.logger.debug('No changes for profile', { - profile, - }); - return null; - } - - return { - id: item.id, - first_name: item.first_name ?? profile?.first_name ?? '', - last_name: item.last_name ?? profile?.last_name ?? '', - email: item.email ?? profile?.email ?? '', - avatar: item.avatar ?? profile?.avatar ?? '', - properties: toDots({ - ...(profile?.properties ?? {}), - ...(item.properties ?? {}), - }), - project_id: item.project_id ?? profile?.project_id ?? '', - created_at: item.created_at ?? profile?.created_at ?? '', - is_external: item.is_external, - }; - }) - .flatMap((item) => (item ? [item] : [])); - } - - private async updateRedisCache(values: IClickhouseProfile[]): Promise { - const redisCache = getRedisCache(); - const multi = redisCache.multi(); - values.forEach((value) => { - multi.setex( - `profile:${value.project_id}:${value.id}`, - 60 * 30, // 30 minutes - JSON.stringify(value), - ); - }); - await multi.exec(); - } -} diff --git a/packages/db/src/clickhouse-client.ts b/packages/db/src/clickhouse-client.ts index 3871dbb7..ada760b5 100644 --- a/packages/db/src/clickhouse-client.ts +++ b/packages/db/src/clickhouse-client.ts @@ -86,73 +86,55 @@ const cleanQuery = (query?: string) => ? query.replace(/\n/g, '').replace(/\s+/g, ' ').trim() : undefined; -const createChildLogger = (property: string, args?: any[]) => { - if (property === 'insert') { - return logger.child({ - property, - table: args?.[0]?.table, - values: (args?.[0]?.values || []).length, - }); +async function withRetry( + operation: () => Promise, + maxRetries = 3, + baseDelay = 500, +): Promise { + let lastError: Error | undefined; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + const res = await operation(); + if (attempt > 0) { + logger.info('Retry operation succeeded', { attempt }); + } + return res; + } catch (error: any) { + lastError = error; + + if ( + error.message.includes('Connect') || + error.message.includes('socket hang up') || + error.message.includes('Timeout error') + ) { + const delay = baseDelay * 2 ** attempt; + logger.warn( + `Attempt ${attempt + 1}/${maxRetries} failed, retrying in ${delay}ms`, + { + error: error.message, + }, + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + + throw error; // Non-retriable error + } } - return logger.child({ - property, - table: args?.[0]?.table, - query: cleanQuery(args?.[0]?.query), - }); -}; + throw lastError; +} export const ch = new Proxy(originalCh, { get(target, property, receiver) { - if (property === 'insert' || property === 'query') { - return async (...args: any[]) => { - const childLogger = createChildLogger(property, args); + const value = Reflect.get(target, property, receiver); - if (property === 'insert') { - childLogger.info('insert info'); - } - - try { - // First attempt - if (property in target) { - // @ts-expect-error - return await target[property](...args); - } - } catch (error: unknown) { - if ( - error instanceof Error && - (error.message.includes('Connect') || - error.message.includes('socket hang up') || - error.message.includes('Timeout error')) - ) { - childLogger.error('First failed attempt', { - error, - }); - await new Promise((resolve) => setTimeout(resolve, 500)); - try { - // Retry once - childLogger.info(`Retrying ${property}`); - if (property in target) { - // @ts-expect-error - return await target[property](...args); - } - } catch (retryError) { - childLogger.error('Second failed attempt', retryError); - throw retryError; // Rethrow or handle as needed - } - } else { - childLogger.error('Failed without retry', { - ...args[0], - error, - }); - - // Handle other errors or rethrow them - throw error; - } - } - }; + if (property === 'insert') { + return (...args: any[]) => withRetry(() => value.apply(target, args)); } - return Reflect.get(target, property, receiver); + + return value; }, });