From af580333b426c6e9633430ef5172bb02e17cef06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Mon, 6 Oct 2025 21:22:56 +0200 Subject: [PATCH] fix: bump groupmq, fix stalled sessions in buffers, improve heavy event count --- apps/api/package.json | 2 +- apps/worker/package.json | 2 +- packages/db/src/buffers/base-buffer.ts | 2 +- packages/db/src/buffers/event-buffer.test.ts | 280 ++++++++++++++++++- packages/db/src/buffers/event-buffer.ts | 215 +++++++++----- packages/queue/package.json | 2 +- packages/queue/src/queues.ts | 3 +- pnpm-lock.yaml | 18 +- 8 files changed, 434 insertions(+), 90 deletions(-) diff --git a/apps/api/package.json b/apps/api/package.json index d5cfe7c0..e4dd7160 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -38,7 +38,7 @@ "fastify": "^5.2.1", "fastify-metrics": "^12.1.0", "fastify-raw-body": "^5.0.0", - "groupmq": "1.0.0-next.14", + "groupmq": "1.0.0-next.15", "ico-to-png": "^0.2.2", "jsonwebtoken": "^9.0.2", "ramda": "^0.29.1", diff --git a/apps/worker/package.json b/apps/worker/package.json index e1360fa8..46a5f787 100644 --- a/apps/worker/package.json +++ b/apps/worker/package.json @@ -23,7 +23,7 @@ "@openpanel/redis": "workspace:*", "bullmq": "^5.8.7", "express": "^4.18.2", - "groupmq": "1.0.0-next.14", + "groupmq": "1.0.0-next.15", "prom-client": "^15.1.3", "ramda": "^0.29.1", "source-map-support": "^0.5.21", diff --git a/packages/db/src/buffers/base-buffer.ts b/packages/db/src/buffers/base-buffer.ts index 21f66e9c..f87cc903 100644 --- a/packages/db/src/buffers/base-buffer.ts +++ b/packages/db/src/buffers/base-buffer.ts @@ -39,7 +39,7 @@ export class BaseBuffer { const key = this.bufferCounterKey; try { await runEvery({ - interval: 60 * 15, + interval: 60 * 60, key: `${this.name}-buffer:resync`, fn: async () => { try { diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts index ddbbe3b1..0df5b4ae 100644 --- a/packages/db/src/buffers/event-buffer.test.ts +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -206,7 +206,7 @@ describe('EventBuffer with real Redis', () => { expect(await eventBuffer.getBufferSize()).toBe(0); }); - it('adds session to ready set at 2 events and removes after processing', async () => { + it('adds session to ready set at 2 events and removes it when < 2 events remain', async () => { const s = 'session_ready'; const e1 = { project_id: 'p3', @@ -235,9 +235,18 @@ describe('EventBuffer with real Redis', () => { .mockResolvedValueOnce(undefined as any); await eventBuffer.processBuffer(); - // After processing with one pending left, session should be removed from ready set + // After processing with one pending left, session should be REMOVED from ready set + // It will be re-added when the next event arrives expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); expect(insertSpy).toHaveBeenCalled(); + + // But the session and its data should still exist + const sessionKey = `event_buffer:session:${s}`; + const remaining = await redis.lrange(sessionKey, 0, -1); + expect(remaining.length).toBe(1); // One pending event + expect( + await redis.zscore('event_buffer:sessions_sorted', s), + ).not.toBeNull(); // Still in sorted set }); it('sets last screen_view key and clears it on session_end', async () => { @@ -427,16 +436,16 @@ describe('EventBuffer with real Redis', () => { await eb.processBuffer(); - // Only consider eval calls for batchUpdateSessionsScript (2 keys, second is total_count) + // Only consider eval calls for batchUpdateSessionsScript (3 keys now: ready, sorted, counter) const batchEvalCalls = evalSpy.mock.calls.filter( - (call) => call[1] === 2 && call[3] === 'event_buffer:total_count', + (call) => call[1] === 3 && call[4] === 'event_buffer:total_count', ); const expectedCalls = Math.ceil(numSessions / 3); expect(batchEvalCalls.length).toBeGreaterThanOrEqual(expectedCalls); function countSessionsInEvalCall(args: any[]): number { - let idx = 4; // ARGV starts after: script, numKeys, key1, key2 + let idx = 5; // ARGV starts after: script, numKeys, key1, key2, key3 let count = 0; while (idx < args.length) { if (idx + 3 >= args.length) break; @@ -448,9 +457,10 @@ describe('EventBuffer with real Redis', () => { } for (const call of batchEvalCalls) { - expect(call[1]).toBe(2); + expect(call[1]).toBe(3); expect(call[2]).toBe('event_buffer:ready_sessions'); - expect(call[3]).toBe('event_buffer:total_count'); + expect(call[3]).toBe('event_buffer:sessions_sorted'); + expect(call[4]).toBe('event_buffer:total_count'); const sessionsInThisCall = countSessionsInEvalCall(call.slice(0)); expect(sessionsInThisCall).toBeLessThanOrEqual(3); @@ -500,4 +510,260 @@ describe('EventBuffer with real Redis', () => { insertSpy.mockRestore(); }); + + it('flushes ALL screen_views when session_end arrives (no pending events)', async () => { + const t0 = Date.now(); + const s = 'session_multi_end'; + const view1 = { + project_id: 'p10', + profile_id: 'u10', + session_id: s, + name: 'screen_view', + created_at: new Date(t0).toISOString(), + } as any; + const view2 = { + ...view1, + created_at: new Date(t0 + 1000).toISOString(), + } as any; + const view3 = { + ...view1, + created_at: new Date(t0 + 2000).toISOString(), + } as any; + const end = { + ...view1, + name: 'session_end', + created_at: new Date(t0 + 3000).toISOString(), + } as any; + + const eb = new EventBuffer(); + await eb.add(view1); + await eb.add(view2); + await eb.add(view3); + await eb.add(end); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + + await eb.processBuffer(); + + // All 4 events should be flushed (3 screen_views + session_end) + expect(insertSpy).toHaveBeenCalledWith({ + format: 'JSONEachRow', + table: 'events', + values: [view1, view2, view3, end], + }); + + // Session should be completely empty and removed + const sessionKey = `event_buffer:session:${s}`; + const remaining = await redis.lrange(sessionKey, 0, -1); + expect(remaining.length).toBe(0); + + // Session should be removed from both sorted sets + expect(await redis.zscore('event_buffer:sessions_sorted', s)).toBeNull(); + expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); + + insertSpy.mockRestore(); + }); + + it('re-adds session to ready_sessions when new event arrives after processing', async () => { + const t0 = Date.now(); + const s = 'session_continued'; + const view1 = { + project_id: 'p11', + profile_id: 'u11', + session_id: s, + name: 'screen_view', + created_at: new Date(t0).toISOString(), + } as any; + const view2 = { + ...view1, + created_at: new Date(t0 + 1000).toISOString(), + } as any; + + const eb = new EventBuffer(); + await eb.add(view1); + await eb.add(view2); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValue(undefined as any); + + // First processing: flush view1, keep view2 pending + await eb.processBuffer(); + + expect(insertSpy).toHaveBeenCalledWith({ + format: 'JSONEachRow', + table: 'events', + values: [{ ...view1, duration: 1000 }], + }); + + // Session should be REMOVED from ready_sessions (only 1 event left) + expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); + + // Add a third screen_view - this should re-add to ready_sessions + const view3 = { + ...view1, + created_at: new Date(t0 + 2000).toISOString(), + } as any; + await eb.add(view3); + + // NOW it should be back in ready_sessions (2 events again) + expect(await redis.zscore('event_buffer:ready_sessions', s)).not.toBeNull(); + + insertSpy.mockClear(); + + // Second processing: should process view2 (now has duration), keep view3 pending + await eb.processBuffer(); + + expect(insertSpy).toHaveBeenCalledWith({ + format: 'JSONEachRow', + table: 'events', + values: [{ ...view2, duration: 1000 }], + }); + + // Session should be REMOVED again (only 1 event left) + expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); + + const sessionKey = `event_buffer:session:${s}`; + const remaining = await redis.lrange(sessionKey, 0, -1); + expect(remaining.length).toBe(1); + expect(JSON.parse(remaining[0]!)).toMatchObject({ + session_id: s, + created_at: view3.created_at, + }); + + insertSpy.mockRestore(); + }); + + it('removes session from ready_sessions only when completely empty', async () => { + const t0 = Date.now(); + const s = 'session_complete'; + const view = { + project_id: 'p12', + profile_id: 'u12', + session_id: s, + name: 'screen_view', + created_at: new Date(t0).toISOString(), + } as any; + const end = { + ...view, + name: 'session_end', + created_at: new Date(t0 + 1000).toISOString(), + } as any; + + const eb = new EventBuffer(); + await eb.add(view); + await eb.add(end); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + + await eb.processBuffer(); + + // Both events flushed, session empty + expect(insertSpy).toHaveBeenCalledWith({ + format: 'JSONEachRow', + table: 'events', + values: [view, end], + }); + + // NOW it should be removed from ready_sessions (because it's empty) + expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); + expect(await redis.zscore('event_buffer:sessions_sorted', s)).toBeNull(); + + insertSpy.mockRestore(); + }); + + it('getBufferSizeHeavy correctly counts events across many sessions in batches', async () => { + const eb = new EventBuffer(); + const numSessions = 250; // More than batch size (100) to test batching + const eventsPerSession = 3; + const numRegularEvents = 50; + + // Add session events (3 events per session) + for (let i = 0; i < numSessions; i++) { + const sessionId = `batch_session_${i}`; + for (let j = 0; j < eventsPerSession; j++) { + await eb.add({ + project_id: 'p_batch', + profile_id: `u_${i}`, + session_id: sessionId, + name: 'screen_view', + created_at: new Date(Date.now() + i * 100 + j * 10).toISOString(), + } as any); + } + } + + // Add regular queue events + for (let i = 0; i < numRegularEvents; i++) { + await eb.add({ + project_id: 'p_batch', + name: 'custom_event', + created_at: new Date().toISOString(), + } as any); + } + + // Get buffer size using heavy method + const bufferSize = await eb.getBufferSizeHeavy(); + + // Should count all events: (250 sessions × 3 events) + 50 regular events + const expectedSize = numSessions * eventsPerSession + numRegularEvents; + expect(bufferSize).toBe(expectedSize); + + // Verify sessions are properly tracked + const sessionCount = await redis.zcard('event_buffer:sessions_sorted'); + expect(sessionCount).toBe(numSessions); + + const regularQueueCount = await redis.llen('event_buffer:regular_queue'); + expect(regularQueueCount).toBe(numRegularEvents); + }); + + it('getBufferSizeHeavy handles empty buffer correctly', async () => { + const eb = new EventBuffer(); + + const bufferSize = await eb.getBufferSizeHeavy(); + + expect(bufferSize).toBe(0); + }); + + it('getBufferSizeHeavy handles only regular queue events', async () => { + const eb = new EventBuffer(); + const numEvents = 10; + + for (let i = 0; i < numEvents; i++) { + await eb.add({ + project_id: 'p_regular', + name: 'custom_event', + created_at: new Date().toISOString(), + } as any); + } + + const bufferSize = await eb.getBufferSizeHeavy(); + + expect(bufferSize).toBe(numEvents); + }); + + it('getBufferSizeHeavy handles only session events', async () => { + const eb = new EventBuffer(); + const numSessions = 5; + const eventsPerSession = 2; + + for (let i = 0; i < numSessions; i++) { + for (let j = 0; j < eventsPerSession; j++) { + await eb.add({ + project_id: 'p_sessions', + profile_id: `u_${i}`, + session_id: `session_${i}`, + name: 'screen_view', + created_at: new Date(Date.now() + i * 100 + j * 10).toISOString(), + } as any); + } + } + + const bufferSize = await eb.getBufferSizeHeavy(); + + expect(bufferSize).toBe(numSessions * eventsPerSession); + }); }); diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index caf1bb36..e6793ca7 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -100,6 +100,7 @@ export class EventBuffer extends BaseBuffer { private readonly processReadySessionsScript = ` local readySessionsKey = KEYS[1] local sessionPrefix = KEYS[2] +local sessionsSortedKey = KEYS[3] local maxSessions = tonumber(ARGV[1]) local maxEventsPerSession = tonumber(ARGV[2]) local startOffset = tonumber(ARGV[3]) or 0 @@ -117,7 +118,7 @@ for i, sessionId in ipairs(sessionIds) do local eventCount = redis.call('LLEN', sessionKey) if eventCount == 0 then - -- Session is empty, remove from ready set + -- Session is empty, remove from both sets table.insert(sessionsToRemove, sessionId) else -- Fetch limited number of events to avoid huge payloads @@ -133,9 +134,14 @@ for i, sessionId in ipairs(sessionIds) do end end --- Clean up empty sessions from ready set +-- Clean up empty sessions from both ready set and sorted set if #sessionsToRemove > 0 then redis.call('ZREM', readySessionsKey, unpack(sessionsToRemove)) + redis.call('ZREM', sessionsSortedKey, unpack(sessionsToRemove)) + -- Also delete the empty session keys + for i, sessionId in ipairs(sessionsToRemove) do + redis.call('DEL', sessionPrefix .. sessionId) + end end return cjson.encode(result) @@ -147,7 +153,8 @@ return cjson.encode(result) * * KEYS[1] = session key * KEYS[2] = ready sessions key - * KEYS[3] = buffer counter key + * KEYS[3] = sessions sorted key + * KEYS[4] = buffer counter key * ARGV[1] = sessionId * ARGV[2] = snapshotCount (number of events that were present in our snapshot) * ARGV[3] = pendingCount (number of pending events) @@ -157,7 +164,8 @@ return cjson.encode(result) private readonly updateSessionScript = ` local sessionKey = KEYS[1] local readySessionsKey = KEYS[2] -local bufferCounterKey = KEYS[3] +local sessionsSortedKey = KEYS[3] +local bufferCounterKey = KEYS[4] local sessionId = ARGV[1] local snapshotCount = tonumber(ARGV[2]) local pendingCount = tonumber(ARGV[3]) @@ -174,9 +182,17 @@ end local newLength = redis.call("LLEN", sessionKey) -- Update ready sessions set based on new length -if newLength >= minEventsInSession then - redis.call("ZADD", readySessionsKey, "XX", redis.call("TIME")[1], sessionId) +if newLength == 0 then + -- Session is now empty, remove from both sets and delete key + redis.call("ZREM", readySessionsKey, sessionId) + redis.call("ZREM", sessionsSortedKey, sessionId) + redis.call("DEL", sessionKey) +elseif newLength >= minEventsInSession then + -- Session has enough events, keep/add it in ready_sessions + redis.call("ZADD", readySessionsKey, redis.call("TIME")[1], sessionId) else + -- Session has events but < minEvents, remove from ready_sessions + -- It will be re-added when a new event arrives (via addEventScript) redis.call("ZREM", readySessionsKey, sessionId) end @@ -192,12 +208,14 @@ return newLength /** * Optimized batch update script with counter and ready sessions management. * KEYS[1] = ready sessions key - * KEYS[2] = buffer counter key - * ARGV format: [sessionKey1, sessionId1, snapshotCount1, pendingCount1, pending1...., sessionKey2, ...] + * KEYS[2] = sessions sorted key + * KEYS[3] = buffer counter key + * ARGV format: [minEventsInSession, sessionKey1, sessionId1, snapshotCount1, pendingCount1, pending1...., sessionKey2, ...] */ private readonly batchUpdateSessionsScript = ` local readySessionsKey = KEYS[1] -local bufferCounterKey = KEYS[2] +local sessionsSortedKey = KEYS[2] +local bufferCounterKey = KEYS[3] local minEventsInSession = tonumber(ARGV[1]) local totalCounterChange = 0 @@ -222,9 +240,17 @@ while i <= #ARGV do local newLength = redis.call("LLEN", sessionKey) -- Update ready sessions set based on new length - if newLength >= minEventsInSession then - redis.call("ZADD", readySessionsKey, "XX", redis.call("TIME")[1], sessionId) + if newLength == 0 then + -- Session is now empty, remove from both sets and delete key + redis.call("ZREM", readySessionsKey, sessionId) + redis.call("ZREM", sessionsSortedKey, sessionId) + redis.call("DEL", sessionKey) + elseif newLength >= minEventsInSession then + -- Session has enough events, keep/add it in ready_sessions + redis.call("ZADD", readySessionsKey, redis.call("TIME")[1], sessionId) else + -- Session has events but < minEvents, remove from ready_sessions + -- It will be re-added when a new event arrives (via addEventScript) redis.call("ZREM", readySessionsKey, sessionId) end @@ -398,9 +424,10 @@ return "OK" ) { const sessionsSorted = await getRedisCache().eval( this.processReadySessionsScript, - 2, // number of KEYS + 3, // number of KEYS this.readySessionsKey, this.sessionKeyPrefix, + this.sessionSortedKey, sessionsPerPage.toString(), maxEventsPerSession.toString(), startOffset.toString(), @@ -646,29 +673,31 @@ return "OK" const flush: IClickhouseEvent[] = []; const pending: IClickhouseEvent[] = []; - let hasSessionEnd = false; + + // Check if session has ended - if so, flush everything + const hasSessionEnd = events.some((e) => e.name === 'session_end'); + + if (hasSessionEnd) { + flush.push(...events); + return { flush, pending: [] }; + } + + const findNextScreenView = (events: IClickhouseEvent[]) => { + return events.find((e) => e.name === 'screen_view'); + }; for (let i = 0; i < events.length; i++) { const event = events[i]!; - - if (event.name === 'session_end') { - hasSessionEnd = true; + // For screen_view events, look for next event + const next = findNextScreenView(events.slice(i + 1)); + if (next) { + event.duration = + new Date(next.created_at).getTime() - + new Date(event.created_at).getTime(); flush.push(event); } else { - // For screen_view events, look for next event - const next = events[i + 1]; - if (next) { - if (next.name === 'screen_view') { - event.duration = - new Date(next.created_at).getTime() - - new Date(event.created_at).getTime(); - } - flush.push(event); - } else if (hasSessionEnd) { - flush.push(event); - } else { - pending.push(event); - } + // Last screen_view with no next event - keep pending + pending.push(event); } } @@ -696,23 +725,58 @@ return "OK" const cutoffTime = Date.now() - 1000 * 60 * 60 * 24 * this.daysToKeep; try { - const sessionIds = await redis.zrange(this.sessionSortedKey, 0, -1); + const sessionCount = await redis.zcard(this.sessionSortedKey); + const batchSize = 1000; + let offset = 0; + let totalCleaned = 0; - for (const sessionId of sessionIds) { - const score = await redis.zscore(this.sessionSortedKey, sessionId); + this.logger.info('Starting cleanup of stale sessions', { + cutoffTime: new Date(cutoffTime), + totalSessions: sessionCount, + }); - if (score) { - const scoreInt = Number.parseInt(score, 10); - if (scoreInt < cutoffTime) { - this.logger.warn('Stale session found', { - sessionId, - score, - createdAt: new Date(Number.parseInt(score, 10)), - eventsCount: await redis.llen(this.getSessionKey(sessionId)), - }); + while (offset < sessionCount) { + // Get batch of session IDs with scores + const sessionIdsWithScores = await redis.zrange( + this.sessionSortedKey, + offset, + offset + batchSize - 1, + 'WITHSCORES', + ); + + if (sessionIdsWithScores.length === 0) break; + + const pipeline = redis.pipeline(); + let staleSessions = 0; + + // Process pairs of [sessionId, score] + for (let i = 0; i < sessionIdsWithScores.length; i += 2) { + const sessionId = sessionIdsWithScores[i]; + const score = Number.parseInt(sessionIdsWithScores[i + 1] || '0', 10); + + if (sessionId && score < cutoffTime) { + staleSessions++; + // Remove from both sorted sets and delete the session key + pipeline.zrem(this.sessionSortedKey, sessionId); + pipeline.zrem(this.readySessionsKey, sessionId); + pipeline.del(this.getSessionKey(sessionId)); } } + + if (staleSessions > 0) { + await pipeline.exec(); + totalCleaned += staleSessions; + this.logger.info('Cleaned batch of stale sessions', { + batch: Math.floor(offset / batchSize) + 1, + cleanedInBatch: staleSessions, + totalCleaned, + }); + } + + offset += batchSize; } + + this.logger.info('Cleanup completed', { totalCleaned }); } catch (error) { this.logger.error('Failed to cleanup stale sessions', { error }); } @@ -798,8 +862,9 @@ return "OK" await redis.eval( this.batchUpdateSessionsScript, - 2, // KEYS: ready sessions, buffer counter + 3, // KEYS: ready sessions, sessions sorted, buffer counter this.readySessionsKey, + this.sessionSortedKey, this.bufferCounterKey, ...batchArgs, ); @@ -809,40 +874,52 @@ return "OK" public async getBufferSizeHeavy() { // Fallback method for when counter is not available const redis = getRedisCache(); - const pipeline = redis.pipeline(); - // Queue up commands in the pipeline - pipeline.llen(this.regularQueueKey); - pipeline.zcard(this.sessionSortedKey); + // Get regular queue count + const regularQueueCount = await redis.llen(this.regularQueueKey); - // Execute pipeline to get initial counts - const [regularQueueCount, sessionCount] = (await pipeline.exec()) as [ - any, - any, - ]; + // Get total number of sessions + const sessionCount = await redis.zcard(this.sessionSortedKey); - if (sessionCount[1] === 0) { - return regularQueueCount[1]; + if (sessionCount === 0) { + return regularQueueCount; } - // Get all session IDs and queue up LLEN commands for each session - const sessionIds = await redis.zrange(this.sessionSortedKey, 0, -1); - const sessionPipeline = redis.pipeline(); + // Process sessions in batches to avoid memory spikes + const batchSize = 1000; + let totalSessionEvents = 0; + let offset = 0; - for (const sessionId of sessionIds) { - sessionPipeline.llen(this.getSessionKey(sessionId)); + while (offset < sessionCount) { + // Get batch of session IDs + const sessionIds = await redis.zrange( + this.sessionSortedKey, + offset, + offset + batchSize - 1, + ); + + if (sessionIds.length === 0) break; + + // Queue up LLEN commands for this batch + const sessionPipeline = redis.pipeline(); + for (const sessionId of sessionIds) { + sessionPipeline.llen(this.getSessionKey(sessionId)); + } + + // Execute pipeline for this batch + const sessionCounts = (await sessionPipeline.exec()) as [any, any][]; + + // Sum up counts from this batch + for (const [err, count] of sessionCounts) { + if (!err) { + totalSessionEvents += count; + } + } + + offset += batchSize; } - // Execute all LLEN commands in a single pipeline - const sessionCounts = (await sessionPipeline.exec()) as [any, any][]; - - // Sum up all counts - const totalSessionEvents = sessionCounts.reduce((sum, [err, count]) => { - if (err) return sum; - return sum + count; - }, 0); - - return regularQueueCount[1] + totalSessionEvents; + return regularQueueCount + totalSessionEvents; } public async getBufferSize() { diff --git a/packages/queue/package.json b/packages/queue/package.json index e474498f..9ed5f45a 100644 --- a/packages/queue/package.json +++ b/packages/queue/package.json @@ -10,7 +10,7 @@ "@openpanel/logger": "workspace:*", "@openpanel/redis": "workspace:*", "bullmq": "^5.8.7", - "groupmq": "1.0.0-next.14" + "groupmq": "1.0.0-next.15" }, "devDependencies": { "@openpanel/sdk": "workspace:*", diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 3d0413f8..1f4cc43f 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -113,7 +113,8 @@ export const eventsGroupQueue = new GroupQueue< logger: queueLogger, namespace: 'group_events', redis: getRedisGroupQueue(), - orderingDelayMs: 2000, + orderingMethod: 'in-memory', + orderingWindowMs: 50, keepCompleted: 10, keepFailed: 10_000, }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a23adb23..4b612f35 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -127,8 +127,8 @@ importers: specifier: ^5.0.0 version: 5.0.0 groupmq: - specifier: 1.0.0-next.14 - version: 1.0.0-next.14(ioredis@5.4.1) + specifier: 1.0.0-next.15 + version: 1.0.0-next.15(ioredis@5.4.1) ico-to-png: specifier: ^0.2.2 version: 0.2.2 @@ -760,8 +760,8 @@ importers: specifier: ^4.18.2 version: 4.18.2 groupmq: - specifier: 1.0.0-next.14 - version: 1.0.0-next.14(ioredis@5.4.1) + specifier: 1.0.0-next.15 + version: 1.0.0-next.15(ioredis@5.4.1) prom-client: specifier: ^15.1.3 version: 15.1.3 @@ -1224,8 +1224,8 @@ importers: specifier: ^5.8.7 version: 5.8.7 groupmq: - specifier: 1.0.0-next.14 - version: 1.0.0-next.14(ioredis@5.4.1) + specifier: 1.0.0-next.15 + version: 1.0.0-next.15(ioredis@5.4.1) devDependencies: '@openpanel/sdk': specifier: workspace:* @@ -8837,8 +8837,8 @@ packages: resolution: {integrity: sha512-5v6yZd4JK3eMI3FqqCouswVqwugaA9r4dNZB1wwcmrD02QkV5H0y7XBQW8QwQqEaZY1pM9aqORSORhJRdNK44Q==} engines: {node: '>=6.0'} - groupmq@1.0.0-next.14: - resolution: {integrity: sha512-u9fFkQjiCbge6if7UwhY9yeS9KICok0eTMMpjALF55nj/q+J6XuxAlwg1n1CkZPR7SgGKWUAZNuZ3vVwtsO1gg==} + groupmq@1.0.0-next.15: + resolution: {integrity: sha512-iYOctW9kqhJroupw3tRtgfIsc0Jd4e36E7FsMHmOV35VygIDmrGi/3wZ1TCFIkxJjeaizQ6/GsNojqoBqEzFSg==} engines: {node: '>=18'} peerDependencies: ioredis: '>=5' @@ -22060,7 +22060,7 @@ snapshots: section-matter: 1.0.0 strip-bom-string: 1.0.0 - groupmq@1.0.0-next.14(ioredis@5.4.1): + groupmq@1.0.0-next.15(ioredis@5.4.1): dependencies: cron-parser: 4.9.0 ioredis: 5.4.1