fix: add retries with backoff to mixpanel api

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-11-05 21:41:50 +01:00
parent f67ec2bb6a
commit a9b9ffa029
3 changed files with 60 additions and 13 deletions

View File

@@ -209,6 +209,8 @@ export async function importJob(job: Job<ImportQueuePayload>) {
await updateImportStatus(jobLogger, job, importId, { await updateImportStatus(jobLogger, job, importId, {
step: 'loading', step: 'loading',
batch: createdAt, batch: createdAt,
totalEvents,
processedEvents,
}); });
// Yield control back to event loop after processing final batch // Yield control back to event loop after processing final batch

View File

@@ -259,7 +259,7 @@ export async function createSessionsStartEndEvents(
// Parse timestamps as Date objects to calculate duration // Parse timestamps as Date objects to calculate duration
const firstTime = new Date(session.first_timestamp).getTime(); const firstTime = new Date(session.first_timestamp).getTime();
const lastTime = new Date(session.last_timestamp).getTime(); const lastTime = new Date(session.last_timestamp).getTime();
const durationMs = lastTime - firstTime; const durationMs = Math.max(0, lastTime - firstTime); // Ensure non-negative duration
// Helper function to adjust timestamp by milliseconds without timezone conversion // Helper function to adjust timestamp by milliseconds without timezone conversion
const adjustTimestamp = (timestamp: string, offsetMs: number): string => { const adjustTimestamp = (timestamp: string, offsetMs: number): string => {
@@ -517,7 +517,7 @@ export async function backfillSessionsToProduction(
argMax(e.created_at, e.created_at) as ended_at, argMax(e.created_at, e.created_at) as ended_at,
if( if(
argMaxIf(e.properties['__bounce'], e.created_at, e.name = 'session_end') = '', argMaxIf(e.properties['__bounce'], e.created_at, e.name = 'session_end') = '',
if(countIf(e.name = 'screen_view') > 1, true, false), if(countIf(e.name = 'screen_view') > 1, false, true),
argMaxIf(e.properties['__bounce'], e.created_at, e.name = 'session_end') = 'true' argMaxIf(e.properties['__bounce'], e.created_at, e.name = 'session_end') = 'true'
) as is_bounce, ) as is_bounce,
argMinIf(e.origin, e.created_at, e.name = 'session_start') as entry_origin, argMinIf(e.origin, e.created_at, e.name = 'session_start') as entry_origin,

View File

@@ -52,18 +52,54 @@ export class MixpanelProvider extends BaseImportProvider<MixpanelRawEvent> {
): AsyncGenerator<MixpanelRawEvent, void, unknown> { ): AsyncGenerator<MixpanelRawEvent, void, unknown> {
const { serviceAccount, serviceSecret, projectId, from, to } = this.config; const { serviceAccount, serviceSecret, projectId, from, to } = this.config;
// Split the date range into monthly chunks for reliability // Split the date range into daily chunks for reliability
// Uses base class utility to avoid timeout issues with large date ranges // Uses base class utility to avoid timeout issues with large date ranges
const dateChunks = this.getDateChunks(overrideFrom ?? from, to); // 1 month per chunk const dateChunks = this.getDateChunks(overrideFrom ?? from, to); // 1 day per chunk (default)
for (const [chunkFrom, chunkTo] of dateChunks) { for (const [chunkFrom, chunkTo] of dateChunks) {
yield* this.fetchEventsForDateRange( let retries = 0;
serviceAccount, const maxRetries = 3;
serviceSecret,
projectId, while (retries <= maxRetries) {
chunkFrom, try {
chunkTo, yield* this.fetchEventsForDateRange(
); serviceAccount,
serviceSecret,
projectId,
chunkFrom,
chunkTo,
);
break; // Success, move to next chunk
} catch (error) {
retries++;
const isLastRetry = retries > maxRetries;
this.logger?.warn('Failed to fetch events for date range', {
chunkFrom,
chunkTo,
attempt: retries,
maxRetries,
error: (error as Error).message,
willRetry: !isLastRetry,
});
if (isLastRetry) {
// Final attempt failed, re-throw
throw new Error(
`Failed to fetch Mixpanel events for ${chunkFrom} to ${chunkTo} after ${maxRetries} retries: ${(error as Error).message}`,
);
}
// Exponential backoff: wait before retrying
const delay = Math.min(1000 * 2 ** (retries - 1), 60_000); // Cap at 1 minute
this.logger?.info('Retrying after delay', {
delayMs: delay,
chunkFrom,
chunkTo,
});
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
} }
} }
@@ -131,7 +167,10 @@ export class MixpanelProvider extends BaseImportProvider<MixpanelRawEvent> {
const event = JSON.parse(line); const event = JSON.parse(line);
yield event; yield event;
} catch (error) { } catch (error) {
console.warn('Failed to parse Mixpanel event:', line); this.logger?.warn('Failed to parse Mixpanel event', {
line: line.substring(0, 100),
error,
});
} }
} }
} }
@@ -143,7 +182,13 @@ export class MixpanelProvider extends BaseImportProvider<MixpanelRawEvent> {
const event = JSON.parse(buffer); const event = JSON.parse(buffer);
yield event; yield event;
} catch (error) { } catch (error) {
console.warn('Failed to parse final Mixpanel event:', buffer); this.logger?.warn(
'Failed to parse Mixpanel event (remaining buffer)',
{
line: buffer.substring(0, 100),
error,
},
);
} }
} }
} finally { } finally {