try simplified event buffer with duration calculation on the fly instead
This commit is contained in:
@@ -5,32 +5,9 @@ import {
|
||||
publishEvent,
|
||||
} from '@openpanel/redis';
|
||||
import { ch } from '../clickhouse/client';
|
||||
import {
|
||||
type IClickhouseEvent,
|
||||
type IServiceEvent,
|
||||
transformEvent,
|
||||
} from '../services/event.service';
|
||||
import { type IClickhouseEvent } from '../services/event.service';
|
||||
import { BaseBuffer } from './base-buffer';
|
||||
|
||||
/**
|
||||
* Event Buffer
|
||||
*
|
||||
* 1. All events go into a single list buffer (event_buffer:queue)
|
||||
* 2. screen_view events are handled specially:
|
||||
* - Store current screen_view as "last" for the session
|
||||
* - When a new screen_view arrives, flush the previous one with calculated duration
|
||||
* 3. session_end events:
|
||||
* - Retrieve the last screen_view (don't modify it)
|
||||
* - Push both screen_view and session_end to buffer
|
||||
* 4. Flush: Process all events from the list buffer
|
||||
*/
|
||||
interface PendingEvent {
|
||||
event: IClickhouseEvent;
|
||||
eventJson: string;
|
||||
eventWithTimestamp?: string;
|
||||
type: 'regular' | 'screen_view' | 'session_end';
|
||||
}
|
||||
|
||||
export class EventBuffer extends BaseBuffer {
|
||||
private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE
|
||||
? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10)
|
||||
@@ -46,7 +23,7 @@ export class EventBuffer extends BaseBuffer {
|
||||
? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_SIZE, 10)
|
||||
: 100;
|
||||
|
||||
private pendingEvents: PendingEvent[] = [];
|
||||
private pendingEvents: IClickhouseEvent[] = [];
|
||||
private flushTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private isFlushing = false;
|
||||
/** Tracks consecutive flush failures for observability; reset on success. */
|
||||
@@ -59,100 +36,6 @@ export class EventBuffer extends BaseBuffer {
|
||||
private queueKey = 'event_buffer:queue';
|
||||
protected bufferCounterKey = 'event_buffer:total_count';
|
||||
|
||||
private scriptShas: {
|
||||
addScreenView?: string;
|
||||
addSessionEnd?: string;
|
||||
} = {};
|
||||
|
||||
private getLastScreenViewKeyBySession(sessionId: string) {
|
||||
return `event_buffer:last_screen_view:session:${sessionId}`;
|
||||
}
|
||||
|
||||
private getLastScreenViewKeyByProfile(projectId: string, profileId: string) {
|
||||
return `event_buffer:last_screen_view:profile:${projectId}:${profileId}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Lua script for screen_view addition.
|
||||
* Uses GETDEL for atomic get-and-delete to prevent race conditions.
|
||||
*
|
||||
* KEYS[1] = last screen_view key (by session)
|
||||
* KEYS[2] = last screen_view key (by profile, may be empty)
|
||||
* KEYS[3] = queue key
|
||||
* KEYS[4] = buffer counter key
|
||||
* ARGV[1] = new event with timestamp as JSON: {"event": {...}, "ts": 123456}
|
||||
* ARGV[2] = TTL for last screen_view (1 hour)
|
||||
*/
|
||||
private readonly addScreenViewScript = `
|
||||
local sessionKey = KEYS[1]
|
||||
local profileKey = KEYS[2]
|
||||
local queueKey = KEYS[3]
|
||||
local counterKey = KEYS[4]
|
||||
local newEventData = ARGV[1]
|
||||
local ttl = tonumber(ARGV[2])
|
||||
|
||||
local previousEventData = redis.call("GETDEL", sessionKey)
|
||||
|
||||
redis.call("SET", sessionKey, newEventData, "EX", ttl)
|
||||
|
||||
if profileKey and profileKey ~= "" then
|
||||
redis.call("SET", profileKey, newEventData, "EX", ttl)
|
||||
end
|
||||
|
||||
if previousEventData then
|
||||
local prev = cjson.decode(previousEventData)
|
||||
local curr = cjson.decode(newEventData)
|
||||
|
||||
if prev.ts and curr.ts then
|
||||
prev.event.duration = math.max(0, curr.ts - prev.ts)
|
||||
end
|
||||
|
||||
redis.call("RPUSH", queueKey, cjson.encode(prev.event))
|
||||
redis.call("INCR", counterKey)
|
||||
return 1
|
||||
end
|
||||
|
||||
return 0
|
||||
`;
|
||||
|
||||
/**
|
||||
* Lua script for session_end.
|
||||
* Uses GETDEL to atomically retrieve and delete the last screen_view.
|
||||
*
|
||||
* KEYS[1] = last screen_view key (by session)
|
||||
* KEYS[2] = last screen_view key (by profile, may be empty)
|
||||
* KEYS[3] = queue key
|
||||
* KEYS[4] = buffer counter key
|
||||
* ARGV[1] = session_end event JSON
|
||||
*/
|
||||
private readonly addSessionEndScript = `
|
||||
local sessionKey = KEYS[1]
|
||||
local profileKey = KEYS[2]
|
||||
local queueKey = KEYS[3]
|
||||
local counterKey = KEYS[4]
|
||||
local sessionEndJson = ARGV[1]
|
||||
|
||||
local previousEventData = redis.call("GETDEL", sessionKey)
|
||||
local added = 0
|
||||
|
||||
if previousEventData then
|
||||
local prev = cjson.decode(previousEventData)
|
||||
redis.call("RPUSH", queueKey, cjson.encode(prev.event))
|
||||
redis.call("INCR", counterKey)
|
||||
added = added + 1
|
||||
end
|
||||
|
||||
redis.call("RPUSH", queueKey, sessionEndJson)
|
||||
redis.call("INCR", counterKey)
|
||||
added = added + 1
|
||||
|
||||
if profileKey and profileKey ~= "" then
|
||||
redis.call("DEL", profileKey)
|
||||
end
|
||||
|
||||
return added
|
||||
`;
|
||||
|
||||
constructor() {
|
||||
super({
|
||||
name: 'event',
|
||||
@@ -160,27 +43,6 @@ return added
|
||||
await this.processBuffer();
|
||||
},
|
||||
});
|
||||
this.loadScripts();
|
||||
}
|
||||
|
||||
private async loadScripts() {
|
||||
try {
|
||||
const redis = getRedisCache();
|
||||
const [screenViewSha, sessionEndSha] = await Promise.all([
|
||||
redis.script('LOAD', this.addScreenViewScript),
|
||||
redis.script('LOAD', this.addSessionEndScript),
|
||||
]);
|
||||
|
||||
this.scriptShas.addScreenView = screenViewSha as string;
|
||||
this.scriptShas.addSessionEnd = sessionEndSha as string;
|
||||
|
||||
this.logger.info('Loaded Lua scripts into Redis', {
|
||||
addScreenView: this.scriptShas.addScreenView,
|
||||
addSessionEnd: this.scriptShas.addSessionEnd,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to load Lua scripts', { error });
|
||||
}
|
||||
}
|
||||
|
||||
bulkAdd(events: IClickhouseEvent[]) {
|
||||
@@ -190,30 +52,7 @@ return added
|
||||
}
|
||||
|
||||
add(event: IClickhouseEvent) {
|
||||
const eventJson = JSON.stringify(event);
|
||||
|
||||
let type: PendingEvent['type'] = 'regular';
|
||||
let eventWithTimestamp: string | undefined;
|
||||
|
||||
if (event.session_id && event.name === 'screen_view') {
|
||||
type = 'screen_view';
|
||||
const timestamp = new Date(event.created_at || Date.now()).getTime();
|
||||
eventWithTimestamp = JSON.stringify({
|
||||
event: event,
|
||||
ts: timestamp,
|
||||
});
|
||||
} else if (event.session_id && event.name === 'session_end') {
|
||||
type = 'session_end';
|
||||
}
|
||||
|
||||
const pendingEvent: PendingEvent = {
|
||||
event,
|
||||
eventJson,
|
||||
eventWithTimestamp,
|
||||
type,
|
||||
};
|
||||
|
||||
this.pendingEvents.push(pendingEvent);
|
||||
this.pendingEvents.push(event);
|
||||
|
||||
if (this.pendingEvents.length >= this.microBatchMaxSize) {
|
||||
this.flushLocalBuffer();
|
||||
@@ -228,57 +67,6 @@ return added
|
||||
}
|
||||
}
|
||||
|
||||
private addToMulti(multi: ReturnType<Redis['multi']>, pending: PendingEvent) {
|
||||
const { event, eventJson, eventWithTimestamp, type } = pending;
|
||||
|
||||
if (type === 'screen_view' && event.session_id) {
|
||||
const sessionKey = this.getLastScreenViewKeyBySession(event.session_id);
|
||||
const profileKey = event.profile_id
|
||||
? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id)
|
||||
: '';
|
||||
|
||||
this.evalScript(
|
||||
multi,
|
||||
'addScreenView',
|
||||
this.addScreenViewScript,
|
||||
4,
|
||||
sessionKey,
|
||||
profileKey,
|
||||
this.queueKey,
|
||||
this.bufferCounterKey,
|
||||
eventWithTimestamp!,
|
||||
'3600',
|
||||
);
|
||||
} else if (type === 'session_end' && event.session_id) {
|
||||
const sessionKey = this.getLastScreenViewKeyBySession(event.session_id);
|
||||
const profileKey = event.profile_id
|
||||
? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id)
|
||||
: '';
|
||||
|
||||
this.evalScript(
|
||||
multi,
|
||||
'addSessionEnd',
|
||||
this.addSessionEndScript,
|
||||
4,
|
||||
sessionKey,
|
||||
profileKey,
|
||||
this.queueKey,
|
||||
this.bufferCounterKey,
|
||||
eventJson,
|
||||
);
|
||||
} else {
|
||||
multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey);
|
||||
}
|
||||
|
||||
if (event.profile_id) {
|
||||
this.incrementActiveVisitorCount(
|
||||
multi,
|
||||
event.project_id,
|
||||
event.profile_id,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public async flush() {
|
||||
if (this.flushTimer) {
|
||||
clearTimeout(this.flushTimer);
|
||||
@@ -301,9 +89,17 @@ return added
|
||||
const redis = getRedisCache();
|
||||
const multi = redis.multi();
|
||||
|
||||
for (const pending of eventsToFlush) {
|
||||
this.addToMulti(multi, pending);
|
||||
for (const event of eventsToFlush) {
|
||||
multi.rpush(this.queueKey, JSON.stringify(event));
|
||||
if (event.profile_id) {
|
||||
this.incrementActiveVisitorCount(
|
||||
multi,
|
||||
event.project_id,
|
||||
event.profile_id,
|
||||
);
|
||||
}
|
||||
}
|
||||
multi.incrby(this.bufferCounterKey, eventsToFlush.length);
|
||||
|
||||
await multi.exec();
|
||||
|
||||
@@ -314,11 +110,14 @@ return added
|
||||
this.pendingEvents = eventsToFlush.concat(this.pendingEvents);
|
||||
|
||||
this.flushRetryCount += 1;
|
||||
this.logger.warn('Failed to flush local buffer to Redis; events re-queued', {
|
||||
error,
|
||||
eventCount: eventsToFlush.length,
|
||||
flushRetryCount: this.flushRetryCount,
|
||||
});
|
||||
this.logger.warn(
|
||||
'Failed to flush local buffer to Redis; events re-queued',
|
||||
{
|
||||
error,
|
||||
eventCount: eventsToFlush.length,
|
||||
flushRetryCount: this.flushRetryCount,
|
||||
},
|
||||
);
|
||||
} finally {
|
||||
this.isFlushing = false;
|
||||
// Events may have accumulated while we were flushing; schedule another flush if needed
|
||||
@@ -331,24 +130,6 @@ return added
|
||||
}
|
||||
}
|
||||
|
||||
private evalScript(
|
||||
multi: ReturnType<Redis['multi']>,
|
||||
scriptName: keyof typeof this.scriptShas,
|
||||
scriptContent: string,
|
||||
numKeys: number,
|
||||
...args: (string | number)[]
|
||||
) {
|
||||
const sha = this.scriptShas[scriptName];
|
||||
|
||||
if (sha) {
|
||||
multi.evalsha(sha, numKeys, ...args);
|
||||
} else {
|
||||
multi.eval(scriptContent, numKeys, ...args);
|
||||
this.logger.warn(`Script ${scriptName} not loaded, using EVAL fallback`);
|
||||
this.loadScripts();
|
||||
}
|
||||
}
|
||||
|
||||
async processBuffer() {
|
||||
const redis = getRedisCache();
|
||||
|
||||
@@ -398,7 +179,10 @@ return added
|
||||
|
||||
const countByProject = new Map<string, number>();
|
||||
for (const event of eventsToClickhouse) {
|
||||
countByProject.set(event.project_id, (countByProject.get(event.project_id) ?? 0) + 1);
|
||||
countByProject.set(
|
||||
event.project_id,
|
||||
(countByProject.get(event.project_id) ?? 0) + 1,
|
||||
);
|
||||
}
|
||||
for (const [projectId, count] of countByProject) {
|
||||
publishEvent('events', 'batch', { projectId, count });
|
||||
@@ -419,42 +203,6 @@ return added
|
||||
}
|
||||
}
|
||||
|
||||
public async getLastScreenView(
|
||||
params:
|
||||
| {
|
||||
sessionId: string;
|
||||
}
|
||||
| {
|
||||
projectId: string;
|
||||
profileId: string;
|
||||
},
|
||||
): Promise<IServiceEvent | null> {
|
||||
const redis = getRedisCache();
|
||||
|
||||
let lastScreenViewKey: string;
|
||||
if ('sessionId' in params) {
|
||||
lastScreenViewKey = this.getLastScreenViewKeyBySession(params.sessionId);
|
||||
} else {
|
||||
lastScreenViewKey = this.getLastScreenViewKeyByProfile(
|
||||
params.projectId,
|
||||
params.profileId,
|
||||
);
|
||||
}
|
||||
|
||||
const eventDataStr = await redis.get(lastScreenViewKey);
|
||||
|
||||
if (eventDataStr) {
|
||||
const eventData = getSafeJson<{ event: IClickhouseEvent; ts: number }>(
|
||||
eventDataStr,
|
||||
);
|
||||
if (eventData?.event) {
|
||||
return transformEvent(eventData.event);
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public async getBufferSize() {
|
||||
return this.getBufferSizeWithCounter(async () => {
|
||||
const redis = getRedisCache();
|
||||
|
||||
Reference in New Issue
Block a user