diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index b893f3fd..a8bd4afe 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -271,7 +271,7 @@ async function track({ // Prioritize 'screen_view' events by setting no delay // This ensures that session starts are created from 'screen_view' events // rather than other events, maintaining accurate session tracking - delay: payload.name === 'screen_view' ? undefined : 1000, + delay: isScreenView ? undefined : 1000, }, ); } diff --git a/apps/worker/src/utils/session-handler.ts b/apps/worker/src/utils/session-handler.ts index 140d0155..36a2b69c 100644 --- a/apps/worker/src/utils/session-handler.ts +++ b/apps/worker/src/utils/session-handler.ts @@ -93,7 +93,7 @@ export async function getSessionEndJob(args: { } | null> { const { priority, retryCount = 0 } = args; - if (retryCount > 10) { + if (retryCount >= 6) { throw new Error('Failed to get session end'); } @@ -109,8 +109,23 @@ export async function getSessionEndJob(args: { return { deviceId, job }; } - if (state === 'completed' || state === 'failed') { + if (state === 'failed') { + await job.retry(); + await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10); + return getSessionEndJob({ + ...args, + priority, + retryCount, + }); + } + + if (state === 'completed') { await job.remove(); + return getSessionEndJob({ + ...args, + priority, + retryCount, + }); } if (state === 'active' || state === 'waiting') { @@ -122,6 +137,16 @@ export async function getSessionEndJob(args: { }); } + // Shady state here, just remove it and retry + if (state === 'unknown') { + await job.remove(); + return getSessionEndJob({ + ...args, + priority, + retryCount, + }); + } + return null; } @@ -145,7 +170,8 @@ export async function getSessionEndJob(args: { // If no job found and not priority, retry if (!priority) { - await new Promise((resolve) => setTimeout(resolve, 200)); + const backoffDelay = 50 * 2 ** retryCount; + await new Promise((resolve) => setTimeout(resolve, backoffDelay)); return getSessionEndJob({ ...args, priority, retryCount: retryCount + 1 }); } diff --git a/packages/db/src/buffers/event-buffer-redis.ts b/packages/db/src/buffers/event-buffer-redis.ts index ec651584..8478f4bf 100644 --- a/packages/db/src/buffers/event-buffer-redis.ts +++ b/packages/db/src/buffers/event-buffer-redis.ts @@ -303,6 +303,10 @@ return "OK" return sessions; } + if (!Array.isArray(parsed)) { + return sessions; + } + for (const session of parsed) { sessions[session.sessionId] = session.events .map((e) => getSafeJson(e))