fix: bump groupmq, fix stalled sessions in buffers, improve heavy event count

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-10-06 21:22:56 +02:00
parent b3e06e985d
commit af580333b4
8 changed files with 434 additions and 90 deletions

View File

@@ -38,7 +38,7 @@
"fastify": "^5.2.1", "fastify": "^5.2.1",
"fastify-metrics": "^12.1.0", "fastify-metrics": "^12.1.0",
"fastify-raw-body": "^5.0.0", "fastify-raw-body": "^5.0.0",
"groupmq": "1.0.0-next.14", "groupmq": "1.0.0-next.15",
"ico-to-png": "^0.2.2", "ico-to-png": "^0.2.2",
"jsonwebtoken": "^9.0.2", "jsonwebtoken": "^9.0.2",
"ramda": "^0.29.1", "ramda": "^0.29.1",

View File

@@ -23,7 +23,7 @@
"@openpanel/redis": "workspace:*", "@openpanel/redis": "workspace:*",
"bullmq": "^5.8.7", "bullmq": "^5.8.7",
"express": "^4.18.2", "express": "^4.18.2",
"groupmq": "1.0.0-next.14", "groupmq": "1.0.0-next.15",
"prom-client": "^15.1.3", "prom-client": "^15.1.3",
"ramda": "^0.29.1", "ramda": "^0.29.1",
"source-map-support": "^0.5.21", "source-map-support": "^0.5.21",

View File

@@ -39,7 +39,7 @@ export class BaseBuffer {
const key = this.bufferCounterKey; const key = this.bufferCounterKey;
try { try {
await runEvery({ await runEvery({
interval: 60 * 15, interval: 60 * 60,
key: `${this.name}-buffer:resync`, key: `${this.name}-buffer:resync`,
fn: async () => { fn: async () => {
try { try {

View File

@@ -206,7 +206,7 @@ describe('EventBuffer with real Redis', () => {
expect(await eventBuffer.getBufferSize()).toBe(0); expect(await eventBuffer.getBufferSize()).toBe(0);
}); });
it('adds session to ready set at 2 events and removes after processing', async () => { it('adds session to ready set at 2 events and removes it when < 2 events remain', async () => {
const s = 'session_ready'; const s = 'session_ready';
const e1 = { const e1 = {
project_id: 'p3', project_id: 'p3',
@@ -235,9 +235,18 @@ describe('EventBuffer with real Redis', () => {
.mockResolvedValueOnce(undefined as any); .mockResolvedValueOnce(undefined as any);
await eventBuffer.processBuffer(); await eventBuffer.processBuffer();
// After processing with one pending left, session should be removed from ready set // After processing with one pending left, session should be REMOVED from ready set
// It will be re-added when the next event arrives
expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull();
expect(insertSpy).toHaveBeenCalled(); expect(insertSpy).toHaveBeenCalled();
// But the session and its data should still exist
const sessionKey = `event_buffer:session:${s}`;
const remaining = await redis.lrange(sessionKey, 0, -1);
expect(remaining.length).toBe(1); // One pending event
expect(
await redis.zscore('event_buffer:sessions_sorted', s),
).not.toBeNull(); // Still in sorted set
}); });
it('sets last screen_view key and clears it on session_end', async () => { it('sets last screen_view key and clears it on session_end', async () => {
@@ -427,16 +436,16 @@ describe('EventBuffer with real Redis', () => {
await eb.processBuffer(); await eb.processBuffer();
// Only consider eval calls for batchUpdateSessionsScript (2 keys, second is total_count) // Only consider eval calls for batchUpdateSessionsScript (3 keys now: ready, sorted, counter)
const batchEvalCalls = evalSpy.mock.calls.filter( const batchEvalCalls = evalSpy.mock.calls.filter(
(call) => call[1] === 2 && call[3] === 'event_buffer:total_count', (call) => call[1] === 3 && call[4] === 'event_buffer:total_count',
); );
const expectedCalls = Math.ceil(numSessions / 3); const expectedCalls = Math.ceil(numSessions / 3);
expect(batchEvalCalls.length).toBeGreaterThanOrEqual(expectedCalls); expect(batchEvalCalls.length).toBeGreaterThanOrEqual(expectedCalls);
function countSessionsInEvalCall(args: any[]): number { function countSessionsInEvalCall(args: any[]): number {
let idx = 4; // ARGV starts after: script, numKeys, key1, key2 let idx = 5; // ARGV starts after: script, numKeys, key1, key2, key3
let count = 0; let count = 0;
while (idx < args.length) { while (idx < args.length) {
if (idx + 3 >= args.length) break; if (idx + 3 >= args.length) break;
@@ -448,9 +457,10 @@ describe('EventBuffer with real Redis', () => {
} }
for (const call of batchEvalCalls) { for (const call of batchEvalCalls) {
expect(call[1]).toBe(2); expect(call[1]).toBe(3);
expect(call[2]).toBe('event_buffer:ready_sessions'); expect(call[2]).toBe('event_buffer:ready_sessions');
expect(call[3]).toBe('event_buffer:total_count'); expect(call[3]).toBe('event_buffer:sessions_sorted');
expect(call[4]).toBe('event_buffer:total_count');
const sessionsInThisCall = countSessionsInEvalCall(call.slice(0)); const sessionsInThisCall = countSessionsInEvalCall(call.slice(0));
expect(sessionsInThisCall).toBeLessThanOrEqual(3); expect(sessionsInThisCall).toBeLessThanOrEqual(3);
@@ -500,4 +510,260 @@ describe('EventBuffer with real Redis', () => {
insertSpy.mockRestore(); insertSpy.mockRestore();
}); });
it('flushes ALL screen_views when session_end arrives (no pending events)', async () => {
const t0 = Date.now();
const s = 'session_multi_end';
const view1 = {
project_id: 'p10',
profile_id: 'u10',
session_id: s,
name: 'screen_view',
created_at: new Date(t0).toISOString(),
} as any;
const view2 = {
...view1,
created_at: new Date(t0 + 1000).toISOString(),
} as any;
const view3 = {
...view1,
created_at: new Date(t0 + 2000).toISOString(),
} as any;
const end = {
...view1,
name: 'session_end',
created_at: new Date(t0 + 3000).toISOString(),
} as any;
const eb = new EventBuffer();
await eb.add(view1);
await eb.add(view2);
await eb.add(view3);
await eb.add(end);
const insertSpy = vi
.spyOn(ch, 'insert')
.mockResolvedValueOnce(undefined as any);
await eb.processBuffer();
// All 4 events should be flushed (3 screen_views + session_end)
expect(insertSpy).toHaveBeenCalledWith({
format: 'JSONEachRow',
table: 'events',
values: [view1, view2, view3, end],
});
// Session should be completely empty and removed
const sessionKey = `event_buffer:session:${s}`;
const remaining = await redis.lrange(sessionKey, 0, -1);
expect(remaining.length).toBe(0);
// Session should be removed from both sorted sets
expect(await redis.zscore('event_buffer:sessions_sorted', s)).toBeNull();
expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull();
insertSpy.mockRestore();
});
it('re-adds session to ready_sessions when new event arrives after processing', async () => {
const t0 = Date.now();
const s = 'session_continued';
const view1 = {
project_id: 'p11',
profile_id: 'u11',
session_id: s,
name: 'screen_view',
created_at: new Date(t0).toISOString(),
} as any;
const view2 = {
...view1,
created_at: new Date(t0 + 1000).toISOString(),
} as any;
const eb = new EventBuffer();
await eb.add(view1);
await eb.add(view2);
const insertSpy = vi
.spyOn(ch, 'insert')
.mockResolvedValue(undefined as any);
// First processing: flush view1, keep view2 pending
await eb.processBuffer();
expect(insertSpy).toHaveBeenCalledWith({
format: 'JSONEachRow',
table: 'events',
values: [{ ...view1, duration: 1000 }],
});
// Session should be REMOVED from ready_sessions (only 1 event left)
expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull();
// Add a third screen_view - this should re-add to ready_sessions
const view3 = {
...view1,
created_at: new Date(t0 + 2000).toISOString(),
} as any;
await eb.add(view3);
// NOW it should be back in ready_sessions (2 events again)
expect(await redis.zscore('event_buffer:ready_sessions', s)).not.toBeNull();
insertSpy.mockClear();
// Second processing: should process view2 (now has duration), keep view3 pending
await eb.processBuffer();
expect(insertSpy).toHaveBeenCalledWith({
format: 'JSONEachRow',
table: 'events',
values: [{ ...view2, duration: 1000 }],
});
// Session should be REMOVED again (only 1 event left)
expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull();
const sessionKey = `event_buffer:session:${s}`;
const remaining = await redis.lrange(sessionKey, 0, -1);
expect(remaining.length).toBe(1);
expect(JSON.parse(remaining[0]!)).toMatchObject({
session_id: s,
created_at: view3.created_at,
});
insertSpy.mockRestore();
});
it('removes session from ready_sessions only when completely empty', async () => {
const t0 = Date.now();
const s = 'session_complete';
const view = {
project_id: 'p12',
profile_id: 'u12',
session_id: s,
name: 'screen_view',
created_at: new Date(t0).toISOString(),
} as any;
const end = {
...view,
name: 'session_end',
created_at: new Date(t0 + 1000).toISOString(),
} as any;
const eb = new EventBuffer();
await eb.add(view);
await eb.add(end);
const insertSpy = vi
.spyOn(ch, 'insert')
.mockResolvedValueOnce(undefined as any);
await eb.processBuffer();
// Both events flushed, session empty
expect(insertSpy).toHaveBeenCalledWith({
format: 'JSONEachRow',
table: 'events',
values: [view, end],
});
// NOW it should be removed from ready_sessions (because it's empty)
expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull();
expect(await redis.zscore('event_buffer:sessions_sorted', s)).toBeNull();
insertSpy.mockRestore();
});
it('getBufferSizeHeavy correctly counts events across many sessions in batches', async () => {
const eb = new EventBuffer();
const numSessions = 250; // More than batch size (100) to test batching
const eventsPerSession = 3;
const numRegularEvents = 50;
// Add session events (3 events per session)
for (let i = 0; i < numSessions; i++) {
const sessionId = `batch_session_${i}`;
for (let j = 0; j < eventsPerSession; j++) {
await eb.add({
project_id: 'p_batch',
profile_id: `u_${i}`,
session_id: sessionId,
name: 'screen_view',
created_at: new Date(Date.now() + i * 100 + j * 10).toISOString(),
} as any);
}
}
// Add regular queue events
for (let i = 0; i < numRegularEvents; i++) {
await eb.add({
project_id: 'p_batch',
name: 'custom_event',
created_at: new Date().toISOString(),
} as any);
}
// Get buffer size using heavy method
const bufferSize = await eb.getBufferSizeHeavy();
// Should count all events: (250 sessions × 3 events) + 50 regular events
const expectedSize = numSessions * eventsPerSession + numRegularEvents;
expect(bufferSize).toBe(expectedSize);
// Verify sessions are properly tracked
const sessionCount = await redis.zcard('event_buffer:sessions_sorted');
expect(sessionCount).toBe(numSessions);
const regularQueueCount = await redis.llen('event_buffer:regular_queue');
expect(regularQueueCount).toBe(numRegularEvents);
});
it('getBufferSizeHeavy handles empty buffer correctly', async () => {
const eb = new EventBuffer();
const bufferSize = await eb.getBufferSizeHeavy();
expect(bufferSize).toBe(0);
});
it('getBufferSizeHeavy handles only regular queue events', async () => {
const eb = new EventBuffer();
const numEvents = 10;
for (let i = 0; i < numEvents; i++) {
await eb.add({
project_id: 'p_regular',
name: 'custom_event',
created_at: new Date().toISOString(),
} as any);
}
const bufferSize = await eb.getBufferSizeHeavy();
expect(bufferSize).toBe(numEvents);
});
it('getBufferSizeHeavy handles only session events', async () => {
const eb = new EventBuffer();
const numSessions = 5;
const eventsPerSession = 2;
for (let i = 0; i < numSessions; i++) {
for (let j = 0; j < eventsPerSession; j++) {
await eb.add({
project_id: 'p_sessions',
profile_id: `u_${i}`,
session_id: `session_${i}`,
name: 'screen_view',
created_at: new Date(Date.now() + i * 100 + j * 10).toISOString(),
} as any);
}
}
const bufferSize = await eb.getBufferSizeHeavy();
expect(bufferSize).toBe(numSessions * eventsPerSession);
});
}); });

View File

@@ -100,6 +100,7 @@ export class EventBuffer extends BaseBuffer {
private readonly processReadySessionsScript = ` private readonly processReadySessionsScript = `
local readySessionsKey = KEYS[1] local readySessionsKey = KEYS[1]
local sessionPrefix = KEYS[2] local sessionPrefix = KEYS[2]
local sessionsSortedKey = KEYS[3]
local maxSessions = tonumber(ARGV[1]) local maxSessions = tonumber(ARGV[1])
local maxEventsPerSession = tonumber(ARGV[2]) local maxEventsPerSession = tonumber(ARGV[2])
local startOffset = tonumber(ARGV[3]) or 0 local startOffset = tonumber(ARGV[3]) or 0
@@ -117,7 +118,7 @@ for i, sessionId in ipairs(sessionIds) do
local eventCount = redis.call('LLEN', sessionKey) local eventCount = redis.call('LLEN', sessionKey)
if eventCount == 0 then if eventCount == 0 then
-- Session is empty, remove from ready set -- Session is empty, remove from both sets
table.insert(sessionsToRemove, sessionId) table.insert(sessionsToRemove, sessionId)
else else
-- Fetch limited number of events to avoid huge payloads -- Fetch limited number of events to avoid huge payloads
@@ -133,9 +134,14 @@ for i, sessionId in ipairs(sessionIds) do
end end
end end
-- Clean up empty sessions from ready set -- Clean up empty sessions from both ready set and sorted set
if #sessionsToRemove > 0 then if #sessionsToRemove > 0 then
redis.call('ZREM', readySessionsKey, unpack(sessionsToRemove)) redis.call('ZREM', readySessionsKey, unpack(sessionsToRemove))
redis.call('ZREM', sessionsSortedKey, unpack(sessionsToRemove))
-- Also delete the empty session keys
for i, sessionId in ipairs(sessionsToRemove) do
redis.call('DEL', sessionPrefix .. sessionId)
end
end end
return cjson.encode(result) return cjson.encode(result)
@@ -147,7 +153,8 @@ return cjson.encode(result)
* *
* KEYS[1] = session key * KEYS[1] = session key
* KEYS[2] = ready sessions key * KEYS[2] = ready sessions key
* KEYS[3] = buffer counter key * KEYS[3] = sessions sorted key
* KEYS[4] = buffer counter key
* ARGV[1] = sessionId * ARGV[1] = sessionId
* ARGV[2] = snapshotCount (number of events that were present in our snapshot) * ARGV[2] = snapshotCount (number of events that were present in our snapshot)
* ARGV[3] = pendingCount (number of pending events) * ARGV[3] = pendingCount (number of pending events)
@@ -157,7 +164,8 @@ return cjson.encode(result)
private readonly updateSessionScript = ` private readonly updateSessionScript = `
local sessionKey = KEYS[1] local sessionKey = KEYS[1]
local readySessionsKey = KEYS[2] local readySessionsKey = KEYS[2]
local bufferCounterKey = KEYS[3] local sessionsSortedKey = KEYS[3]
local bufferCounterKey = KEYS[4]
local sessionId = ARGV[1] local sessionId = ARGV[1]
local snapshotCount = tonumber(ARGV[2]) local snapshotCount = tonumber(ARGV[2])
local pendingCount = tonumber(ARGV[3]) local pendingCount = tonumber(ARGV[3])
@@ -174,9 +182,17 @@ end
local newLength = redis.call("LLEN", sessionKey) local newLength = redis.call("LLEN", sessionKey)
-- Update ready sessions set based on new length -- Update ready sessions set based on new length
if newLength >= minEventsInSession then if newLength == 0 then
redis.call("ZADD", readySessionsKey, "XX", redis.call("TIME")[1], sessionId) -- Session is now empty, remove from both sets and delete key
redis.call("ZREM", readySessionsKey, sessionId)
redis.call("ZREM", sessionsSortedKey, sessionId)
redis.call("DEL", sessionKey)
elseif newLength >= minEventsInSession then
-- Session has enough events, keep/add it in ready_sessions
redis.call("ZADD", readySessionsKey, redis.call("TIME")[1], sessionId)
else else
-- Session has events but < minEvents, remove from ready_sessions
-- It will be re-added when a new event arrives (via addEventScript)
redis.call("ZREM", readySessionsKey, sessionId) redis.call("ZREM", readySessionsKey, sessionId)
end end
@@ -192,12 +208,14 @@ return newLength
/** /**
* Optimized batch update script with counter and ready sessions management. * Optimized batch update script with counter and ready sessions management.
* KEYS[1] = ready sessions key * KEYS[1] = ready sessions key
* KEYS[2] = buffer counter key * KEYS[2] = sessions sorted key
* ARGV format: [sessionKey1, sessionId1, snapshotCount1, pendingCount1, pending1...., sessionKey2, ...] * KEYS[3] = buffer counter key
* ARGV format: [minEventsInSession, sessionKey1, sessionId1, snapshotCount1, pendingCount1, pending1...., sessionKey2, ...]
*/ */
private readonly batchUpdateSessionsScript = ` private readonly batchUpdateSessionsScript = `
local readySessionsKey = KEYS[1] local readySessionsKey = KEYS[1]
local bufferCounterKey = KEYS[2] local sessionsSortedKey = KEYS[2]
local bufferCounterKey = KEYS[3]
local minEventsInSession = tonumber(ARGV[1]) local minEventsInSession = tonumber(ARGV[1])
local totalCounterChange = 0 local totalCounterChange = 0
@@ -222,9 +240,17 @@ while i <= #ARGV do
local newLength = redis.call("LLEN", sessionKey) local newLength = redis.call("LLEN", sessionKey)
-- Update ready sessions set based on new length -- Update ready sessions set based on new length
if newLength >= minEventsInSession then if newLength == 0 then
redis.call("ZADD", readySessionsKey, "XX", redis.call("TIME")[1], sessionId) -- Session is now empty, remove from both sets and delete key
redis.call("ZREM", readySessionsKey, sessionId)
redis.call("ZREM", sessionsSortedKey, sessionId)
redis.call("DEL", sessionKey)
elseif newLength >= minEventsInSession then
-- Session has enough events, keep/add it in ready_sessions
redis.call("ZADD", readySessionsKey, redis.call("TIME")[1], sessionId)
else else
-- Session has events but < minEvents, remove from ready_sessions
-- It will be re-added when a new event arrives (via addEventScript)
redis.call("ZREM", readySessionsKey, sessionId) redis.call("ZREM", readySessionsKey, sessionId)
end end
@@ -398,9 +424,10 @@ return "OK"
) { ) {
const sessionsSorted = await getRedisCache().eval( const sessionsSorted = await getRedisCache().eval(
this.processReadySessionsScript, this.processReadySessionsScript,
2, // number of KEYS 3, // number of KEYS
this.readySessionsKey, this.readySessionsKey,
this.sessionKeyPrefix, this.sessionKeyPrefix,
this.sessionSortedKey,
sessionsPerPage.toString(), sessionsPerPage.toString(),
maxEventsPerSession.toString(), maxEventsPerSession.toString(),
startOffset.toString(), startOffset.toString(),
@@ -646,31 +673,33 @@ return "OK"
const flush: IClickhouseEvent[] = []; const flush: IClickhouseEvent[] = [];
const pending: IClickhouseEvent[] = []; const pending: IClickhouseEvent[] = [];
let hasSessionEnd = false;
// Check if session has ended - if so, flush everything
const hasSessionEnd = events.some((e) => e.name === 'session_end');
if (hasSessionEnd) {
flush.push(...events);
return { flush, pending: [] };
}
const findNextScreenView = (events: IClickhouseEvent[]) => {
return events.find((e) => e.name === 'screen_view');
};
for (let i = 0; i < events.length; i++) { for (let i = 0; i < events.length; i++) {
const event = events[i]!; const event = events[i]!;
if (event.name === 'session_end') {
hasSessionEnd = true;
flush.push(event);
} else {
// For screen_view events, look for next event // For screen_view events, look for next event
const next = events[i + 1]; const next = findNextScreenView(events.slice(i + 1));
if (next) { if (next) {
if (next.name === 'screen_view') {
event.duration = event.duration =
new Date(next.created_at).getTime() - new Date(next.created_at).getTime() -
new Date(event.created_at).getTime(); new Date(event.created_at).getTime();
}
flush.push(event);
} else if (hasSessionEnd) {
flush.push(event); flush.push(event);
} else { } else {
// Last screen_view with no next event - keep pending
pending.push(event); pending.push(event);
} }
} }
}
return { flush, pending }; return { flush, pending };
} }
@@ -696,23 +725,58 @@ return "OK"
const cutoffTime = Date.now() - 1000 * 60 * 60 * 24 * this.daysToKeep; const cutoffTime = Date.now() - 1000 * 60 * 60 * 24 * this.daysToKeep;
try { try {
const sessionIds = await redis.zrange(this.sessionSortedKey, 0, -1); const sessionCount = await redis.zcard(this.sessionSortedKey);
const batchSize = 1000;
let offset = 0;
let totalCleaned = 0;
for (const sessionId of sessionIds) { this.logger.info('Starting cleanup of stale sessions', {
const score = await redis.zscore(this.sessionSortedKey, sessionId); cutoffTime: new Date(cutoffTime),
totalSessions: sessionCount,
});
if (score) { while (offset < sessionCount) {
const scoreInt = Number.parseInt(score, 10); // Get batch of session IDs with scores
if (scoreInt < cutoffTime) { const sessionIdsWithScores = await redis.zrange(
this.logger.warn('Stale session found', { this.sessionSortedKey,
sessionId, offset,
score, offset + batchSize - 1,
createdAt: new Date(Number.parseInt(score, 10)), 'WITHSCORES',
eventsCount: await redis.llen(this.getSessionKey(sessionId)), );
if (sessionIdsWithScores.length === 0) break;
const pipeline = redis.pipeline();
let staleSessions = 0;
// Process pairs of [sessionId, score]
for (let i = 0; i < sessionIdsWithScores.length; i += 2) {
const sessionId = sessionIdsWithScores[i];
const score = Number.parseInt(sessionIdsWithScores[i + 1] || '0', 10);
if (sessionId && score < cutoffTime) {
staleSessions++;
// Remove from both sorted sets and delete the session key
pipeline.zrem(this.sessionSortedKey, sessionId);
pipeline.zrem(this.readySessionsKey, sessionId);
pipeline.del(this.getSessionKey(sessionId));
}
}
if (staleSessions > 0) {
await pipeline.exec();
totalCleaned += staleSessions;
this.logger.info('Cleaned batch of stale sessions', {
batch: Math.floor(offset / batchSize) + 1,
cleanedInBatch: staleSessions,
totalCleaned,
}); });
} }
offset += batchSize;
} }
}
this.logger.info('Cleanup completed', { totalCleaned });
} catch (error) { } catch (error) {
this.logger.error('Failed to cleanup stale sessions', { error }); this.logger.error('Failed to cleanup stale sessions', { error });
} }
@@ -798,8 +862,9 @@ return "OK"
await redis.eval( await redis.eval(
this.batchUpdateSessionsScript, this.batchUpdateSessionsScript,
2, // KEYS: ready sessions, buffer counter 3, // KEYS: ready sessions, sessions sorted, buffer counter
this.readySessionsKey, this.readySessionsKey,
this.sessionSortedKey,
this.bufferCounterKey, this.bufferCounterKey,
...batchArgs, ...batchArgs,
); );
@@ -809,40 +874,52 @@ return "OK"
public async getBufferSizeHeavy() { public async getBufferSizeHeavy() {
// Fallback method for when counter is not available // Fallback method for when counter is not available
const redis = getRedisCache(); const redis = getRedisCache();
const pipeline = redis.pipeline();
// Queue up commands in the pipeline // Get regular queue count
pipeline.llen(this.regularQueueKey); const regularQueueCount = await redis.llen(this.regularQueueKey);
pipeline.zcard(this.sessionSortedKey);
// Execute pipeline to get initial counts // Get total number of sessions
const [regularQueueCount, sessionCount] = (await pipeline.exec()) as [ const sessionCount = await redis.zcard(this.sessionSortedKey);
any,
any,
];
if (sessionCount[1] === 0) { if (sessionCount === 0) {
return regularQueueCount[1]; return regularQueueCount;
} }
// Get all session IDs and queue up LLEN commands for each session // Process sessions in batches to avoid memory spikes
const sessionIds = await redis.zrange(this.sessionSortedKey, 0, -1); const batchSize = 1000;
const sessionPipeline = redis.pipeline(); let totalSessionEvents = 0;
let offset = 0;
while (offset < sessionCount) {
// Get batch of session IDs
const sessionIds = await redis.zrange(
this.sessionSortedKey,
offset,
offset + batchSize - 1,
);
if (sessionIds.length === 0) break;
// Queue up LLEN commands for this batch
const sessionPipeline = redis.pipeline();
for (const sessionId of sessionIds) { for (const sessionId of sessionIds) {
sessionPipeline.llen(this.getSessionKey(sessionId)); sessionPipeline.llen(this.getSessionKey(sessionId));
} }
// Execute all LLEN commands in a single pipeline // Execute pipeline for this batch
const sessionCounts = (await sessionPipeline.exec()) as [any, any][]; const sessionCounts = (await sessionPipeline.exec()) as [any, any][];
// Sum up all counts // Sum up counts from this batch
const totalSessionEvents = sessionCounts.reduce((sum, [err, count]) => { for (const [err, count] of sessionCounts) {
if (err) return sum; if (!err) {
return sum + count; totalSessionEvents += count;
}, 0); }
}
return regularQueueCount[1] + totalSessionEvents; offset += batchSize;
}
return regularQueueCount + totalSessionEvents;
} }
public async getBufferSize() { public async getBufferSize() {

View File

@@ -10,7 +10,7 @@
"@openpanel/logger": "workspace:*", "@openpanel/logger": "workspace:*",
"@openpanel/redis": "workspace:*", "@openpanel/redis": "workspace:*",
"bullmq": "^5.8.7", "bullmq": "^5.8.7",
"groupmq": "1.0.0-next.14" "groupmq": "1.0.0-next.15"
}, },
"devDependencies": { "devDependencies": {
"@openpanel/sdk": "workspace:*", "@openpanel/sdk": "workspace:*",

View File

@@ -113,7 +113,8 @@ export const eventsGroupQueue = new GroupQueue<
logger: queueLogger, logger: queueLogger,
namespace: 'group_events', namespace: 'group_events',
redis: getRedisGroupQueue(), redis: getRedisGroupQueue(),
orderingDelayMs: 2000, orderingMethod: 'in-memory',
orderingWindowMs: 50,
keepCompleted: 10, keepCompleted: 10,
keepFailed: 10_000, keepFailed: 10_000,
}); });

18
pnpm-lock.yaml generated
View File

@@ -127,8 +127,8 @@ importers:
specifier: ^5.0.0 specifier: ^5.0.0
version: 5.0.0 version: 5.0.0
groupmq: groupmq:
specifier: 1.0.0-next.14 specifier: 1.0.0-next.15
version: 1.0.0-next.14(ioredis@5.4.1) version: 1.0.0-next.15(ioredis@5.4.1)
ico-to-png: ico-to-png:
specifier: ^0.2.2 specifier: ^0.2.2
version: 0.2.2 version: 0.2.2
@@ -760,8 +760,8 @@ importers:
specifier: ^4.18.2 specifier: ^4.18.2
version: 4.18.2 version: 4.18.2
groupmq: groupmq:
specifier: 1.0.0-next.14 specifier: 1.0.0-next.15
version: 1.0.0-next.14(ioredis@5.4.1) version: 1.0.0-next.15(ioredis@5.4.1)
prom-client: prom-client:
specifier: ^15.1.3 specifier: ^15.1.3
version: 15.1.3 version: 15.1.3
@@ -1224,8 +1224,8 @@ importers:
specifier: ^5.8.7 specifier: ^5.8.7
version: 5.8.7 version: 5.8.7
groupmq: groupmq:
specifier: 1.0.0-next.14 specifier: 1.0.0-next.15
version: 1.0.0-next.14(ioredis@5.4.1) version: 1.0.0-next.15(ioredis@5.4.1)
devDependencies: devDependencies:
'@openpanel/sdk': '@openpanel/sdk':
specifier: workspace:* specifier: workspace:*
@@ -8837,8 +8837,8 @@ packages:
resolution: {integrity: sha512-5v6yZd4JK3eMI3FqqCouswVqwugaA9r4dNZB1wwcmrD02QkV5H0y7XBQW8QwQqEaZY1pM9aqORSORhJRdNK44Q==} resolution: {integrity: sha512-5v6yZd4JK3eMI3FqqCouswVqwugaA9r4dNZB1wwcmrD02QkV5H0y7XBQW8QwQqEaZY1pM9aqORSORhJRdNK44Q==}
engines: {node: '>=6.0'} engines: {node: '>=6.0'}
groupmq@1.0.0-next.14: groupmq@1.0.0-next.15:
resolution: {integrity: sha512-u9fFkQjiCbge6if7UwhY9yeS9KICok0eTMMpjALF55nj/q+J6XuxAlwg1n1CkZPR7SgGKWUAZNuZ3vVwtsO1gg==} resolution: {integrity: sha512-iYOctW9kqhJroupw3tRtgfIsc0Jd4e36E7FsMHmOV35VygIDmrGi/3wZ1TCFIkxJjeaizQ6/GsNojqoBqEzFSg==}
engines: {node: '>=18'} engines: {node: '>=18'}
peerDependencies: peerDependencies:
ioredis: '>=5' ioredis: '>=5'
@@ -22060,7 +22060,7 @@ snapshots:
section-matter: 1.0.0 section-matter: 1.0.0
strip-bom-string: 1.0.0 strip-bom-string: 1.0.0
groupmq@1.0.0-next.14(ioredis@5.4.1): groupmq@1.0.0-next.15(ioredis@5.4.1):
dependencies: dependencies:
cron-parser: 4.9.0 cron-parser: 4.9.0
ioredis: 5.4.1 ioredis: 5.4.1