From bc08566cd4aae5af1968e82340752727df2d6481 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 6 Feb 2026 13:11:54 +0000 Subject: [PATCH] fix --- packages/db/src/buffers/event-buffer.ts | 150 ++++-------------------- 1 file changed, 20 insertions(+), 130 deletions(-) diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 15d29b5b..9c1e5eb9 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -14,9 +14,8 @@ import { import { BaseBuffer } from './base-buffer'; /** - * Simplified Event Buffer + * Event Buffer * - * Rules: * 1. All events go into a single list buffer (event_buffer:queue) * 2. screen_view events are handled specially: * - Store current screen_view as "last" for the session @@ -24,15 +23,8 @@ import { BaseBuffer } from './base-buffer'; * 3. session_end events: * - Retrieve the last screen_view (don't modify it) * - Push both screen_view and session_end to buffer - * 4. Flush: Simply process all events from the list buffer - * - * Optimizations: - * - Micro-batching: Events are buffered locally and flushed every 10ms to reduce Redis round-trips - * - Batched publishes: All PUBLISH commands are included in the multi pipeline - * - Simplified active visitor tracking: Only uses ZADD (removed redundant heartbeat SET) + * 4. Flush: Process all events from the list buffer */ - -// Pending event for local buffer interface PendingEvent { event: IClickhouseEvent; eventJson: string; @@ -41,7 +33,6 @@ interface PendingEvent { } export class EventBuffer extends BaseBuffer { - // Configurable limits private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10) : 4000; @@ -49,58 +40,48 @@ export class EventBuffer extends BaseBuffer { ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) : 1000; - // Micro-batching configuration private microBatchIntervalMs = process.env.EVENT_BUFFER_MICRO_BATCH_MS ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_MS, 10) - : 10; // Flush every 10ms by default + : 10; private microBatchMaxSize = process.env.EVENT_BUFFER_MICRO_BATCH_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_SIZE, 10) - : 100; // Or when we hit 100 events + : 100; - // Local event buffer for micro-batching private pendingEvents: PendingEvent[] = []; private flushTimer: ReturnType | null = null; private isFlushing = false; + /** Tracks consecutive flush failures for observability; reset on success. */ + private flushRetryCount = 0; - // Throttled publish configuration private publishThrottleMs = process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS ? Number.parseInt(process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS, 10) - : 1000; // Publish at most once per second + : 1000; private lastPublishTime = 0; private pendingPublishEvent: IClickhouseEvent | null = null; private publishTimer: ReturnType | null = null; private activeVisitorsExpiration = 60 * 5; // 5 minutes - - // LIST - Stores all events ready to be flushed private queueKey = 'event_buffer:queue'; - - // STRING - Tracks total buffer size incrementally protected bufferCounterKey = 'event_buffer:total_count'; - // Script SHAs for loaded Lua scripts private scriptShas: { addScreenView?: string; addSessionEnd?: string; } = {}; - // Hash key for storing last screen_view per session private getLastScreenViewKeyBySession(sessionId: string) { return `event_buffer:last_screen_view:session:${sessionId}`; } - // Hash key for storing last screen_view per profile private getLastScreenViewKeyByProfile(projectId: string, profileId: string) { return `event_buffer:last_screen_view:profile:${projectId}:${profileId}`; } /** - * Lua script for handling screen_view addition - RACE-CONDITION SAFE without GroupMQ + * Lua script for screen_view addition. + * Uses GETDEL for atomic get-and-delete to prevent race conditions. * - * Strategy: Use Redis GETDEL (atomic get-and-delete) to ensure only ONE thread - * can process the "last" screen_view at a time. - * - * KEYS[1] = last screen_view key (by session) - stores both event and timestamp as JSON + * KEYS[1] = last screen_view key (by session) * KEYS[2] = last screen_view key (by profile, may be empty) * KEYS[3] = queue key * KEYS[4] = buffer counter key @@ -115,24 +96,18 @@ local counterKey = KEYS[4] local newEventData = ARGV[1] local ttl = tonumber(ARGV[2]) --- GETDEL is atomic: get previous and delete in one operation --- This ensures only ONE thread gets the previous event local previousEventData = redis.call("GETDEL", sessionKey) --- Store new screen_view as last for session redis.call("SET", sessionKey, newEventData, "EX", ttl) --- Store new screen_view as last for profile (if key provided) if profileKey and profileKey ~= "" then redis.call("SET", profileKey, newEventData, "EX", ttl) end --- If there was a previous screen_view, add it to queue with calculated duration if previousEventData then local prev = cjson.decode(previousEventData) local curr = cjson.decode(newEventData) - -- Calculate duration (ensure non-negative to handle clock skew) if prev.ts and curr.ts then prev.event.duration = math.max(0, curr.ts - prev.ts) end @@ -146,9 +121,8 @@ return 0 `; /** - * Lua script for handling session_end - RACE-CONDITION SAFE - * - * Uses GETDEL to atomically retrieve and delete the last screen_view + * Lua script for session_end. + * Uses GETDEL to atomically retrieve and delete the last screen_view. * * KEYS[1] = last screen_view key (by session) * KEYS[2] = last screen_view key (by profile, may be empty) @@ -163,11 +137,9 @@ local queueKey = KEYS[3] local counterKey = KEYS[4] local sessionEndJson = ARGV[1] --- GETDEL is atomic: only ONE thread gets the last screen_view local previousEventData = redis.call("GETDEL", sessionKey) local added = 0 --- If there was a previous screen_view, add it to queue if previousEventData then local prev = cjson.decode(previousEventData) redis.call("RPUSH", queueKey, cjson.encode(prev.event)) @@ -175,12 +147,10 @@ if previousEventData then added = added + 1 end --- Add session_end to queue redis.call("RPUSH", queueKey, sessionEndJson) redis.call("INCR", counterKey) added = added + 1 --- Delete profile key if profileKey and profileKey ~= "" then redis.call("DEL", profileKey) end @@ -195,14 +165,9 @@ return added await this.processBuffer(); }, }); - // Load Lua scripts into Redis on startup this.loadScripts(); } - /** - * Load Lua scripts into Redis and cache their SHAs. - * This avoids sending the entire script on every call. - */ private async loadScripts() { try { const redis = getRedisCache(); @@ -224,27 +189,14 @@ return added } bulkAdd(events: IClickhouseEvent[]) { - // Add all events to local buffer - they will be flushed together for (const event of events) { this.add(event); } } - /** - * Add an event into the local buffer for micro-batching. - * - * Events are buffered locally and flushed to Redis every microBatchIntervalMs - * or when microBatchMaxSize is reached. This dramatically reduces Redis round-trips. - * - * Logic: - * - screen_view: Store as "last" for session, flush previous if exists - * - session_end: Flush last screen_view + session_end - * - Other events: Add directly to queue - */ add(event: IClickhouseEvent, _multi?: ReturnType) { const eventJson = JSON.stringify(event); - // Determine event type and prepare data let type: PendingEvent['type'] = 'regular'; let eventWithTimestamp: string | undefined; @@ -266,22 +218,18 @@ return added type, }; - // If a multi was provided (legacy bulkAdd pattern), add directly without batching if (_multi) { this.addToMulti(_multi, pendingEvent); return; } - // Add to local buffer for micro-batching this.pendingEvents.push(pendingEvent); - // Check if we should flush immediately due to size if (this.pendingEvents.length >= this.microBatchMaxSize) { this.flushLocalBuffer(); return; } - // Schedule flush if not already scheduled if (!this.flushTimer) { this.flushTimer = setTimeout(() => { this.flushTimer = null; @@ -290,10 +238,6 @@ return added } } - /** - * Add a single pending event to a multi pipeline. - * Used both for legacy _multi pattern and during batch flush. - */ private addToMulti(multi: ReturnType, pending: PendingEvent) { const { event, eventJson, eventWithTimestamp, type } = pending; @@ -333,11 +277,9 @@ return added eventJson, ); } else { - // Regular events go directly to queue multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey); } - // Active visitor tracking (simplified - only ZADD, no redundant SET) if (event.profile_id) { this.incrementActiveVisitorCount( multi, @@ -347,12 +289,7 @@ return added } } - /** - * Force flush all pending events from local buffer to Redis immediately. - * Useful for testing or when you need to ensure all events are persisted. - */ public async flush() { - // Clear any pending timer if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; @@ -360,10 +297,6 @@ return added await this.flushLocalBuffer(); } - /** - * Flush all pending events from local buffer to Redis in a single pipeline. - * This is the core optimization - batching many events into one round-trip. - */ private async flushLocalBuffer() { if (this.isFlushing || this.pendingEvents.length === 0) { return; @@ -371,7 +304,6 @@ return added this.isFlushing = true; - // Grab current pending events and clear buffer const eventsToFlush = this.pendingEvents; this.pendingEvents = []; @@ -379,48 +311,44 @@ return added const redis = getRedisCache(); const multi = redis.multi(); - // Add all events to the pipeline for (const pending of eventsToFlush) { this.addToMulti(multi, pending); } await multi.exec(); - // Throttled publish - just signal that events were received - // Store the last event for publishing (we only need one to signal activity) + this.flushRetryCount = 0; + const lastEvent = eventsToFlush[eventsToFlush.length - 1]; if (lastEvent) { this.scheduleThrottledPublish(lastEvent.event); } } catch (error) { - this.logger.error('Failed to flush local buffer to Redis', { + // Re-queue failed events at the front to preserve order and avoid data loss + this.pendingEvents = eventsToFlush.concat(this.pendingEvents); + + this.flushRetryCount += 1; + this.logger.warn('Failed to flush local buffer to Redis; events re-queued', { error, eventCount: eventsToFlush.length, + flushRetryCount: this.flushRetryCount, }); } finally { this.isFlushing = false; } } - /** - * Throttled publish - publishes at most once per publishThrottleMs. - * Instead of publishing every event, we just signal that events were received. - * This reduces pub/sub load from 3000/s to 1/s. - */ private scheduleThrottledPublish(event: IClickhouseEvent) { - // Always keep the latest event this.pendingPublishEvent = event; const now = Date.now(); const timeSinceLastPublish = now - this.lastPublishTime; - // If enough time has passed, publish immediately if (timeSinceLastPublish >= this.publishThrottleMs) { this.executeThrottledPublish(); return; } - // Otherwise, schedule a publish if not already scheduled if (!this.publishTimer) { const delay = this.publishThrottleMs - timeSinceLastPublish; this.publishTimer = setTimeout(() => { @@ -430,9 +358,6 @@ return added } } - /** - * Execute the throttled publish with the latest pending event. - */ private executeThrottledPublish() { if (!this.pendingPublishEvent) { return; @@ -442,17 +367,12 @@ return added this.pendingPublishEvent = null; this.lastPublishTime = Date.now(); - // Fire-and-forget publish (no multi = returns Promise) const result = publishEvent('events', 'received', transformEvent(event)); if (result instanceof Promise) { result.catch(() => {}); } } - /** - * Execute a Lua script using EVALSHA (cached) or fallback to EVAL. - * This avoids sending the entire script on every call. - */ private evalScript( multi: ReturnType, scriptName: keyof typeof this.scriptShas, @@ -463,32 +383,18 @@ return added const sha = this.scriptShas[scriptName]; if (sha) { - // Use EVALSHA with cached SHA multi.evalsha(sha, numKeys, ...args); } else { - // Fallback to EVAL and try to reload script multi.eval(scriptContent, numKeys, ...args); this.logger.warn(`Script ${scriptName} not loaded, using EVAL fallback`); - // Attempt to reload scripts in background this.loadScripts(); } } - /** - * Process the Redis buffer - simplified version. - * - * Simply: - * 1. Fetch events from the queue (up to batchSize) - * 2. Parse and sort them - * 3. Insert into ClickHouse in chunks - * 4. Publish saved events - * 5. Clean up processed events from queue - */ async processBuffer() { const redis = getRedisCache(); try { - // Fetch events from queue const queueEvents = await redis.lrange( this.queueKey, 0, @@ -500,7 +406,6 @@ return added return; } - // Parse events const eventsToClickhouse: IClickhouseEvent[] = []; for (const eventStr of queueEvents) { const event = getSafeJson(eventStr); @@ -514,14 +419,12 @@ return added return; } - // Sort events by creation time eventsToClickhouse.sort( (a, b) => new Date(a.created_at || 0).getTime() - new Date(b.created_at || 0).getTime(), ); - // Insert events into ClickHouse in chunks this.logger.info('Inserting events into ClickHouse', { totalEvents: eventsToClickhouse.length, chunks: Math.ceil(eventsToClickhouse.length / this.chunkSize), @@ -535,14 +438,12 @@ return added }); } - // Publish "saved" events const pubMulti = getRedisPub().multi(); for (const event of eventsToClickhouse) { await publishEvent('events', 'saved', transformEvent(event), pubMulti); } await pubMulti.exec(); - // Clean up processed events from queue await redis .multi() .ltrim(this.queueKey, queueEvents.length, -1) @@ -558,9 +459,6 @@ return added } } - /** - * Retrieve the latest screen_view event for a given session or profile - */ public async getLastScreenView( params: | { @@ -604,13 +502,6 @@ return added }); } - /** - * Track active visitors using ZADD only. - * - * Optimization: Removed redundant heartbeat SET key. - * The ZADD score (timestamp) already tracks when a visitor was last seen. - * We use ZRANGEBYSCORE in getActiveVisitorCount to filter active visitors. - */ private incrementActiveVisitorCount( multi: ReturnType, projectId: string, @@ -618,7 +509,6 @@ return added ) { const now = Date.now(); const zsetKey = `live:visitors:${projectId}`; - // Only ZADD - the score is the timestamp, no need for separate heartbeat key return multi.zadd(zsetKey, now, profileId); }