feat: use groupmq instead of bullmq for incoming events (#206)
* wip * wip working group queue * wip * wip * wip * fix: groupmq package (tests failed) * minor fixes * fix: zero is fine for duration * add logger * fix: make buffers more lightweight * bump groupmq * new buffers and bump groupmq * fix: buffers based on comments * fix: use profileId as groupId if exists * bump groupmq * add concurrency env for only events
This commit is contained in:
committed by
GitHub
parent
ca4a880acd
commit
0b4fcbad69
@@ -1,6 +1,6 @@
|
||||
import { generateSecureId } from '@openpanel/common/server/id';
|
||||
import { type ILogger, createLogger } from '@openpanel/logger';
|
||||
import { getRedisCache } from '@openpanel/redis';
|
||||
import { getRedisCache, runEvery } from '@openpanel/redis';
|
||||
|
||||
export class BaseBuffer {
|
||||
name: string;
|
||||
@@ -9,6 +9,8 @@ export class BaseBuffer {
|
||||
lockTimeout = 60;
|
||||
onFlush: () => void;
|
||||
|
||||
protected bufferCounterKey: string;
|
||||
|
||||
constructor(options: {
|
||||
name: string;
|
||||
onFlush: () => Promise<void>;
|
||||
@@ -17,6 +19,7 @@ export class BaseBuffer {
|
||||
this.name = options.name;
|
||||
this.lockKey = `lock:${this.name}`;
|
||||
this.onFlush = options.onFlush;
|
||||
this.bufferCounterKey = `${this.name}:buffer:count`;
|
||||
}
|
||||
|
||||
protected chunks<T>(items: T[], size: number) {
|
||||
@@ -27,6 +30,53 @@ export class BaseBuffer {
|
||||
return chunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to safely get buffer size with counter fallback
|
||||
*/
|
||||
protected async getBufferSizeWithCounter(
|
||||
fallbackFn: () => Promise<number>,
|
||||
): Promise<number> {
|
||||
const key = this.bufferCounterKey;
|
||||
try {
|
||||
await runEvery({
|
||||
interval: 60 * 15,
|
||||
key: `${this.name}-buffer:resync`,
|
||||
fn: async () => {
|
||||
try {
|
||||
const actual = await fallbackFn();
|
||||
await getRedisCache().set(this.bufferCounterKey, actual.toString());
|
||||
} catch (error) {
|
||||
this.logger.warn('Failed to resync buffer counter', { error });
|
||||
}
|
||||
},
|
||||
}).catch(() => {});
|
||||
|
||||
const counterValue = await getRedisCache().get(key);
|
||||
if (counterValue !== null) {
|
||||
const parsed = Number.parseInt(counterValue, 10);
|
||||
if (!Number.isNaN(parsed)) {
|
||||
return Math.max(0, parsed);
|
||||
}
|
||||
// Corrupted value → treat as missing
|
||||
this.logger.warn('Invalid buffer counter value, reinitializing', {
|
||||
key,
|
||||
counterValue,
|
||||
});
|
||||
}
|
||||
|
||||
// Initialize counter with current size
|
||||
const count = await fallbackFn();
|
||||
await getRedisCache().set(key, count.toString());
|
||||
return count;
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
'Failed to get buffer size from counter, using fallback',
|
||||
{ error },
|
||||
);
|
||||
return fallbackFn();
|
||||
}
|
||||
}
|
||||
|
||||
private async releaseLock(lockId: string): Promise<void> {
|
||||
this.logger.debug('Releasing lock...');
|
||||
const script = `
|
||||
@@ -60,6 +110,11 @@ export class BaseBuffer {
|
||||
error,
|
||||
lockId,
|
||||
});
|
||||
// On error, we might want to reset counter to avoid drift
|
||||
if (this.bufferCounterKey) {
|
||||
this.logger.warn('Resetting buffer counter due to flush error');
|
||||
await getRedisCache().del(this.bufferCounterKey);
|
||||
}
|
||||
} finally {
|
||||
await this.releaseLock(lockId);
|
||||
this.logger.info('Flush completed', {
|
||||
|
||||
@@ -24,11 +24,15 @@ export class BotBuffer extends BaseBuffer {
|
||||
|
||||
async add(event: IClickhouseBotEvent) {
|
||||
try {
|
||||
// Add event to Redis list
|
||||
await this.redis.rpush(this.redisKey, JSON.stringify(event));
|
||||
// Add event and increment counter atomically
|
||||
await this.redis
|
||||
.multi()
|
||||
.rpush(this.redisKey, JSON.stringify(event))
|
||||
.incr(this.bufferCounterKey)
|
||||
.exec();
|
||||
|
||||
// Check buffer length
|
||||
const bufferLength = await this.redis.llen(this.redisKey);
|
||||
// Check buffer length using counter (fallback to LLEN if missing)
|
||||
const bufferLength = await this.getBufferSize();
|
||||
|
||||
if (bufferLength >= this.batchSize) {
|
||||
await this.tryFlush();
|
||||
@@ -60,8 +64,12 @@ export class BotBuffer extends BaseBuffer {
|
||||
format: 'JSONEachRow',
|
||||
});
|
||||
|
||||
// Only remove events after successful insert
|
||||
await this.redis.ltrim(this.redisKey, events.length, -1);
|
||||
// Only remove events after successful insert and update counter
|
||||
await this.redis
|
||||
.multi()
|
||||
.ltrim(this.redisKey, events.length, -1)
|
||||
.decrby(this.bufferCounterKey, events.length)
|
||||
.exec();
|
||||
|
||||
this.logger.info('Processed bot events', {
|
||||
count: events.length,
|
||||
@@ -72,6 +80,6 @@ export class BotBuffer extends BaseBuffer {
|
||||
}
|
||||
|
||||
async getBufferSize() {
|
||||
return getRedisCache().llen(this.redisKey);
|
||||
return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey));
|
||||
}
|
||||
}
|
||||
503
packages/db/src/buffers/event-buffer.test.ts
Normal file
503
packages/db/src/buffers/event-buffer.test.ts
Normal file
@@ -0,0 +1,503 @@
|
||||
import { getRedisCache } from '@openpanel/redis';
|
||||
import {
|
||||
afterAll,
|
||||
beforeAll,
|
||||
beforeEach,
|
||||
describe,
|
||||
expect,
|
||||
it,
|
||||
vi,
|
||||
} from 'vitest';
|
||||
import { ch } from '../clickhouse/client';
|
||||
|
||||
// Mock transformEvent to avoid circular dependency with buffers -> services -> buffers
|
||||
vi.mock('../services/event.service', () => ({
|
||||
transformEvent: (event: any) => ({
|
||||
id: event.id ?? 'id',
|
||||
name: event.name,
|
||||
deviceId: event.device_id,
|
||||
profileId: event.profile_id,
|
||||
projectId: event.project_id,
|
||||
sessionId: event.session_id,
|
||||
properties: event.properties ?? {},
|
||||
createdAt: new Date(event.created_at ?? Date.now()),
|
||||
country: event.country,
|
||||
city: event.city,
|
||||
region: event.region,
|
||||
longitude: event.longitude,
|
||||
latitude: event.latitude,
|
||||
os: event.os,
|
||||
osVersion: event.os_version,
|
||||
browser: event.browser,
|
||||
browserVersion: event.browser_version,
|
||||
device: event.device,
|
||||
brand: event.brand,
|
||||
model: event.model,
|
||||
duration: event.duration ?? 0,
|
||||
path: event.path ?? '',
|
||||
origin: event.origin ?? '',
|
||||
referrer: event.referrer,
|
||||
referrerName: event.referrer_name,
|
||||
referrerType: event.referrer_type,
|
||||
meta: event.meta,
|
||||
importedAt: undefined,
|
||||
sdkName: event.sdk_name,
|
||||
sdkVersion: event.sdk_version,
|
||||
profile: event.profile,
|
||||
}),
|
||||
}));
|
||||
|
||||
import { EventBuffer } from './event-buffer';
|
||||
|
||||
const redis = getRedisCache();
|
||||
|
||||
beforeEach(async () => {
|
||||
await redis.flushdb();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
try {
|
||||
await redis.quit();
|
||||
} catch {}
|
||||
});
|
||||
|
||||
describe('EventBuffer with real Redis', () => {
|
||||
let eventBuffer: EventBuffer;
|
||||
|
||||
beforeEach(() => {
|
||||
eventBuffer = new EventBuffer();
|
||||
});
|
||||
|
||||
it('keeps a single screen_view pending until a subsequent event arrives', async () => {
|
||||
const screenView = {
|
||||
project_id: 'p1',
|
||||
profile_id: 'u1',
|
||||
session_id: 'session_a',
|
||||
name: 'screen_view',
|
||||
created_at: new Date().toISOString(),
|
||||
} as any;
|
||||
|
||||
await eventBuffer.add(screenView);
|
||||
|
||||
// Not eligible for processing yet (only 1 event in session)
|
||||
await eventBuffer.processBuffer();
|
||||
|
||||
const sessionKey = `event_buffer:session:${screenView.session_id}`;
|
||||
const events = await redis.lrange(sessionKey, 0, -1);
|
||||
expect(events.length).toBe(1);
|
||||
expect(JSON.parse(events[0]!)).toMatchObject({
|
||||
session_id: 'session_a',
|
||||
name: 'screen_view',
|
||||
});
|
||||
});
|
||||
|
||||
it('processes two screen_view events and leaves only the last one pending', async () => {
|
||||
const t0 = Date.now();
|
||||
const first = {
|
||||
project_id: 'p1',
|
||||
profile_id: 'u1',
|
||||
session_id: 'session_b',
|
||||
name: 'screen_view',
|
||||
created_at: new Date(t0).toISOString(),
|
||||
} as any;
|
||||
const second = {
|
||||
project_id: 'p1',
|
||||
profile_id: 'u1',
|
||||
session_id: 'session_b',
|
||||
name: 'screen_view',
|
||||
created_at: new Date(t0 + 1000).toISOString(),
|
||||
} as any;
|
||||
|
||||
await eventBuffer.add(first);
|
||||
await eventBuffer.add(second);
|
||||
|
||||
const insertSpy = vi
|
||||
.spyOn(ch, 'insert')
|
||||
.mockResolvedValueOnce(undefined as any);
|
||||
|
||||
await eventBuffer.processBuffer();
|
||||
|
||||
// First screen_view should be flushed to ClickHouse, second should remain pending in Redis
|
||||
expect(insertSpy).toHaveBeenCalledWith({
|
||||
format: 'JSONEachRow',
|
||||
table: 'events',
|
||||
values: [
|
||||
{
|
||||
...first,
|
||||
duration: 1000,
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const sessionKey = `event_buffer:session:${first.session_id}`;
|
||||
const storedEvents = await redis.lrange(sessionKey, 0, -1);
|
||||
expect(storedEvents.length).toBe(1);
|
||||
const remaining = JSON.parse(storedEvents[0]!);
|
||||
expect(remaining).toMatchObject({
|
||||
session_id: 'session_b',
|
||||
name: 'screen_view',
|
||||
created_at: second.created_at,
|
||||
});
|
||||
});
|
||||
|
||||
it('clears session when a session_end event arrives', async () => {
|
||||
const t0 = Date.now();
|
||||
const first = {
|
||||
project_id: 'p1',
|
||||
profile_id: 'u1',
|
||||
session_id: 'session_c',
|
||||
name: 'screen_view',
|
||||
created_at: new Date(t0).toISOString(),
|
||||
} as any;
|
||||
const end = {
|
||||
project_id: 'p1',
|
||||
profile_id: 'u1',
|
||||
session_id: 'session_c',
|
||||
name: 'session_end',
|
||||
created_at: new Date(t0 + 1000).toISOString(),
|
||||
} as any;
|
||||
|
||||
await eventBuffer.add(first);
|
||||
await eventBuffer.add(end);
|
||||
|
||||
const insertSpy = vi
|
||||
.spyOn(ch, 'insert')
|
||||
.mockResolvedValue(undefined as any);
|
||||
|
||||
await eventBuffer.processBuffer();
|
||||
|
||||
// Both events should be flushed, leaving no pending session events
|
||||
expect(insertSpy).toHaveBeenCalledWith({
|
||||
format: 'JSONEachRow',
|
||||
table: 'events',
|
||||
values: [first, end],
|
||||
});
|
||||
const sessionKey = `event_buffer:session:${first.session_id}`;
|
||||
const storedEvents = await redis.lrange(sessionKey, 0, -1);
|
||||
expect(storedEvents.length).toBe(0);
|
||||
});
|
||||
|
||||
it('queues and processes non-session events in regular queue', async () => {
|
||||
const event = {
|
||||
project_id: 'p2',
|
||||
name: 'custom_event',
|
||||
created_at: new Date().toISOString(),
|
||||
} as any;
|
||||
|
||||
await eventBuffer.add(event);
|
||||
|
||||
// Should be in regular queue
|
||||
const regularQueueKey = 'event_buffer:regular_queue';
|
||||
expect(await redis.llen(regularQueueKey)).toBe(1);
|
||||
|
||||
// Buffer counter should reflect outstanding = 1
|
||||
expect(await eventBuffer.getBufferSize()).toBe(1);
|
||||
|
||||
const insertSpy = vi
|
||||
.spyOn(ch, 'insert')
|
||||
.mockResolvedValueOnce(undefined as any);
|
||||
await eventBuffer.processBuffer();
|
||||
|
||||
// Regular queue should be trimmed
|
||||
expect(await redis.llen(regularQueueKey)).toBe(0);
|
||||
expect(insertSpy).toHaveBeenCalled();
|
||||
|
||||
// Buffer counter back to 0
|
||||
expect(await eventBuffer.getBufferSize()).toBe(0);
|
||||
});
|
||||
|
||||
it('adds session to ready set at 2 events and removes after processing', async () => {
|
||||
const s = 'session_ready';
|
||||
const e1 = {
|
||||
project_id: 'p3',
|
||||
profile_id: 'u3',
|
||||
session_id: s,
|
||||
name: 'screen_view',
|
||||
created_at: new Date().toISOString(),
|
||||
} as any;
|
||||
const e2 = {
|
||||
...e1,
|
||||
created_at: new Date(Date.now() + 1000).toISOString(),
|
||||
} as any;
|
||||
|
||||
await eventBuffer.add(e1);
|
||||
|
||||
// One event -> not ready
|
||||
expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull();
|
||||
|
||||
await eventBuffer.add(e2);
|
||||
|
||||
// Two events -> ready
|
||||
expect(await redis.zscore('event_buffer:ready_sessions', s)).not.toBeNull();
|
||||
|
||||
const insertSpy = vi
|
||||
.spyOn(ch, 'insert')
|
||||
.mockResolvedValueOnce(undefined as any);
|
||||
await eventBuffer.processBuffer();
|
||||
|
||||
// After processing with one pending left, session should be removed from ready set
|
||||
expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull();
|
||||
expect(insertSpy).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('sets last screen_view key and clears it on session_end', async () => {
|
||||
const projectId = 'p4';
|
||||
const profileId = 'u4';
|
||||
const sessionId = 'session_last';
|
||||
const lastKey = `session:last_screen_view:${projectId}:${profileId}`;
|
||||
|
||||
const view = {
|
||||
project_id: projectId,
|
||||
profile_id: profileId,
|
||||
session_id: sessionId,
|
||||
name: 'screen_view',
|
||||
created_at: new Date().toISOString(),
|
||||
} as any;
|
||||
|
||||
await eventBuffer.add(view);
|
||||
|
||||
// Should be set in Redis
|
||||
expect(await redis.get(lastKey)).not.toBeNull();
|
||||
|
||||
const end = {
|
||||
project_id: projectId,
|
||||
profile_id: profileId,
|
||||
session_id: sessionId,
|
||||
name: 'session_end',
|
||||
created_at: new Date(Date.now() + 1000).toISOString(),
|
||||
} as any;
|
||||
|
||||
await eventBuffer.add(end);
|
||||
|
||||
const insertSpy = vi
|
||||
.spyOn(ch, 'insert')
|
||||
.mockResolvedValueOnce(undefined as any);
|
||||
await eventBuffer.processBuffer();
|
||||
|
||||
// Key should be deleted by session_end
|
||||
expect(await redis.get(lastKey)).toBeNull();
|
||||
expect(insertSpy).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('getLastScreenView works for profile and session queries', async () => {
|
||||
const projectId = 'p5';
|
||||
const profileId = 'u5';
|
||||
const sessionId = 'session_glsv';
|
||||
|
||||
const view = {
|
||||
project_id: projectId,
|
||||
profile_id: profileId,
|
||||
session_id: sessionId,
|
||||
name: 'screen_view',
|
||||
created_at: new Date().toISOString(),
|
||||
} as any;
|
||||
|
||||
await eventBuffer.add(view);
|
||||
|
||||
const byProfile = await eventBuffer.getLastScreenView({
|
||||
projectId,
|
||||
profileId,
|
||||
});
|
||||
|
||||
if (!byProfile) {
|
||||
throw new Error('byProfile is null');
|
||||
}
|
||||
|
||||
expect(byProfile.name).toBe('screen_view');
|
||||
|
||||
const bySession = await eventBuffer.getLastScreenView({
|
||||
projectId,
|
||||
sessionId,
|
||||
});
|
||||
|
||||
if (!bySession) {
|
||||
throw new Error('bySession is null');
|
||||
}
|
||||
|
||||
expect(bySession.name).toBe('screen_view');
|
||||
});
|
||||
|
||||
it('buffer counter reflects pending after processing 2 screen_view events', async () => {
|
||||
const sessionId = 'session_counter';
|
||||
const a = {
|
||||
project_id: 'p6',
|
||||
profile_id: 'u6',
|
||||
session_id: sessionId,
|
||||
name: 'screen_view',
|
||||
created_at: new Date().toISOString(),
|
||||
} as any;
|
||||
const b = {
|
||||
...a,
|
||||
created_at: new Date(Date.now() + 1000).toISOString(),
|
||||
} as any;
|
||||
|
||||
await eventBuffer.add(a);
|
||||
await eventBuffer.add(b);
|
||||
|
||||
// Counter counts enqueued items
|
||||
expect(await eventBuffer.getBufferSize()).toBeGreaterThanOrEqual(2);
|
||||
|
||||
const insertSpy = vi
|
||||
.spyOn(ch, 'insert')
|
||||
.mockResolvedValueOnce(undefined as any);
|
||||
await eventBuffer.processBuffer();
|
||||
|
||||
// One pending screen_view left -> counter should be 1
|
||||
expect(await eventBuffer.getBufferSize()).toBe(1);
|
||||
expect(insertSpy).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('inserts in chunks according to EVENT_BUFFER_CHUNK_SIZE', async () => {
|
||||
const prev = process.env.EVENT_BUFFER_CHUNK_SIZE;
|
||||
process.env.EVENT_BUFFER_CHUNK_SIZE = '1';
|
||||
const eb = new EventBuffer();
|
||||
|
||||
const e1 = {
|
||||
project_id: 'pc',
|
||||
name: 'ev1',
|
||||
created_at: new Date().toISOString(),
|
||||
} as any;
|
||||
const e2 = {
|
||||
project_id: 'pc',
|
||||
name: 'ev2',
|
||||
created_at: new Date(Date.now() + 1).toISOString(),
|
||||
} as any;
|
||||
|
||||
await eb.add(e1);
|
||||
await eb.add(e2);
|
||||
|
||||
const insertSpy = vi
|
||||
.spyOn(ch, 'insert')
|
||||
.mockResolvedValue(undefined as any);
|
||||
|
||||
await eb.processBuffer();
|
||||
|
||||
// With chunk size 1 and two events, insert should be called twice
|
||||
expect(insertSpy.mock.calls.length).toBeGreaterThanOrEqual(2);
|
||||
|
||||
// Restore env
|
||||
if (prev === undefined) delete process.env.EVENT_BUFFER_CHUNK_SIZE;
|
||||
else process.env.EVENT_BUFFER_CHUNK_SIZE = prev;
|
||||
});
|
||||
|
||||
it('counts active visitors after adding an event with profile', async () => {
|
||||
const e = {
|
||||
project_id: 'p7',
|
||||
profile_id: 'u7',
|
||||
name: 'custom',
|
||||
created_at: new Date().toISOString(),
|
||||
} as any;
|
||||
|
||||
await eventBuffer.add(e);
|
||||
|
||||
const count = await eventBuffer.getActiveVisitorCount('p7');
|
||||
expect(count).toBeGreaterThanOrEqual(1);
|
||||
});
|
||||
|
||||
it('batches pending session updates (respects cap) during processBuffer', async () => {
|
||||
const prev = process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE;
|
||||
process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE = '3';
|
||||
const eb = new EventBuffer();
|
||||
|
||||
// Create many sessions each with 2 screen_view events → leaves 1 pending per session
|
||||
const numSessions = 10;
|
||||
const base = Date.now();
|
||||
|
||||
for (let i = 0; i < numSessions; i++) {
|
||||
const sid = `batch_s_${i}`;
|
||||
const e1 = {
|
||||
project_id: 'p8',
|
||||
profile_id: `u${i}`,
|
||||
session_id: sid,
|
||||
name: 'screen_view',
|
||||
created_at: new Date(base + i * 10).toISOString(),
|
||||
} as any;
|
||||
const e2 = {
|
||||
...e1,
|
||||
created_at: new Date(base + i * 10 + 1).toISOString(),
|
||||
} as any;
|
||||
await eb.add(e1);
|
||||
await eb.add(e2);
|
||||
}
|
||||
|
||||
const insertSpy = vi
|
||||
.spyOn(ch, 'insert')
|
||||
.mockResolvedValue(undefined as any);
|
||||
const evalSpy = vi.spyOn(redis as any, 'eval');
|
||||
|
||||
await eb.processBuffer();
|
||||
|
||||
// Only consider eval calls for batchUpdateSessionsScript (2 keys, second is total_count)
|
||||
const batchEvalCalls = evalSpy.mock.calls.filter(
|
||||
(call) => call[1] === 2 && call[3] === 'event_buffer:total_count',
|
||||
);
|
||||
|
||||
const expectedCalls = Math.ceil(numSessions / 3);
|
||||
expect(batchEvalCalls.length).toBeGreaterThanOrEqual(expectedCalls);
|
||||
|
||||
function countSessionsInEvalCall(args: any[]): number {
|
||||
let idx = 4; // ARGV starts after: script, numKeys, key1, key2
|
||||
let count = 0;
|
||||
while (idx < args.length) {
|
||||
if (idx + 3 >= args.length) break;
|
||||
const pendingCount = Number.parseInt(String(args[idx + 3]), 10);
|
||||
idx += 4 + Math.max(0, pendingCount);
|
||||
count += 1;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
for (const call of batchEvalCalls) {
|
||||
expect(call[1]).toBe(2);
|
||||
expect(call[2]).toBe('event_buffer:ready_sessions');
|
||||
expect(call[3]).toBe('event_buffer:total_count');
|
||||
|
||||
const sessionsInThisCall = countSessionsInEvalCall(call.slice(0));
|
||||
expect(sessionsInThisCall).toBeLessThanOrEqual(3);
|
||||
expect(sessionsInThisCall).toBeGreaterThan(0);
|
||||
}
|
||||
|
||||
expect(insertSpy).toHaveBeenCalled();
|
||||
|
||||
// Restore env
|
||||
if (prev === undefined)
|
||||
delete process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE;
|
||||
else process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE = prev;
|
||||
|
||||
evalSpy.mockRestore();
|
||||
insertSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('flushes a lone session_end and clears the session list', async () => {
|
||||
const s = 'session_only_end';
|
||||
const end = {
|
||||
project_id: 'p9',
|
||||
profile_id: 'u9',
|
||||
session_id: s,
|
||||
name: 'session_end',
|
||||
created_at: new Date().toISOString(),
|
||||
} as any;
|
||||
|
||||
const eb = new EventBuffer();
|
||||
await eb.add(end);
|
||||
|
||||
// Should be considered ready even though only 1 event (session_end)
|
||||
const insertSpy = vi
|
||||
.spyOn(ch, 'insert')
|
||||
.mockResolvedValueOnce(undefined as any);
|
||||
|
||||
await eb.processBuffer();
|
||||
|
||||
expect(insertSpy).toHaveBeenCalledWith({
|
||||
format: 'JSONEachRow',
|
||||
table: 'events',
|
||||
values: [end],
|
||||
});
|
||||
|
||||
const sessionKey = `event_buffer:session:${s}`;
|
||||
const remaining = await redis.lrange(sessionKey, 0, -1);
|
||||
expect(remaining.length).toBe(0);
|
||||
|
||||
insertSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
@@ -1,4 +1,4 @@
|
||||
import { getSafeJson, setSuperJson } from '@openpanel/json';
|
||||
import { getSafeJson } from '@openpanel/json';
|
||||
import {
|
||||
type Redis,
|
||||
getRedisCache,
|
||||
@@ -38,12 +38,16 @@ import { BaseBuffer } from './base-buffer';
|
||||
|
||||
export class EventBuffer extends BaseBuffer {
|
||||
// Configurable limits
|
||||
// How many days to keep buffered session metadata before cleanup
|
||||
private daysToKeep = process.env.EVENT_BUFFER_DAYS_TO_KEEP
|
||||
? Number.parseFloat(process.env.EVENT_BUFFER_DAYS_TO_KEEP)
|
||||
: 3;
|
||||
// How many events we attempt to FETCH per flush cycle (split across sessions/non-sessions)
|
||||
// Prefer new env EVENT_BUFFER_BATCH_SIZE; fallback to legacy EVENT_BUFFER_BATCH_SIZE
|
||||
private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE
|
||||
? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10)
|
||||
: 4000;
|
||||
// How many events per insert chunk we send to ClickHouse (insert batch size)
|
||||
private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE
|
||||
? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10)
|
||||
: 1000;
|
||||
@@ -53,8 +57,20 @@ export class EventBuffer extends BaseBuffer {
|
||||
process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE,
|
||||
10,
|
||||
)
|
||||
: 300;
|
||||
|
||||
// Cap of how many ready sessions to scan per flush cycle (configurable via env)
|
||||
private maxSessionsPerFlush = process.env.EVENT_BUFFER_MAX_SESSIONS_PER_FLUSH
|
||||
? Number.parseInt(process.env.EVENT_BUFFER_MAX_SESSIONS_PER_FLUSH, 10)
|
||||
: 500;
|
||||
|
||||
// Soft time budget per flush (ms) to avoid long lock holds
|
||||
private flushTimeBudgetMs = process.env.EVENT_BUFFER_FLUSH_TIME_BUDGET_MS
|
||||
? Number.parseInt(process.env.EVENT_BUFFER_FLUSH_TIME_BUDGET_MS, 10)
|
||||
: 1000;
|
||||
|
||||
private minEventsInSession = 2;
|
||||
|
||||
private activeVisitorsExpiration = 60 * 5; // 5 minutes
|
||||
|
||||
private sessionEvents = ['screen_view', 'session_end'];
|
||||
@@ -65,113 +81,164 @@ export class EventBuffer extends BaseBuffer {
|
||||
// SORTED SET - Tracks all active session IDs with their timestamps
|
||||
private sessionSortedKey = 'event_buffer:sessions_sorted'; // sorted set of session IDs
|
||||
|
||||
// SORTED SET - Tracks sessions that are ready for processing (have >= minEvents)
|
||||
private readySessionsKey = 'event_buffer:ready_sessions';
|
||||
|
||||
// STRING - Tracks total buffer size incrementally
|
||||
protected bufferCounterKey = 'event_buffer:total_count';
|
||||
|
||||
private readonly sessionKeyPrefix = 'event_buffer:session:';
|
||||
// LIST - Stores events for a given session
|
||||
private getSessionKey(sessionId: string) {
|
||||
return `${this.sessionKeyPrefix}${sessionId}`;
|
||||
}
|
||||
/**
|
||||
* Lua script that loops through sessions and returns a JSON-encoded list of
|
||||
* session objects (sessionId and events). It stops once a total number of events
|
||||
* >= batchSize is reached. It also cleans up any empty sessions.
|
||||
* Optimized Lua script that processes ready sessions efficiently.
|
||||
* Only fetches from sessions known to have >= minEvents.
|
||||
* Limits the number of events fetched per session to avoid huge payloads.
|
||||
*/
|
||||
private readonly processSessionsScript = `
|
||||
local sessionSortedKey = KEYS[1]
|
||||
private readonly processReadySessionsScript = `
|
||||
local readySessionsKey = KEYS[1]
|
||||
local sessionPrefix = KEYS[2]
|
||||
local batchSize = tonumber(ARGV[1])
|
||||
local minEvents = tonumber(ARGV[2])
|
||||
local maxSessions = tonumber(ARGV[1])
|
||||
local maxEventsPerSession = tonumber(ARGV[2])
|
||||
local startOffset = tonumber(ARGV[3]) or 0
|
||||
|
||||
local result = {}
|
||||
local sessionsToRemove = {}
|
||||
local sessionIds = redis.call('ZRANGE', sessionSortedKey, 0, -1)
|
||||
|
||||
-- Get up to maxSessions ready sessions from window [startOffset, startOffset+maxSessions-1]
|
||||
local stopIndex = startOffset + maxSessions - 1
|
||||
local sessionIds = redis.call('ZRANGE', readySessionsKey, startOffset, stopIndex)
|
||||
local resultIndex = 1
|
||||
local totalEvents = 0
|
||||
|
||||
for i, sessionId in ipairs(sessionIds) do
|
||||
local sessionKey = sessionPrefix .. sessionId
|
||||
local events = redis.call('LRANGE', sessionKey, 0, -1)
|
||||
local eventCount = redis.call('LLEN', sessionKey)
|
||||
|
||||
if #events == 0 then
|
||||
if eventCount == 0 then
|
||||
-- Session is empty, remove from ready set
|
||||
table.insert(sessionsToRemove, sessionId)
|
||||
-- If we have collected 100 sessions to remove, remove them now
|
||||
if #sessionsToRemove >= 100 then
|
||||
redis.call('ZREM', sessionSortedKey, unpack(sessionsToRemove))
|
||||
sessionsToRemove = {}
|
||||
end
|
||||
elseif #events >= minEvents then
|
||||
result[resultIndex] = { sessionId = sessionId, events = events }
|
||||
else
|
||||
-- Fetch limited number of events to avoid huge payloads
|
||||
local eventsToFetch = math.min(eventCount, maxEventsPerSession)
|
||||
local events = redis.call('LRANGE', sessionKey, 0, eventsToFetch - 1)
|
||||
|
||||
result[resultIndex] = {
|
||||
sessionId = sessionId,
|
||||
events = events,
|
||||
totalEventCount = eventCount
|
||||
}
|
||||
resultIndex = resultIndex + 1
|
||||
totalEvents = totalEvents + #events
|
||||
end
|
||||
|
||||
-- Only check if we should break AFTER processing the entire session
|
||||
if totalEvents >= batchSize then
|
||||
break
|
||||
end
|
||||
end
|
||||
|
||||
-- Remove any remaining sessions
|
||||
-- Clean up empty sessions from ready set
|
||||
if #sessionsToRemove > 0 then
|
||||
redis.call('ZREM', sessionSortedKey, unpack(sessionsToRemove))
|
||||
redis.call('ZREM', readySessionsKey, unpack(sessionsToRemove))
|
||||
end
|
||||
|
||||
return cjson.encode(result)
|
||||
`;
|
||||
|
||||
/**
|
||||
* New atomic Lua script to update a session's list with pending events.
|
||||
* Instead of doing a separate DEL and RPUSH (which leaves a race condition),
|
||||
* this script will:
|
||||
* 1. Remove the first `snapshotCount` items from the session list.
|
||||
* 2. Re-insert the pending events (provided as additional arguments)
|
||||
* at the head (using LPUSH in reverse order to preserve order).
|
||||
* Optimized atomic Lua script to update a session's list with pending events.
|
||||
* Also manages the ready_sessions set and buffer counter.
|
||||
*
|
||||
* KEYS[1] = session key
|
||||
* ARGV[1] = snapshotCount (number of events that were present in our snapshot)
|
||||
* ARGV[2] = pendingCount (number of pending events)
|
||||
* ARGV[3..(2+pendingCount)] = the pending event strings
|
||||
* KEYS[2] = ready sessions key
|
||||
* KEYS[3] = buffer counter key
|
||||
* ARGV[1] = sessionId
|
||||
* ARGV[2] = snapshotCount (number of events that were present in our snapshot)
|
||||
* ARGV[3] = pendingCount (number of pending events)
|
||||
* ARGV[4] = minEventsInSession
|
||||
* ARGV[5..(4+pendingCount)] = the pending event strings
|
||||
*/
|
||||
private readonly updateSessionScript = `
|
||||
local snapshotCount = tonumber(ARGV[1])
|
||||
local pendingCount = tonumber(ARGV[2])
|
||||
local sessionKey = KEYS[1]
|
||||
local readySessionsKey = KEYS[2]
|
||||
local bufferCounterKey = KEYS[3]
|
||||
local sessionId = ARGV[1]
|
||||
local snapshotCount = tonumber(ARGV[2])
|
||||
local pendingCount = tonumber(ARGV[3])
|
||||
local minEventsInSession = tonumber(ARGV[4])
|
||||
|
||||
-- Trim the list to remove the processed (snapshot) events.
|
||||
redis.call("LTRIM", sessionKey, snapshotCount, -1)
|
||||
|
||||
-- Re-insert the pending events at the head in their original order.
|
||||
for i = pendingCount, 1, -1 do
|
||||
redis.call("LPUSH", sessionKey, ARGV[i+2])
|
||||
redis.call("LPUSH", sessionKey, ARGV[i+4])
|
||||
end
|
||||
|
||||
return redis.call("LLEN", sessionKey)
|
||||
local newLength = redis.call("LLEN", sessionKey)
|
||||
|
||||
-- Update ready sessions set based on new length
|
||||
if newLength >= minEventsInSession then
|
||||
redis.call("ZADD", readySessionsKey, "XX", redis.call("TIME")[1], sessionId)
|
||||
else
|
||||
redis.call("ZREM", readySessionsKey, sessionId)
|
||||
end
|
||||
|
||||
-- Update buffer counter (decrement by processed events, increment by pending)
|
||||
local counterChange = pendingCount - snapshotCount
|
||||
if counterChange ~= 0 then
|
||||
redis.call("INCRBY", bufferCounterKey, counterChange)
|
||||
end
|
||||
|
||||
return newLength
|
||||
`;
|
||||
|
||||
/**
|
||||
* Lua script that processes a batch of session updates in a single call.
|
||||
* Format of updates: [sessionKey1, snapshotCount1, pendingCount1, pending1...., sessionKey2, ...]
|
||||
* Optimized batch update script with counter and ready sessions management.
|
||||
* KEYS[1] = ready sessions key
|
||||
* KEYS[2] = buffer counter key
|
||||
* ARGV format: [sessionKey1, sessionId1, snapshotCount1, pendingCount1, pending1...., sessionKey2, ...]
|
||||
*/
|
||||
private readonly batchUpdateSessionsScript = `
|
||||
local i = 1
|
||||
local readySessionsKey = KEYS[1]
|
||||
local bufferCounterKey = KEYS[2]
|
||||
local minEventsInSession = tonumber(ARGV[1])
|
||||
local totalCounterChange = 0
|
||||
|
||||
local i = 2
|
||||
while i <= #ARGV do
|
||||
local sessionKey = ARGV[i]
|
||||
local snapshotCount = tonumber(ARGV[i + 1])
|
||||
local pendingCount = tonumber(ARGV[i + 2])
|
||||
local sessionId = ARGV[i + 1]
|
||||
local snapshotCount = tonumber(ARGV[i + 2])
|
||||
local pendingCount = tonumber(ARGV[i + 3])
|
||||
|
||||
-- Trim the list to remove processed events
|
||||
redis.call("LTRIM", sessionKey, snapshotCount, -1)
|
||||
|
||||
-- Re-insert pending events at the head in original order
|
||||
if pendingCount > 0 then
|
||||
local pendingEvents = {}
|
||||
for j = 1, pendingCount do
|
||||
table.insert(pendingEvents, ARGV[i + 2 + j])
|
||||
-- Reinsert in original order: LPUSH requires reverse iteration
|
||||
for j = pendingCount, 1, -1 do
|
||||
redis.call("LPUSH", sessionKey, ARGV[i + 3 + j])
|
||||
end
|
||||
redis.call("LPUSH", sessionKey, unpack(pendingEvents))
|
||||
end
|
||||
|
||||
i = i + 3 + pendingCount
|
||||
local newLength = redis.call("LLEN", sessionKey)
|
||||
|
||||
-- Update ready sessions set based on new length
|
||||
if newLength >= minEventsInSession then
|
||||
redis.call("ZADD", readySessionsKey, "XX", redis.call("TIME")[1], sessionId)
|
||||
else
|
||||
redis.call("ZREM", readySessionsKey, sessionId)
|
||||
end
|
||||
|
||||
-- Track counter change
|
||||
totalCounterChange = totalCounterChange + (pendingCount - snapshotCount)
|
||||
|
||||
i = i + 4 + pendingCount
|
||||
end
|
||||
|
||||
-- Update buffer counter once
|
||||
if totalCounterChange ~= 0 then
|
||||
redis.call("INCRBY", bufferCounterKey, totalCounterChange)
|
||||
end
|
||||
|
||||
return "OK"
|
||||
`;
|
||||
|
||||
@@ -194,9 +261,69 @@ return "OK"
|
||||
return multi.exec();
|
||||
}
|
||||
|
||||
/**
|
||||
* Optimized Lua script for adding events with counter management.
|
||||
* KEYS[1] = session key (if session event)
|
||||
* KEYS[2] = regular queue key
|
||||
* KEYS[3] = sessions sorted key
|
||||
* KEYS[4] = ready sessions key
|
||||
* KEYS[5] = buffer counter key
|
||||
* KEYS[6] = last event key (if screen_view)
|
||||
* ARGV[1] = event JSON
|
||||
* ARGV[2] = session_id
|
||||
* ARGV[3] = event_name
|
||||
* ARGV[4] = score (timestamp)
|
||||
* ARGV[5] = minEventsInSession
|
||||
* ARGV[6] = last event TTL (if screen_view)
|
||||
*/
|
||||
private readonly addEventScript = `
|
||||
local sessionKey = KEYS[1]
|
||||
local regularQueueKey = KEYS[2]
|
||||
local sessionsSortedKey = KEYS[3]
|
||||
local readySessionsKey = KEYS[4]
|
||||
local bufferCounterKey = KEYS[5]
|
||||
local lastEventKey = KEYS[6]
|
||||
|
||||
local eventJson = ARGV[1]
|
||||
local sessionId = ARGV[2]
|
||||
local eventName = ARGV[3]
|
||||
local score = tonumber(ARGV[4])
|
||||
local minEventsInSession = tonumber(ARGV[5])
|
||||
local lastEventTTL = tonumber(ARGV[6] or 0)
|
||||
|
||||
local counterIncrement = 1
|
||||
|
||||
if sessionId and sessionId ~= "" and (eventName == "screen_view" or eventName == "session_end") then
|
||||
-- Add to session
|
||||
redis.call("RPUSH", sessionKey, eventJson)
|
||||
redis.call("ZADD", sessionsSortedKey, "NX", score, sessionId)
|
||||
|
||||
-- Check if session is now ready for processing
|
||||
local sessionLength = redis.call("LLEN", sessionKey)
|
||||
if sessionLength >= minEventsInSession or eventName == "session_end" then
|
||||
redis.call("ZADD", readySessionsKey, score, sessionId)
|
||||
end
|
||||
|
||||
-- Handle screen_view specific logic
|
||||
if eventName == "screen_view" and lastEventKey ~= "" then
|
||||
redis.call("SET", lastEventKey, eventJson, "EX", lastEventTTL)
|
||||
elseif eventName == "session_end" and lastEventKey ~= "" then
|
||||
redis.call("DEL", lastEventKey)
|
||||
end
|
||||
else
|
||||
-- Add to regular queue
|
||||
redis.call("RPUSH", regularQueueKey, eventJson)
|
||||
end
|
||||
|
||||
-- Increment buffer counter
|
||||
redis.call("INCR", bufferCounterKey)
|
||||
|
||||
return "OK"
|
||||
`;
|
||||
|
||||
/**
|
||||
* Add an event into Redis.
|
||||
* Combines multiple Redis operations into a single MULTI command.
|
||||
* Uses optimized Lua script to reduce round trips and manage counters.
|
||||
*/
|
||||
async add(event: IClickhouseEvent, _multi?: ReturnType<Redis['multi']>) {
|
||||
try {
|
||||
@@ -204,50 +331,46 @@ return "OK"
|
||||
const eventJson = JSON.stringify(event);
|
||||
const multi = _multi || redis.multi();
|
||||
|
||||
if (event.session_id && this.sessionEvents.includes(event.name)) {
|
||||
const isSessionEvent =
|
||||
event.session_id && this.sessionEvents.includes(event.name);
|
||||
|
||||
if (isSessionEvent) {
|
||||
const sessionKey = this.getSessionKey(event.session_id);
|
||||
const addEventToSession = () => {
|
||||
const score = new Date(event.created_at || Date.now()).getTime();
|
||||
multi
|
||||
.rpush(sessionKey, eventJson)
|
||||
.zadd(this.sessionSortedKey, 'NX', score, event.session_id);
|
||||
};
|
||||
const score = new Date(event.created_at || Date.now()).getTime();
|
||||
const lastEventKey =
|
||||
event.name === 'screen_view'
|
||||
? this.getLastEventKey({
|
||||
projectId: event.project_id,
|
||||
profileId: event.profile_id,
|
||||
})
|
||||
: event.name === 'session_end'
|
||||
? this.getLastEventKey({
|
||||
projectId: event.project_id,
|
||||
profileId: event.profile_id,
|
||||
})
|
||||
: '';
|
||||
|
||||
if (event.name === 'screen_view') {
|
||||
multi.set(
|
||||
this.getLastEventKey({
|
||||
projectId: event.project_id,
|
||||
profileId: event.profile_id,
|
||||
}),
|
||||
eventJson,
|
||||
'EX',
|
||||
60 * 60,
|
||||
);
|
||||
|
||||
addEventToSession();
|
||||
} else if (event.name === 'session_end') {
|
||||
// Delete last screen view
|
||||
multi.del(
|
||||
this.getLastEventKey({
|
||||
projectId: event.project_id,
|
||||
profileId: event.profile_id,
|
||||
}),
|
||||
);
|
||||
|
||||
// Check if session has any events
|
||||
const eventCount = await redis.llen(sessionKey);
|
||||
|
||||
if (eventCount === 0) {
|
||||
// If session is empty, add to regular queue and don't track in sorted set
|
||||
multi.rpush(this.regularQueueKey, eventJson);
|
||||
} else {
|
||||
// Otherwise add to session as normal
|
||||
addEventToSession();
|
||||
}
|
||||
}
|
||||
multi.eval(
|
||||
this.addEventScript,
|
||||
6,
|
||||
sessionKey,
|
||||
this.regularQueueKey,
|
||||
this.sessionSortedKey,
|
||||
this.readySessionsKey,
|
||||
this.bufferCounterKey,
|
||||
lastEventKey,
|
||||
eventJson,
|
||||
event.session_id,
|
||||
event.name,
|
||||
score.toString(),
|
||||
this.minEventsInSession.toString(),
|
||||
'3600', // 1 hour TTL for last event
|
||||
);
|
||||
} else {
|
||||
// All other events go to regularQueue queue
|
||||
multi.rpush(this.regularQueueKey, eventJson);
|
||||
// Non-session events go to regular queue
|
||||
multi
|
||||
.rpush(this.regularQueueKey, eventJson)
|
||||
.incr(this.bufferCounterKey);
|
||||
}
|
||||
|
||||
if (event.profile_id) {
|
||||
@@ -261,43 +384,57 @@ return "OK"
|
||||
if (!_multi) {
|
||||
await multi.exec();
|
||||
}
|
||||
|
||||
await publishEvent('events', 'received', transformEvent(event));
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to add event to Redis buffer', { error });
|
||||
}
|
||||
}
|
||||
|
||||
private async getEligableSessions({ minEventsInSession = 2 }) {
|
||||
private async getEligibleSessions(
|
||||
startOffset: number,
|
||||
maxEventsPerSession: number,
|
||||
sessionsPerPage: number,
|
||||
) {
|
||||
const sessionsSorted = await getRedisCache().eval(
|
||||
this.processSessionsScript,
|
||||
this.processReadySessionsScript,
|
||||
2, // number of KEYS
|
||||
this.sessionSortedKey,
|
||||
this.readySessionsKey,
|
||||
this.sessionKeyPrefix,
|
||||
(this.batchSize / 2).toString(),
|
||||
minEventsInSession.toString(),
|
||||
sessionsPerPage.toString(),
|
||||
maxEventsPerSession.toString(),
|
||||
startOffset.toString(),
|
||||
);
|
||||
|
||||
// (A) Process session events using the Lua script.
|
||||
const parsed = getSafeJson<
|
||||
Array<{
|
||||
sessionId: string;
|
||||
events: string[];
|
||||
totalEventCount: number;
|
||||
}>
|
||||
>(sessionsSorted as string);
|
||||
|
||||
const sessions: Record<string, IClickhouseEvent[]> = {};
|
||||
if (!parsed) {
|
||||
return sessions;
|
||||
}
|
||||
const sessions: Record<
|
||||
string,
|
||||
{
|
||||
events: IClickhouseEvent[];
|
||||
totalEventCount: number;
|
||||
}
|
||||
> = {};
|
||||
|
||||
if (!Array.isArray(parsed)) {
|
||||
if (!parsed || !Array.isArray(parsed)) {
|
||||
return sessions;
|
||||
}
|
||||
|
||||
for (const session of parsed) {
|
||||
sessions[session.sessionId] = session.events
|
||||
const events = session.events
|
||||
.map((e) => getSafeJson<IClickhouseEvent>(e))
|
||||
.filter((e): e is IClickhouseEvent => e !== null);
|
||||
|
||||
sessions[session.sessionId] = {
|
||||
events,
|
||||
totalEventCount: session.totalEventCount,
|
||||
};
|
||||
}
|
||||
|
||||
return sessions;
|
||||
@@ -343,28 +480,66 @@ return "OK"
|
||||
|
||||
try {
|
||||
let now = performance.now();
|
||||
const [sessions, regularQueueEvents] = await Promise.all([
|
||||
// (A) Fetch session events
|
||||
this.getEligableSessions({ minEventsInSession: 2 }),
|
||||
// (B) Fetch no-session events
|
||||
redis.lrange(this.regularQueueKey, 0, this.batchSize / 2 - 1),
|
||||
]);
|
||||
// (A) Fetch no-session events once per run
|
||||
const regularQueueEvents = await redis.lrange(
|
||||
this.regularQueueKey,
|
||||
0,
|
||||
Math.floor(this.batchSize / 2) - 1,
|
||||
);
|
||||
|
||||
timer.fetchUnprocessedEvents = performance.now() - now;
|
||||
now = performance.now();
|
||||
|
||||
for (const [sessionId, sessionEvents] of Object.entries(sessions)) {
|
||||
const { flush, pending } = this.processSessionEvents(sessionEvents);
|
||||
|
||||
if (flush.length > 0) {
|
||||
eventsToClickhouse.push(...flush);
|
||||
// (A2) Page through ready sessions within time and budget
|
||||
let sessionBudget = Math.floor(this.batchSize / 2);
|
||||
let startOffset = 0;
|
||||
let totalSessionEventsFetched = 0;
|
||||
while (sessionBudget > 0) {
|
||||
if (performance.now() - now > this.flushTimeBudgetMs) {
|
||||
this.logger.debug('Stopping session paging due to time budget');
|
||||
break;
|
||||
}
|
||||
|
||||
pendingUpdates.push({
|
||||
sessionId,
|
||||
snapshotCount: sessionEvents.length,
|
||||
pending,
|
||||
});
|
||||
const sessionsPerPage = Math.min(
|
||||
this.maxSessionsPerFlush,
|
||||
Math.max(1, Math.floor(sessionBudget / 2)),
|
||||
);
|
||||
const perSessionBudget = Math.max(
|
||||
2,
|
||||
Math.floor(sessionBudget / sessionsPerPage),
|
||||
);
|
||||
|
||||
const sessionsPage = await this.getEligibleSessions(
|
||||
startOffset,
|
||||
perSessionBudget,
|
||||
sessionsPerPage,
|
||||
);
|
||||
const sessionIds = Object.keys(sessionsPage);
|
||||
if (sessionIds.length === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
for (const sessionId of sessionIds) {
|
||||
const sessionData = sessionsPage[sessionId]!;
|
||||
const { flush, pending } = this.processSessionEvents(
|
||||
sessionData.events,
|
||||
);
|
||||
|
||||
if (flush.length > 0) {
|
||||
eventsToClickhouse.push(...flush);
|
||||
}
|
||||
|
||||
pendingUpdates.push({
|
||||
sessionId,
|
||||
snapshotCount: sessionData.events.length,
|
||||
pending,
|
||||
});
|
||||
|
||||
// Decrease budget by fetched events for this session window
|
||||
sessionBudget -= sessionData.events.length;
|
||||
totalSessionEventsFetched += sessionData.events.length;
|
||||
if (sessionBudget <= 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
startOffset += sessionsPerPage;
|
||||
}
|
||||
|
||||
timer.processSessionEvents = performance.now() - now;
|
||||
@@ -420,9 +595,11 @@ return "OK"
|
||||
// (F) Only after successful processing, update Redis
|
||||
const multi = redis.multi();
|
||||
|
||||
// Clean up no-session events
|
||||
// Clean up no-session events and update counter
|
||||
if (regularQueueEvents.length > 0) {
|
||||
multi.ltrim(this.regularQueueKey, regularQueueEvents.length, -1);
|
||||
multi
|
||||
.ltrim(this.regularQueueKey, regularQueueEvents.length, -1)
|
||||
.decrby(this.bufferCounterKey, regularQueueEvents.length);
|
||||
}
|
||||
|
||||
await multi.exec();
|
||||
@@ -436,10 +613,7 @@ return "OK"
|
||||
batchSize: this.batchSize,
|
||||
eventsToClickhouse: eventsToClickhouse.length,
|
||||
pendingSessionUpdates: pendingUpdates.length,
|
||||
sessionEvents: Object.entries(sessions).reduce(
|
||||
(acc, [sId, events]) => acc + events.length,
|
||||
0,
|
||||
),
|
||||
sessionEventsFetched: totalSessionEventsFetched,
|
||||
regularEvents: regularQueueEvents.length,
|
||||
timer,
|
||||
});
|
||||
@@ -609,12 +783,13 @@ return "OK"
|
||||
pendingUpdates,
|
||||
this.updatePendingSessionsBatchSize,
|
||||
)) {
|
||||
const batchArgs: string[] = [];
|
||||
const batchArgs: string[] = [this.minEventsInSession.toString()];
|
||||
|
||||
for (const { sessionId, snapshotCount, pending } of batch) {
|
||||
const sessionKey = this.getSessionKey(sessionId);
|
||||
batchArgs.push(
|
||||
sessionKey,
|
||||
sessionId,
|
||||
snapshotCount.toString(),
|
||||
pending.length.toString(),
|
||||
...pending.map((e) => JSON.stringify(e)),
|
||||
@@ -623,13 +798,16 @@ return "OK"
|
||||
|
||||
await redis.eval(
|
||||
this.batchUpdateSessionsScript,
|
||||
0, // no KEYS needed
|
||||
2, // KEYS: ready sessions, buffer counter
|
||||
this.readySessionsKey,
|
||||
this.bufferCounterKey,
|
||||
...batchArgs,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public async getBufferSizeHeavy() {
|
||||
// Fallback method for when counter is not available
|
||||
const redis = getRedisCache();
|
||||
const pipeline = redis.pipeline();
|
||||
|
||||
@@ -668,18 +846,7 @@ return "OK"
|
||||
}
|
||||
|
||||
public async getBufferSize() {
|
||||
const cached = await getRedisCache().get('event_buffer:cached_count');
|
||||
if (cached) {
|
||||
return Number.parseInt(cached, 10);
|
||||
}
|
||||
const count = await this.getBufferSizeHeavy();
|
||||
await getRedisCache().set(
|
||||
'event_buffer:cached_count',
|
||||
count.toString(),
|
||||
'EX',
|
||||
15, // increase when we know it's stable
|
||||
);
|
||||
return count;
|
||||
return this.getBufferSizeWithCounter(() => this.getBufferSizeHeavy());
|
||||
}
|
||||
|
||||
private async incrementActiveVisitorCount(
|
||||
@@ -687,21 +854,13 @@ return "OK"
|
||||
projectId: string,
|
||||
profileId: string,
|
||||
) {
|
||||
// Add/update visitor with current timestamp as score
|
||||
// Track active visitors and emit expiry events when inactive for TTL
|
||||
const now = Date.now();
|
||||
const zsetKey = `live:visitors:${projectId}`;
|
||||
return (
|
||||
multi
|
||||
// To keep the count
|
||||
.zadd(zsetKey, now, profileId)
|
||||
// To trigger the expiration listener
|
||||
.set(
|
||||
`live:visitor:${projectId}:${profileId}`,
|
||||
'1',
|
||||
'EX',
|
||||
this.activeVisitorsExpiration,
|
||||
)
|
||||
);
|
||||
const heartbeatKey = `live:visitor:${projectId}:${profileId}`;
|
||||
return multi
|
||||
.zadd(zsetKey, now, profileId)
|
||||
.set(heartbeatKey, '1', 'EX', this.activeVisitorsExpiration);
|
||||
}
|
||||
|
||||
public async getActiveVisitorCount(projectId: string): Promise<number> {
|
||||
@@ -1,6 +1,6 @@
|
||||
import { BotBuffer as BotBufferRedis } from './bot-buffer-redis';
|
||||
import { EventBuffer as EventBufferRedis } from './event-buffer-redis';
|
||||
import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer-redis';
|
||||
import { BotBuffer as BotBufferRedis } from './bot-buffer';
|
||||
import { EventBuffer as EventBufferRedis } from './event-buffer';
|
||||
import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer';
|
||||
import { SessionBuffer } from './session-buffer';
|
||||
|
||||
export const eventBuffer = new EventBufferRedis();
|
||||
|
||||
@@ -19,7 +19,7 @@ export class ProfileBuffer extends BaseBuffer {
|
||||
? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10)
|
||||
: 1000;
|
||||
|
||||
private readonly redisBufferKey = 'profile-buffer';
|
||||
private readonly redisKey = 'profile-buffer';
|
||||
private readonly redisProfilePrefix = 'profile-cache:';
|
||||
|
||||
private redis: Redis;
|
||||
@@ -101,8 +101,9 @@ export class ProfileBuffer extends BaseBuffer {
|
||||
const result = await this.redis
|
||||
.multi()
|
||||
.set(cacheKey, JSON.stringify(mergedProfile), 'EX', cacheTtl)
|
||||
.rpush(this.redisBufferKey, JSON.stringify(mergedProfile))
|
||||
.llen(this.redisBufferKey)
|
||||
.rpush(this.redisKey, JSON.stringify(mergedProfile))
|
||||
.incr(this.bufferCounterKey)
|
||||
.llen(this.redisKey)
|
||||
.exec();
|
||||
|
||||
if (!result) {
|
||||
@@ -112,7 +113,7 @@ export class ProfileBuffer extends BaseBuffer {
|
||||
});
|
||||
return;
|
||||
}
|
||||
const bufferLength = (result?.[2]?.[1] as number) ?? 0;
|
||||
const bufferLength = (result?.[3]?.[1] as number) ?? 0;
|
||||
|
||||
this.logger.debug('Current buffer length', {
|
||||
bufferLength,
|
||||
@@ -177,7 +178,7 @@ export class ProfileBuffer extends BaseBuffer {
|
||||
try {
|
||||
this.logger.info('Starting profile buffer processing');
|
||||
const profiles = await this.redis.lrange(
|
||||
this.redisBufferKey,
|
||||
this.redisKey,
|
||||
0,
|
||||
this.batchSize - 1,
|
||||
);
|
||||
@@ -200,8 +201,12 @@ export class ProfileBuffer extends BaseBuffer {
|
||||
});
|
||||
}
|
||||
|
||||
// Only remove profiles after successful insert
|
||||
await this.redis.ltrim(this.redisBufferKey, profiles.length, -1);
|
||||
// Only remove profiles after successful insert and update counter
|
||||
await this.redis
|
||||
.multi()
|
||||
.ltrim(this.redisKey, profiles.length, -1)
|
||||
.decrby(this.bufferCounterKey, profiles.length)
|
||||
.exec();
|
||||
|
||||
this.logger.info('Successfully completed profile processing', {
|
||||
totalProfiles: profiles.length,
|
||||
@@ -212,6 +217,6 @@ export class ProfileBuffer extends BaseBuffer {
|
||||
}
|
||||
|
||||
async getBufferSize() {
|
||||
return getRedisCache().llen(this.redisBufferKey);
|
||||
return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey));
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
import { type Redis, getRedisCache, runEvery } from '@openpanel/redis';
|
||||
import { type Redis, getRedisCache } from '@openpanel/redis';
|
||||
|
||||
import { toDots } from '@openpanel/common';
|
||||
import { getSafeJson } from '@openpanel/json';
|
||||
@@ -61,7 +61,7 @@ export class SessionBuffer extends BaseBuffer {
|
||||
const duration =
|
||||
new Date(newSession.ended_at).getTime() -
|
||||
new Date(newSession.created_at).getTime();
|
||||
if (duration > 0) {
|
||||
if (duration >= 0) {
|
||||
newSession.duration = duration;
|
||||
} else {
|
||||
this.logger.warn('Session duration is negative', {
|
||||
@@ -174,10 +174,12 @@ export class SessionBuffer extends BaseBuffer {
|
||||
for (const session of sessions) {
|
||||
multi.rpush(this.redisKey, JSON.stringify(session));
|
||||
}
|
||||
// Increment counter by number of sessions added
|
||||
multi.incrby(this.bufferCounterKey, sessions.length);
|
||||
await multi.exec();
|
||||
|
||||
// Check buffer length
|
||||
const bufferLength = await this.redis.llen(this.redisKey);
|
||||
// Check buffer length using counter
|
||||
const bufferLength = await this.getBufferSize();
|
||||
|
||||
if (bufferLength >= this.batchSize) {
|
||||
await this.tryFlush();
|
||||
@@ -216,8 +218,12 @@ export class SessionBuffer extends BaseBuffer {
|
||||
});
|
||||
}
|
||||
|
||||
// Only remove events after successful insert
|
||||
await this.redis.ltrim(this.redisKey, events.length, -1);
|
||||
// Only remove events after successful insert and update counter
|
||||
const multi = this.redis.multi();
|
||||
multi
|
||||
.ltrim(this.redisKey, events.length, -1)
|
||||
.decrby(this.bufferCounterKey, events.length);
|
||||
await multi.exec();
|
||||
|
||||
this.logger.info('Processed sessions', {
|
||||
count: events.length,
|
||||
@@ -228,6 +234,6 @@ export class SessionBuffer extends BaseBuffer {
|
||||
}
|
||||
|
||||
async getBufferSize() {
|
||||
return getRedisCache().llen(this.redisKey);
|
||||
return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,8 +7,10 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@openpanel/db": "workspace:*",
|
||||
"@openpanel/logger": "workspace:*",
|
||||
"@openpanel/redis": "workspace:*",
|
||||
"bullmq": "^5.8.7"
|
||||
"bullmq": "^5.8.7",
|
||||
"groupmq": "1.0.0-next.13"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@openpanel/sdk": "workspace:*",
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
import { Queue, QueueEvents } from 'bullmq';
|
||||
|
||||
import type { IServiceEvent, Notification, Prisma } from '@openpanel/db';
|
||||
import { getRedisQueue } from '@openpanel/redis';
|
||||
import type { IServiceEvent, Prisma } from '@openpanel/db';
|
||||
import { createLogger } from '@openpanel/logger';
|
||||
import { getRedisGroupQueue, getRedisQueue } from '@openpanel/redis';
|
||||
import type { TrackPayload } from '@openpanel/sdk';
|
||||
import { Queue as GroupQueue } from 'groupmq';
|
||||
|
||||
export const queueLogger = createLogger({ name: 'queue' });
|
||||
|
||||
export interface EventsQueuePayloadIncomingEvent {
|
||||
type: 'incomingEvent';
|
||||
@@ -103,6 +107,17 @@ export const eventsQueue = new Queue<EventsQueuePayload>('events', {
|
||||
},
|
||||
});
|
||||
|
||||
export const eventsGroupQueue = new GroupQueue<
|
||||
EventsQueuePayloadIncomingEvent['payload']
|
||||
>({
|
||||
logger: queueLogger,
|
||||
namespace: 'group_events',
|
||||
redis: getRedisGroupQueue(),
|
||||
orderingDelayMs: 2000,
|
||||
keepCompleted: 10,
|
||||
keepFailed: 10_000,
|
||||
});
|
||||
|
||||
export const sessionsQueue = new Queue<SessionsQueuePayload>('sessions', {
|
||||
connection: getRedisQueue(),
|
||||
defaultJobOptions: {
|
||||
|
||||
@@ -8,6 +8,8 @@ const options: RedisOptions = {
|
||||
|
||||
export { Redis };
|
||||
|
||||
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';
|
||||
|
||||
export interface ExtendedRedis extends Redis {
|
||||
getJson: <T = any>(key: string) => Promise<T | null>;
|
||||
setJson: <T = any>(
|
||||
@@ -63,7 +65,7 @@ const createRedisClient = (
|
||||
let redisCache: ExtendedRedis;
|
||||
export function getRedisCache() {
|
||||
if (!redisCache) {
|
||||
redisCache = createRedisClient(process.env.REDIS_URL!, options);
|
||||
redisCache = createRedisClient(REDIS_URL, options);
|
||||
}
|
||||
|
||||
return redisCache;
|
||||
@@ -72,7 +74,7 @@ export function getRedisCache() {
|
||||
let redisSub: ExtendedRedis;
|
||||
export function getRedisSub() {
|
||||
if (!redisSub) {
|
||||
redisSub = createRedisClient(process.env.REDIS_URL!, options);
|
||||
redisSub = createRedisClient(REDIS_URL, options);
|
||||
}
|
||||
|
||||
return redisSub;
|
||||
@@ -81,7 +83,7 @@ export function getRedisSub() {
|
||||
let redisPub: ExtendedRedis;
|
||||
export function getRedisPub() {
|
||||
if (!redisPub) {
|
||||
redisPub = createRedisClient(process.env.REDIS_URL!, options);
|
||||
redisPub = createRedisClient(REDIS_URL, options);
|
||||
}
|
||||
|
||||
return redisPub;
|
||||
@@ -91,20 +93,32 @@ let redisQueue: ExtendedRedis;
|
||||
export function getRedisQueue() {
|
||||
if (!redisQueue) {
|
||||
// Use different redis for queues (self-hosting will re-use the same redis instance)
|
||||
redisQueue = createRedisClient(
|
||||
(process.env.QUEUE_REDIS_URL || process.env.REDIS_URL)!,
|
||||
{
|
||||
...options,
|
||||
enableReadyCheck: false,
|
||||
maxRetriesPerRequest: null,
|
||||
enableOfflineQueue: true,
|
||||
},
|
||||
);
|
||||
redisQueue = createRedisClient(REDIS_URL, {
|
||||
...options,
|
||||
enableReadyCheck: false,
|
||||
maxRetriesPerRequest: null,
|
||||
enableOfflineQueue: true,
|
||||
});
|
||||
}
|
||||
|
||||
return redisQueue;
|
||||
}
|
||||
|
||||
let redisGroupQueue: ExtendedRedis;
|
||||
export function getRedisGroupQueue() {
|
||||
if (!redisGroupQueue) {
|
||||
// Dedicated Redis connection for GroupWorker to avoid blocking BullMQ
|
||||
redisGroupQueue = createRedisClient(REDIS_URL, {
|
||||
...options,
|
||||
enableReadyCheck: false,
|
||||
maxRetriesPerRequest: null,
|
||||
enableOfflineQueue: true,
|
||||
});
|
||||
}
|
||||
|
||||
return redisGroupQueue;
|
||||
}
|
||||
|
||||
export async function getLock(key: string, value: string, timeout: number) {
|
||||
const lock = await getRedisCache().set(key, value, 'PX', timeout, 'NX');
|
||||
return lock === 'OK';
|
||||
|
||||
@@ -15,6 +15,6 @@ export async function runEvery({
|
||||
return;
|
||||
}
|
||||
|
||||
getRedisCache().set(cacheKey, 'true', 'EX', interval);
|
||||
await getRedisCache().set(cacheKey, '1', 'EX', interval);
|
||||
return fn();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user