fix: redo how the importer works

Carl-Gerhard Lindesvärd
2026-03-01 21:48:46 +01:00
parent 6251d143d1
commit 647ac2a4af
8 changed files with 993 additions and 984 deletions


@@ -1,17 +1,16 @@
import {
backfillSessionsToProduction,
cleanupStagingData,
createSessionsStartEndEvents,
db,
generateGapBasedSessionIds,
getImportDateBounds,
type IClickhouseEvent,
type IClickhouseProfile,
insertImportBatch,
insertProfilesBatch,
moveImportsToProduction,
type Prisma,
updateImportStatus,
} from '@openpanel/db';
import { MixpanelProvider, UmamiProvider } from '@openpanel/importer';
@@ -22,294 +21,245 @@ import { logger } from '../utils/logger';
const BATCH_SIZE = Number.parseInt(process.env.IMPORT_BATCH_SIZE || '5000', 10);
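// The batch size is tunable via the environment, e.g. IMPORT_BATCH_SIZE=10000,
// trading memory held per batch against fewer insert round-trips.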
/**
* Yields control back to the event loop to prevent stalled jobs
*/
function yieldToEventLoop(): Promise<void> {
return new Promise((resolve) => {
setTimeout(resolve, 100);
});
}
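// A blocked event loop also blocks the queue worker's lock renewal, which is
// what surfaces as a "stalled" job (assuming a BullMQ-style worker), so the
// import loops below await this between batches.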
const PRODUCTION_STEPS = ['moving', 'backfilling_sessions'];
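// Invariant behind PRODUCTION_STEPS: once 'moving' has started, staging rows
// have begun flowing into production tables, so a retry must resume per-day
// rather than wipe staging and start over.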
export async function importJob(job: Job<ImportQueuePayload>) {
const { importId } = job.data.payload;
const record = await db.$primary().import.findUniqueOrThrow({
where: { id: importId },
include: { project: true },
});
const jobLogger = logger.child({ importId, config: record.config });
jobLogger.info('Starting import job');
const providerInstance = createProvider(record, jobLogger);
const shouldGenerateSessionIds = providerInstance.shouldGenerateSessionIds();
try {
// A retry resumes from the step recorded on the import row; before the
// production phase we simply start over from a clean staging slate.
const isRetry = record.currentStep !== null;
const hasReachedProduction =
isRetry && PRODUCTION_STEPS.includes(record.currentStep as string);
// -------------------------------------------------------
// STAGING PHASE: clean slate on failure, run from scratch
// -------------------------------------------------------
if (!hasReachedProduction) {
if (isRetry) {
jobLogger.info(
'Retry detected before production phase — cleaning staging data'
);
await cleanupStagingData(importId);
}
// Phase 1: Load events into staging
await updateImportStatus(jobLogger, job, importId, { step: 'loading' });
// Try to get a precomputed total for better progress reporting
const totalEvents = await providerInstance
.getTotalEventsCount()
.catch(() => -1);
let processedEvents = 0;
const eventBatch: IClickhouseEvent[] = [];
for await (const rawEvent of providerInstance.parseSource()) {
if (
!providerInstance.validate(
// @ts-expect-error -- provider-specific raw type
rawEvent
)
) {
jobLogger.warn('Skipping invalid event', { rawEvent });
continue;
}
const transformed: IClickhouseEvent = providerInstance.transformEvent(
// @ts-expect-error -- provider-specific raw type
rawEvent
);
// Session IDs for providers that need them (e.g. Mixpanel) are generated
// in generateGapBasedSessionIds after loading, using gap-based logic.
eventBatch.push(transformed);
// Process batch when it reaches the batch size
if (eventBatch.length >= BATCH_SIZE) {
jobLogger.info('Processing batch', { batchSize: eventBatch.length });
await insertImportBatch(eventBatch, importId);
processedEvents += eventBatch.length;
const batchDate = new Date(eventBatch[0]?.created_at || '')
.toISOString()
.split('T')[0];
await updateImportStatus(jobLogger, job, importId, {
step: 'loading',
batch: batchDate,
totalEvents,
processedEvents,
});
// Yield control back to event loop after processing each batch
eventBatch.length = 0;
await yieldToEventLoop();
}
}
// Process remaining events in the last batch
if (eventBatch.length > 0) {
await insertImportBatch(eventBatch, importId);
processedEvents += eventBatch.length;
const batchDate = new Date(eventBatch[0]?.created_at || '')
.toISOString()
.split('T')[0];
await updateImportStatus(jobLogger, job, importId, {
step: 'loading',
batch: batchDate,
totalEvents,
processedEvents,
});
eventBatch.length = 0;
}
jobLogger.info('Loading complete', { processedEvents });
// Phase 1b: Load user profiles (Mixpanel only)
const profileBatchSize = 5000;
if (
'streamProfiles' in providerInstance &&
typeof (providerInstance as MixpanelProvider).streamProfiles ===
'function'
) {
await updateImportStatus(jobLogger, job, importId, {
step: 'loading_profiles',
});
const profileBatch: IClickhouseProfile[] = [];
let processedProfiles = 0;
for await (const rawProfile of (
providerInstance as MixpanelProvider
).streamProfiles()) {
const profile = (
providerInstance as MixpanelProvider
).transformProfile(rawProfile);
profileBatch.push(profile);
if (profileBatch.length >= profileBatchSize) {
await insertProfilesBatch(profileBatch, record.projectId);
processedProfiles += profileBatch.length;
await updateImportStatus(jobLogger, job, importId, {
step: 'loading_profiles',
processedProfiles,
});
profileBatch.length = 0;
await yieldToEventLoop();
}
}
if (profileBatch.length > 0) {
await insertProfilesBatch(profileBatch, record.projectId);
processedProfiles += profileBatch.length;
await updateImportStatus(jobLogger, job, importId, {
step: 'loading_profiles',
processedProfiles,
totalProfiles: processedProfiles,
});
}
jobLogger.info('Profile loading complete', { processedProfiles });
}
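// The `'streamProfiles' in ...` + typeof check above is a structural
// capability test: only providers that actually expose streamProfiles
// (currently Mixpanel) take this path; the casts are only for typing.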
// Phase 2: Generate gap-based session IDs (Mixpanel etc.)
if (shouldGenerateSessionIds) {
await updateImportStatus(jobLogger, job, importId, {
step: 'generating_sessions',
});
await generateGapBasedSessionIds(importId);
await yieldToEventLoop();
jobLogger.info('Session ID generation complete');
}
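// Rough sketch of the gap-based idea for one profile's events sorted by time
// (illustrative only; the actual logic lives in generateGapBasedSessionIds,
// and the 30-minute gap here is an assumption, not a value from that code):
//
//   const GAP_MS = 30 * 60 * 1000;
//   let sessionId = '';
//   let lastTs = Number.NEGATIVE_INFINITY;
//   for (const event of eventsSortedByTime) {
//     const ts = new Date(event.created_at).getTime();
//     if (ts - lastTs > GAP_MS) sessionId = crypto.randomUUID();
//     event.session_id = sessionId;
//     lastTs = ts;
//   }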
// Phase 3: Create session_start / session_end events
await updateImportStatus(jobLogger, job, importId, {
step: 'creating_sessions',
batch: 'all sessions',
});
await createSessionsStartEndEvents(importId);
await yieldToEventLoop();
jobLogger.info('Session event creation complete');
}
// -------------------------------------------------------
// PRODUCTION PHASE: resume-safe, track progress per batch
// -------------------------------------------------------
// Phase 3: Move staging events to production (per-day)
const resumeMovingFrom =
hasReachedProduction && record.currentStep === 'moving'
? (record.currentBatch ?? undefined)
: undefined;
// currentBatch is the last successfully completed day — resume from the next day to avoid re-inserting it
const moveFromDate = (() => {
if (!resumeMovingFrom) return undefined;
const next = new Date(`${resumeMovingFrom}T12:00:00Z`);
next.setUTCDate(next.getUTCDate() + 1);
return next.toISOString().split('T')[0]!;
})();
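// Worked example: if the job crashed after completing day '2025-06-10',
// currentBatch is '2025-06-10' and the scan resumes at '2025-06-11', so the
// completed day is never moved twice.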
const bounds = await getImportDateBounds(importId, moveFromDate);
if (bounds.min && bounds.max) {
const startDate = bounds.min.split(' ')[0]!;
const endDate = bounds.max.split(' ')[0]!;
// Noon UTC is a defensive anchor: even if a date string slips through a
// local-time conversion somewhere, it still lands on the intended calendar day.
const cursor = new Date(`${startDate}T12:00:00Z`);
const end = new Date(`${endDate}T12:00:00Z`);
while (cursor <= end) {
const dateStr = cursor.toISOString().split('T')[0]!;
await moveImportsToProduction(importId, dateStr);
await updateImportStatus(jobLogger, job, importId, {
step: 'moving',
batch: dateStr,
});
await yieldToEventLoop();
cursor.setUTCDate(cursor.getUTCDate() + 1);
}
}
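// The per-day status update above is the commit point: a day that already
// moved would be duplicated if re-run, so `batch` only advances after
// moveImportsToProduction returns for that day.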
jobLogger.info('Move to production complete');
// Phase 4: Backfill sessions table
await updateImportStatus(jobLogger, job, importId, {
step: 'backfilling_sessions',
batch: 'all sessions',
});
await backfillSessionsToProduction(importId);
await yieldToEventLoop();
jobLogger.info('Session backfill complete');
// Done
await updateImportStatus(jobLogger, job, importId, { step: 'completed' });
jobLogger.info('Import completed');
return { success: true };
} catch (error) {
jobLogger.error('Import job failed', { error });
// Mark import as failed
try {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
await updateImportStatus(jobLogger, job, importId, {
step: 'failed',
errorMessage: errorMsg,
});
jobLogger.warn('Import marked as failed', { error: errorMsg });
} catch (markError) {
jobLogger.error('Failed to mark import as failed', { error, markError });
}
@@ -320,7 +270,7 @@ export async function importJob(job: Job<ImportQueuePayload>) {
function createProvider(
record: Prisma.ImportGetPayload<{ include: { project: true } }>,
jobLogger: ILogger
) {
const config = record.config;
switch (config.provider) {