diff --git a/packages/db/src/buffers/event-buffer-redis.ts b/packages/db/src/buffers/event-buffer-redis.ts index b4edc186..ec651584 100644 --- a/packages/db/src/buffers/event-buffer-redis.ts +++ b/packages/db/src/buffers/event-buffer-redis.ts @@ -76,7 +76,7 @@ export class EventBuffer extends BaseBuffer { local sessionSortedKey = KEYS[1] local sessionPrefix = KEYS[2] local batchSize = tonumber(ARGV[1]) -local minEvents = tonumber(ARGV[2]) -- New parameter for minimum events +local minEvents = tonumber(ARGV[2]) local result = {} local sessionsToRemove = {} @@ -200,6 +200,14 @@ return "OK" const multi = _multi || redis.multi(); if (event.session_id && this.sessionEvents.includes(event.name)) { + const sessionKey = this.getSessionKey(event.session_id); + const addEventToSession = () => { + const score = new Date(event.created_at || Date.now()).getTime(); + multi + .rpush(sessionKey, eventJson) + .zadd(this.sessionSortedKey, 'NX', score, event.session_id); + }; + if (event.name === 'screen_view') { multi.set( this.getLastEventKey({ @@ -210,21 +218,28 @@ return "OK" 'EX', 60 * 31, ); + + addEventToSession(); } else if (event.name === 'session_end') { + // Delete last screen view multi.del( this.getLastEventKey({ projectId: event.project_id, profileId: event.profile_id, }), ); + + // Check if session has any events + const eventCount = await redis.llen(sessionKey); + + if (eventCount === 0) { + // If session is empty, add to regular queue and don't track in sorted set + multi.rpush(this.regularQueueKey, eventJson); + } else { + // Otherwise add to session as normal + addEventToSession(); + } } - - const sessionKey = this.getSessionKey(event.session_id); - const score = new Date(event.created_at || Date.now()).getTime(); - - multi - .rpush(sessionKey, eventJson) - .zadd(this.sessionSortedKey, 'NX', score, event.session_id); } else { // All other events go to regularQueue queue multi.rpush(this.regularQueueKey, eventJson); @@ -339,7 +354,7 @@ return "OK" let now = performance.now(); const [sessions, regularQueueEvents] = await Promise.all([ // (A) Fetch session events - this.getEligableSessions({ minEventsInSession: 1 }), + this.getEligableSessions({ minEventsInSession: 2 }), // (B) Fetch no-session events redis.lrange(this.regularQueueKey, 0, this.batchSize / 2 - 1), ]);