This commit is contained in:
Carl-Gerhard Lindesvärd
2026-02-06 13:11:54 +00:00
parent bf39804767
commit bc08566cd4

View File

@@ -14,9 +14,8 @@ import {
import { BaseBuffer } from './base-buffer';
/**
* Simplified Event Buffer
* Event Buffer
*
* Rules:
* 1. All events go into a single list buffer (event_buffer:queue)
* 2. screen_view events are handled specially:
* - Store current screen_view as "last" for the session
@@ -24,15 +23,8 @@ import { BaseBuffer } from './base-buffer';
* 3. session_end events:
* - Retrieve the last screen_view (don't modify it)
* - Push both screen_view and session_end to buffer
* 4. Flush: Simply process all events from the list buffer
*
* Optimizations:
* - Micro-batching: Events are buffered locally and flushed every 10ms to reduce Redis round-trips
* - Batched publishes: All PUBLISH commands are included in the multi pipeline
* - Simplified active visitor tracking: Only uses ZADD (removed redundant heartbeat SET)
* 4. Flush: Process all events from the list buffer
*/
// Pending event for local buffer
interface PendingEvent {
event: IClickhouseEvent;
eventJson: string;
@@ -41,7 +33,6 @@ interface PendingEvent {
}
export class EventBuffer extends BaseBuffer {
// Configurable limits
private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE
? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10)
: 4000;
@@ -49,58 +40,48 @@ export class EventBuffer extends BaseBuffer {
? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10)
: 1000;
// Micro-batching configuration
private microBatchIntervalMs = process.env.EVENT_BUFFER_MICRO_BATCH_MS
? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_MS, 10)
: 10; // Flush every 10ms by default
: 10;
private microBatchMaxSize = process.env.EVENT_BUFFER_MICRO_BATCH_SIZE
? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_SIZE, 10)
: 100; // Or when we hit 100 events
: 100;
// Local event buffer for micro-batching
private pendingEvents: PendingEvent[] = [];
private flushTimer: ReturnType<typeof setTimeout> | null = null;
private isFlushing = false;
/** Tracks consecutive flush failures for observability; reset on success. */
private flushRetryCount = 0;
// Throttled publish configuration
private publishThrottleMs = process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS
? Number.parseInt(process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS, 10)
: 1000; // Publish at most once per second
: 1000;
private lastPublishTime = 0;
private pendingPublishEvent: IClickhouseEvent | null = null;
private publishTimer: ReturnType<typeof setTimeout> | null = null;
private activeVisitorsExpiration = 60 * 5; // 5 minutes
// LIST - Stores all events ready to be flushed
private queueKey = 'event_buffer:queue';
// STRING - Tracks total buffer size incrementally
protected bufferCounterKey = 'event_buffer:total_count';
// Script SHAs for loaded Lua scripts
private scriptShas: {
addScreenView?: string;
addSessionEnd?: string;
} = {};
/**
 * Build the Redis key under which the most recent screen_view event
 * for the given session is stored (set with a TTL by the Lua scripts).
 */
private getLastScreenViewKeyBySession(sessionId: string) {
return 'event_buffer:last_screen_view:session:' + sessionId;
}
/**
 * Build the Redis key under which the most recent screen_view event
 * for the given project/profile pair is stored.
 */
private getLastScreenViewKeyByProfile(projectId: string, profileId: string) {
return 'event_buffer:last_screen_view:profile:' + projectId + ':' + profileId;
}
/**
* Lua script for handling screen_view addition - RACE-CONDITION SAFE without GroupMQ
* Lua script for screen_view addition.
* Uses GETDEL for atomic get-and-delete to prevent race conditions.
*
* Strategy: Use Redis GETDEL (atomic get-and-delete) to ensure only ONE thread
* can process the "last" screen_view at a time.
*
* KEYS[1] = last screen_view key (by session) - stores both event and timestamp as JSON
* KEYS[1] = last screen_view key (by session)
* KEYS[2] = last screen_view key (by profile, may be empty)
* KEYS[3] = queue key
* KEYS[4] = buffer counter key
@@ -115,24 +96,18 @@ local counterKey = KEYS[4]
local newEventData = ARGV[1]
local ttl = tonumber(ARGV[2])
-- GETDEL is atomic: get previous and delete in one operation
-- This ensures only ONE thread gets the previous event
local previousEventData = redis.call("GETDEL", sessionKey)
-- Store new screen_view as last for session
redis.call("SET", sessionKey, newEventData, "EX", ttl)
-- Store new screen_view as last for profile (if key provided)
if profileKey and profileKey ~= "" then
redis.call("SET", profileKey, newEventData, "EX", ttl)
end
-- If there was a previous screen_view, add it to queue with calculated duration
if previousEventData then
local prev = cjson.decode(previousEventData)
local curr = cjson.decode(newEventData)
-- Calculate duration (ensure non-negative to handle clock skew)
if prev.ts and curr.ts then
prev.event.duration = math.max(0, curr.ts - prev.ts)
end
@@ -146,9 +121,8 @@ return 0
`;
/**
* Lua script for handling session_end - RACE-CONDITION SAFE
*
* Uses GETDEL to atomically retrieve and delete the last screen_view
* Lua script for session_end.
* Uses GETDEL to atomically retrieve and delete the last screen_view.
*
* KEYS[1] = last screen_view key (by session)
* KEYS[2] = last screen_view key (by profile, may be empty)
@@ -163,11 +137,9 @@ local queueKey = KEYS[3]
local counterKey = KEYS[4]
local sessionEndJson = ARGV[1]
-- GETDEL is atomic: only ONE thread gets the last screen_view
local previousEventData = redis.call("GETDEL", sessionKey)
local added = 0
-- If there was a previous screen_view, add it to queue
if previousEventData then
local prev = cjson.decode(previousEventData)
redis.call("RPUSH", queueKey, cjson.encode(prev.event))
@@ -175,12 +147,10 @@ if previousEventData then
added = added + 1
end
-- Add session_end to queue
redis.call("RPUSH", queueKey, sessionEndJson)
redis.call("INCR", counterKey)
added = added + 1
-- Delete profile key
if profileKey and profileKey ~= "" then
redis.call("DEL", profileKey)
end
@@ -195,14 +165,9 @@ return added
await this.processBuffer();
},
});
// Load Lua scripts into Redis on startup
this.loadScripts();
}
/**
* Load Lua scripts into Redis and cache their SHAs.
* This avoids sending the entire script on every call.
*/
private async loadScripts() {
try {
const redis = getRedisCache();
@@ -224,27 +189,14 @@ return added
}
/**
 * Enqueue a batch of events. Each event is routed through add(), so the
 * per-event type handling and micro-batching rules apply uniformly;
 * the batch is flushed together by the local-buffer timer/size triggers.
 */
bulkAdd(events: IClickhouseEvent[]) {
events.forEach((event) => this.add(event));
}
/**
* Add an event into the local buffer for micro-batching.
*
* Events are buffered locally and flushed to Redis every microBatchIntervalMs
* or when microBatchMaxSize is reached. This dramatically reduces Redis round-trips.
*
* Logic:
* - screen_view: Store as "last" for session, flush previous if exists
* - session_end: Flush last screen_view + session_end
* - Other events: Add directly to queue
*/
add(event: IClickhouseEvent, _multi?: ReturnType<Redis['multi']>) {
const eventJson = JSON.stringify(event);
// Determine event type and prepare data
let type: PendingEvent['type'] = 'regular';
let eventWithTimestamp: string | undefined;
@@ -266,22 +218,18 @@ return added
type,
};
// If a multi was provided (legacy bulkAdd pattern), add directly without batching
if (_multi) {
this.addToMulti(_multi, pendingEvent);
return;
}
// Add to local buffer for micro-batching
this.pendingEvents.push(pendingEvent);
// Check if we should flush immediately due to size
if (this.pendingEvents.length >= this.microBatchMaxSize) {
this.flushLocalBuffer();
return;
}
// Schedule flush if not already scheduled
if (!this.flushTimer) {
this.flushTimer = setTimeout(() => {
this.flushTimer = null;
@@ -290,10 +238,6 @@ return added
}
}
/**
* Add a single pending event to a multi pipeline.
* Used both for legacy _multi pattern and during batch flush.
*/
private addToMulti(multi: ReturnType<Redis['multi']>, pending: PendingEvent) {
const { event, eventJson, eventWithTimestamp, type } = pending;
@@ -333,11 +277,9 @@ return added
eventJson,
);
} else {
// Regular events go directly to queue
multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey);
}
// Active visitor tracking (simplified - only ZADD, no redundant SET)
if (event.profile_id) {
this.incrementActiveVisitorCount(
multi,
@@ -347,12 +289,7 @@ return added
}
}
/**
* Force flush all pending events from local buffer to Redis immediately.
* Useful for testing or when you need to ensure all events are persisted.
*/
public async flush() {
// Clear any pending timer
if (this.flushTimer) {
clearTimeout(this.flushTimer);
this.flushTimer = null;
@@ -360,10 +297,6 @@ return added
await this.flushLocalBuffer();
}
/**
* Flush all pending events from local buffer to Redis in a single pipeline.
* This is the core optimization - batching many events into one round-trip.
*/
private async flushLocalBuffer() {
if (this.isFlushing || this.pendingEvents.length === 0) {
return;
@@ -371,7 +304,6 @@ return added
this.isFlushing = true;
// Grab current pending events and clear buffer
const eventsToFlush = this.pendingEvents;
this.pendingEvents = [];
@@ -379,48 +311,44 @@ return added
const redis = getRedisCache();
const multi = redis.multi();
// Add all events to the pipeline
for (const pending of eventsToFlush) {
this.addToMulti(multi, pending);
}
await multi.exec();
// Throttled publish - just signal that events were received
// Store the last event for publishing (we only need one to signal activity)
this.flushRetryCount = 0;
const lastEvent = eventsToFlush[eventsToFlush.length - 1];
if (lastEvent) {
this.scheduleThrottledPublish(lastEvent.event);
}
} catch (error) {
this.logger.error('Failed to flush local buffer to Redis', {
// Re-queue failed events at the front to preserve order and avoid data loss
this.pendingEvents = eventsToFlush.concat(this.pendingEvents);
this.flushRetryCount += 1;
this.logger.warn('Failed to flush local buffer to Redis; events re-queued', {
error,
eventCount: eventsToFlush.length,
flushRetryCount: this.flushRetryCount,
});
} finally {
this.isFlushing = false;
}
}
/**
* Throttled publish - publishes at most once per publishThrottleMs.
* Instead of publishing every event, we just signal that events were received.
* This reduces pub/sub load from 3000/s to 1/s.
*/
private scheduleThrottledPublish(event: IClickhouseEvent) {
// Always keep the latest event
this.pendingPublishEvent = event;
const now = Date.now();
const timeSinceLastPublish = now - this.lastPublishTime;
// If enough time has passed, publish immediately
if (timeSinceLastPublish >= this.publishThrottleMs) {
this.executeThrottledPublish();
return;
}
// Otherwise, schedule a publish if not already scheduled
if (!this.publishTimer) {
const delay = this.publishThrottleMs - timeSinceLastPublish;
this.publishTimer = setTimeout(() => {
@@ -430,9 +358,6 @@ return added
}
}
/**
* Execute the throttled publish with the latest pending event.
*/
private executeThrottledPublish() {
if (!this.pendingPublishEvent) {
return;
@@ -442,17 +367,12 @@ return added
this.pendingPublishEvent = null;
this.lastPublishTime = Date.now();
// Fire-and-forget publish (no multi = returns Promise)
const result = publishEvent('events', 'received', transformEvent(event));
if (result instanceof Promise) {
result.catch(() => {});
}
}
/**
* Execute a Lua script using EVALSHA (cached) or fallback to EVAL.
* This avoids sending the entire script on every call.
*/
private evalScript(
multi: ReturnType<Redis['multi']>,
scriptName: keyof typeof this.scriptShas,
@@ -463,32 +383,18 @@ return added
const sha = this.scriptShas[scriptName];
if (sha) {
// Use EVALSHA with cached SHA
multi.evalsha(sha, numKeys, ...args);
} else {
// Fallback to EVAL and try to reload script
multi.eval(scriptContent, numKeys, ...args);
this.logger.warn(`Script ${scriptName} not loaded, using EVAL fallback`);
// Attempt to reload scripts in background
this.loadScripts();
}
}
/**
* Process the Redis buffer - simplified version.
*
* Simply:
* 1. Fetch events from the queue (up to batchSize)
* 2. Parse and sort them
* 3. Insert into ClickHouse in chunks
* 4. Publish saved events
* 5. Clean up processed events from queue
*/
async processBuffer() {
const redis = getRedisCache();
try {
// Fetch events from queue
const queueEvents = await redis.lrange(
this.queueKey,
0,
@@ -500,7 +406,6 @@ return added
return;
}
// Parse events
const eventsToClickhouse: IClickhouseEvent[] = [];
for (const eventStr of queueEvents) {
const event = getSafeJson<IClickhouseEvent>(eventStr);
@@ -514,14 +419,12 @@ return added
return;
}
// Sort events by creation time
eventsToClickhouse.sort(
(a, b) =>
new Date(a.created_at || 0).getTime() -
new Date(b.created_at || 0).getTime(),
);
// Insert events into ClickHouse in chunks
this.logger.info('Inserting events into ClickHouse', {
totalEvents: eventsToClickhouse.length,
chunks: Math.ceil(eventsToClickhouse.length / this.chunkSize),
@@ -535,14 +438,12 @@ return added
});
}
// Publish "saved" events
const pubMulti = getRedisPub().multi();
for (const event of eventsToClickhouse) {
await publishEvent('events', 'saved', transformEvent(event), pubMulti);
}
await pubMulti.exec();
// Clean up processed events from queue
await redis
.multi()
.ltrim(this.queueKey, queueEvents.length, -1)
@@ -558,9 +459,6 @@ return added
}
}
/**
* Retrieve the latest screen_view event for a given session or profile
*/
public async getLastScreenView(
params:
| {
@@ -604,13 +502,6 @@ return added
});
}
/**
* Track active visitors using ZADD only.
*
* Optimization: Removed redundant heartbeat SET key.
* The ZADD score (timestamp) already tracks when a visitor was last seen.
* We use ZRANGEBYSCORE in getActiveVisitorCount to filter active visitors.
*/
private incrementActiveVisitorCount(
multi: ReturnType<Redis['multi']>,
projectId: string,
@@ -618,7 +509,6 @@ return added
) {
const now = Date.now();
const zsetKey = `live:visitors:${projectId}`;
// Only ZADD - the score is the timestamp, no need for separate heartbeat key
return multi.zadd(zsetKey, now, profileId);
}