diff --git a/apps/worker/src/jobs/import.ts b/apps/worker/src/jobs/import.ts index ae3849bb..f03a053c 100644 --- a/apps/worker/src/jobs/import.ts +++ b/apps/worker/src/jobs/import.ts @@ -209,6 +209,8 @@ export async function importJob(job: Job) { await updateImportStatus(jobLogger, job, importId, { step: 'loading', batch: createdAt, + totalEvents, + processedEvents, }); // Yield control back to event loop after processing final batch diff --git a/packages/db/src/services/import.service.ts b/packages/db/src/services/import.service.ts index 5a956bd6..189d54d5 100644 --- a/packages/db/src/services/import.service.ts +++ b/packages/db/src/services/import.service.ts @@ -259,7 +259,7 @@ export async function createSessionsStartEndEvents( // Parse timestamps as Date objects to calculate duration const firstTime = new Date(session.first_timestamp).getTime(); const lastTime = new Date(session.last_timestamp).getTime(); - const durationMs = lastTime - firstTime; + const durationMs = Math.max(0, lastTime - firstTime); // Ensure non-negative duration // Helper function to adjust timestamp by milliseconds without timezone conversion const adjustTimestamp = (timestamp: string, offsetMs: number): string => { @@ -517,7 +517,7 @@ export async function backfillSessionsToProduction( argMax(e.created_at, e.created_at) as ended_at, if( argMaxIf(e.properties['__bounce'], e.created_at, e.name = 'session_end') = '', - if(countIf(e.name = 'screen_view') > 1, true, false), + if(countIf(e.name = 'screen_view') > 1, false, true), argMaxIf(e.properties['__bounce'], e.created_at, e.name = 'session_end') = 'true' ) as is_bounce, argMinIf(e.origin, e.created_at, e.name = 'session_start') as entry_origin, diff --git a/packages/importer/src/providers/mixpanel.ts b/packages/importer/src/providers/mixpanel.ts index 1d29bb33..710f56e0 100644 --- a/packages/importer/src/providers/mixpanel.ts +++ b/packages/importer/src/providers/mixpanel.ts @@ -52,18 +52,54 @@ export class MixpanelProvider extends BaseImportProvider { ): AsyncGenerator { const { serviceAccount, serviceSecret, projectId, from, to } = this.config; - // Split the date range into monthly chunks for reliability + // Split the date range into daily chunks for reliability // Uses base class utility to avoid timeout issues with large date ranges - const dateChunks = this.getDateChunks(overrideFrom ?? from, to); // 1 month per chunk + const dateChunks = this.getDateChunks(overrideFrom ?? from, to); // 1 day per chunk (default) for (const [chunkFrom, chunkTo] of dateChunks) { - yield* this.fetchEventsForDateRange( - serviceAccount, - serviceSecret, - projectId, - chunkFrom, - chunkTo, - ); + let retries = 0; + const maxRetries = 3; + + while (retries <= maxRetries) { + try { + yield* this.fetchEventsForDateRange( + serviceAccount, + serviceSecret, + projectId, + chunkFrom, + chunkTo, + ); + break; // Success, move to next chunk + } catch (error) { + retries++; + const isLastRetry = retries > maxRetries; + + this.logger?.warn('Failed to fetch events for date range', { + chunkFrom, + chunkTo, + attempt: retries, + maxRetries, + error: (error as Error).message, + willRetry: !isLastRetry, + }); + + if (isLastRetry) { + // Final attempt failed, re-throw + throw new Error( + `Failed to fetch Mixpanel events for ${chunkFrom} to ${chunkTo} after ${maxRetries} retries: ${(error as Error).message}`, + ); + } + + // Exponential backoff: wait before retrying + const delay = Math.min(1000 * 2 ** (retries - 1), 60_000); // Cap at 1 minute + this.logger?.info('Retrying after delay', { + delayMs: delay, + chunkFrom, + chunkTo, + }); + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } } } @@ -131,7 +167,10 @@ export class MixpanelProvider extends BaseImportProvider { const event = JSON.parse(line); yield event; } catch (error) { - console.warn('Failed to parse Mixpanel event:', line); + this.logger?.warn('Failed to parse Mixpanel event', { + line: line.substring(0, 100), + error, + }); } } } @@ -143,7 +182,13 @@ export class MixpanelProvider extends BaseImportProvider { const event = JSON.parse(buffer); yield event; } catch (error) { - console.warn('Failed to parse final Mixpanel event:', buffer); + this.logger?.warn( + 'Failed to parse Mixpanel event (remaining buffer)', + { + line: buffer.substring(0, 100), + error, + }, + ); } } } finally {