improve(worker): handle sessions with unknown state

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-02-09 20:39:52 +01:00
parent b9d071dbbb
commit af0f9717a8
3 changed files with 34 additions and 4 deletions

View File

@@ -271,7 +271,7 @@ async function track({
// Prioritize 'screen_view' events by setting no delay // Prioritize 'screen_view' events by setting no delay
// This ensures that session starts are created from 'screen_view' events // This ensures that session starts are created from 'screen_view' events
// rather than other events, maintaining accurate session tracking // rather than other events, maintaining accurate session tracking
delay: payload.name === 'screen_view' ? undefined : 1000, delay: isScreenView ? undefined : 1000,
}, },
); );
} }

View File

@@ -93,7 +93,7 @@ export async function getSessionEndJob(args: {
} | null> { } | null> {
const { priority, retryCount = 0 } = args; const { priority, retryCount = 0 } = args;
if (retryCount > 10) { if (retryCount >= 6) {
throw new Error('Failed to get session end'); throw new Error('Failed to get session end');
} }
@@ -109,8 +109,23 @@ export async function getSessionEndJob(args: {
return { deviceId, job }; return { deviceId, job };
} }
if (state === 'completed' || state === 'failed') { if (state === 'failed') {
await job.retry();
await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10);
return getSessionEndJob({
...args,
priority,
retryCount,
});
}
if (state === 'completed') {
await job.remove(); await job.remove();
return getSessionEndJob({
...args,
priority,
retryCount,
});
} }
if (state === 'active' || state === 'waiting') { if (state === 'active' || state === 'waiting') {
@@ -122,6 +137,16 @@ export async function getSessionEndJob(args: {
}); });
} }
// Shady state here, just remove it and retry
if (state === 'unknown') {
await job.remove();
return getSessionEndJob({
...args,
priority,
retryCount,
});
}
return null; return null;
} }
@@ -145,7 +170,8 @@ export async function getSessionEndJob(args: {
// If no job found and not priority, retry // If no job found and not priority, retry
if (!priority) { if (!priority) {
await new Promise((resolve) => setTimeout(resolve, 200)); const backoffDelay = 50 * 2 ** retryCount;
await new Promise((resolve) => setTimeout(resolve, backoffDelay));
return getSessionEndJob({ ...args, priority, retryCount: retryCount + 1 }); return getSessionEndJob({ ...args, priority, retryCount: retryCount + 1 });
} }

View File

@@ -303,6 +303,10 @@ return "OK"
return sessions; return sessions;
} }
if (!Array.isArray(parsed)) {
return sessions;
}
for (const session of parsed) { for (const session of parsed) {
sessions[session.sessionId] = session.events sessions[session.sessionId] = session.events
.map((e) => getSafeJson<IClickhouseEvent>(e)) .map((e) => getSafeJson<IClickhouseEvent>(e))