From f2e19093f0e3c94f0057b212db5787b781d67849 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Tue, 3 Mar 2026 22:17:49 +0100 Subject: [PATCH] fix: importer.. --- apps/worker/src/jobs/import.ts | 33 +++++-- packages/db/src/services/import.service.ts | 107 +++++++++------------ 2 files changed, 70 insertions(+), 70 deletions(-) diff --git a/apps/worker/src/jobs/import.ts b/apps/worker/src/jobs/import.ts index f390a72a..95ada90e 100644 --- a/apps/worker/src/jobs/import.ts +++ b/apps/worker/src/jobs/import.ts @@ -1,5 +1,6 @@ import { backfillSessionsToProduction, + cleanupSessionStartEndEvents, cleanupStagingData, createSessionsStartEndEvents, db, @@ -27,7 +28,7 @@ function yieldToEventLoop(): Promise { }); } -const PRODUCTION_STEPS = ['moving', 'backfilling_sessions']; +const RESUMABLE_STEPS = ['creating_sessions', 'moving', 'backfilling_sessions']; export async function importJob(job: Job) { const { importId } = job.data.payload; @@ -45,16 +46,16 @@ export async function importJob(job: Job) { try { const isRetry = record.currentStep !== null; - const hasReachedProduction = - isRetry && PRODUCTION_STEPS.includes(record.currentStep as string); + const canResume = + isRetry && RESUMABLE_STEPS.includes(record.currentStep as string); // ------------------------------------------------------- // STAGING PHASE: clean slate on failure, run from scratch // ------------------------------------------------------- - if (!hasReachedProduction) { + if (!canResume) { if (isRetry) { jobLogger.info( - 'Retry detected before production phase — cleaning staging data' + 'Retry detected before resumable phase — cleaning staging data' ); await cleanupStagingData(importId); } @@ -183,8 +184,22 @@ export async function importJob(job: Job) { await yieldToEventLoop(); jobLogger.info('Session ID generation complete'); } + } + + // ------------------------------------------------------- + // SESSION CREATION PHASE: resumable by cleaning session_start/end + // ------------------------------------------------------- + const skipSessionCreation = + canResume && record.currentStep !== 'creating_sessions'; + + if (!skipSessionCreation) { + if (canResume && record.currentStep === 'creating_sessions') { + jobLogger.info( + 'Retry at creating_sessions — cleaning existing session_start/end events' + ); + await cleanupSessionStartEndEvents(importId); + } - // Phase 3: Create session_start / session_end events await updateImportStatus(jobLogger, job, importId, { step: 'creating_sessions', batch: 'all sessions', @@ -201,13 +216,15 @@ export async function importJob(job: Job) { // Phase 3: Move staging events to production (per-day) const resumeMovingFrom = - hasReachedProduction && record.currentStep === 'moving' + canResume && record.currentStep === 'moving' ? (record.currentBatch ?? undefined) : undefined; // currentBatch is the last successfully completed day — resume from the next day to avoid re-inserting it const moveFromDate = (() => { - if (!resumeMovingFrom) return undefined; + if (!resumeMovingFrom) { + return undefined; + } const next = new Date(`${resumeMovingFrom}T12:00:00Z`); next.setUTCDate(next.getUTCDate() + 1); return next.toISOString().split('T')[0]!; diff --git a/packages/db/src/services/import.service.ts b/packages/db/src/services/import.service.ts index 19132ab6..3357ebf9 100644 --- a/packages/db/src/services/import.service.ts +++ b/packages/db/src/services/import.service.ts @@ -8,8 +8,8 @@ import { TABLE_NAMES, } from '../clickhouse/client'; import { db, type Prisma } from '../prisma-client'; -import type { IClickhouseProfile } from './profile.service'; import type { IClickhouseEvent } from './event.service'; +import type { IClickhouseProfile } from './profile.service'; export interface ImportStageResult { importId: string; @@ -172,38 +172,6 @@ export async function insertProfilesBatch( return { inserted: normalized.length }; } - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - /** * Delete all staging data for an import. Used to get a clean slate on retry * when the failure happened before moving data to production. @@ -222,6 +190,22 @@ export async function cleanupStagingData(importId: string): Promise { }); } +export async function cleanupSessionStartEndEvents( + importId: string +): Promise { + const mutationTableName = getReplicatedTableName(TABLE_NAMES.events_imports); + await ch.command({ + query: `ALTER TABLE ${mutationTableName} DELETE WHERE import_id = {importId:String} AND name IN ('session_start', 'session_end')`, + query_params: { importId }, + clickhouse_settings: { + wait_end_of_query: 1, + mutations_sync: '2', + send_progress_in_http_headers: 1, + http_headers_progress_interval_ms: '50000', + }, + }); +} + /** * Reconstruct sessions across ALL dates for the import. * Each session_id gets exactly one session_start and one session_end, @@ -242,27 +226,16 @@ export async function createSessionsStartEndEvents( "name NOT IN ('session_start', 'session_end')", ].join(' AND '); + const sessionBatchSubquery = ` + (SELECT DISTINCT session_id + FROM ${TABLE_NAMES.events_imports} + WHERE ${baseWhere} + AND session_id > {lastSessionId:String} + ORDER BY session_id + LIMIT {limit:UInt32}) + `; + while (true) { - const idsResult = await ch.query({ - query: ` - SELECT DISTINCT session_id - FROM ${TABLE_NAMES.events_imports} - WHERE ${baseWhere} - AND session_id > {lastSessionId:String} - ORDER BY session_id - LIMIT {limit:UInt32} - `, - query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE }, - format: 'JSONEachRow', - }); - - const idRows = (await idsResult.json()) as Array<{ session_id: string }>; - if (idRows.length === 0) { - break; - } - - const sessionIds = idRows.map((r) => r.session_id); - const sessionEventsQuery = ` SELECT device_id, @@ -279,13 +252,13 @@ export async function createSessionsStartEndEvents( max(created_at) AS last_timestamp FROM ${TABLE_NAMES.events_imports} WHERE ${baseWhere} - AND session_id IN ({sessionIds:Array(String)}) + AND session_id IN ${sessionBatchSubquery} GROUP BY session_id, device_id, project_id `; const sessionEventsResult = await ch.query({ query: sessionEventsQuery, - query_params: { importId, sessionIds }, + query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE }, format: 'JSONEachRow', }); @@ -438,8 +411,11 @@ export async function createSessionsStartEndEvents( await insertImportBatch(sessionEvents, importId); } - lastSessionId = idRows[idRows.length - 1]!.session_id; - if (idRows.length < SESSION_BATCH_SIZE) { + if (sessionData.length === 0) { + break; + } + lastSessionId = sessionData.at(-1)!.session_id; + if (sessionData.length < SESSION_BATCH_SIZE) { break; } } @@ -500,6 +476,15 @@ export async function backfillSessionsToProduction( const SESSION_BATCH_SIZE = 5000; let lastSessionId = ''; + const baseWhere = 'import_id = {importId:String} AND session_id > {lastSessionId:String}'; + const sessionBatchSubquery = ` + (SELECT DISTINCT session_id + FROM ${TABLE_NAMES.events_imports} + WHERE ${baseWhere} + ORDER BY session_id + LIMIT {limit:UInt32}) + `; + while (true) { const idsResult = await ch.query({ query: ` @@ -519,8 +504,6 @@ export async function backfillSessionsToProduction( break; } - const sessionIds = idRows.map((r) => r.session_id); - const sessionsInsertQuery = ` INSERT INTO ${TABLE_NAMES.sessions} ( id, project_id, profile_id, device_id, created_at, ended_at, @@ -577,13 +560,13 @@ export async function backfillSessionsToProduction( FROM ${TABLE_NAMES.events_imports} e WHERE e.import_id = {importId:String} - AND e.session_id IN ({sessionIds:Array(String)}) + AND e.session_id IN ${sessionBatchSubquery} GROUP BY e.session_id `; await ch.command({ query: sessionsInsertQuery, - query_params: { importId, sessionIds }, + query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE }, clickhouse_settings: { wait_end_of_query: 1, send_progress_in_http_headers: 1, @@ -591,7 +574,7 @@ export async function backfillSessionsToProduction( }, }); - lastSessionId = idRows[idRows.length - 1]!.session_id; + lastSessionId = idRows.at(-1)!.session_id; if (idRows.length < SESSION_BATCH_SIZE) { break; }