From 00f6cd6f50ab172d34e4dde6cc90eeb3751003ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Tue, 3 Mar 2026 23:54:53 +0100 Subject: [PATCH] fix: importer 2.. --- packages/db/src/services/import.service.ts | 59 ++++++++++++---------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/packages/db/src/services/import.service.ts b/packages/db/src/services/import.service.ts index 3357ebf9..ec146809 100644 --- a/packages/db/src/services/import.service.ts +++ b/packages/db/src/services/import.service.ts @@ -226,16 +226,27 @@ export async function createSessionsStartEndEvents( "name NOT IN ('session_start', 'session_end')", ].join(' AND '); - const sessionBatchSubquery = ` - (SELECT DISTINCT session_id - FROM ${TABLE_NAMES.events_imports} - WHERE ${baseWhere} - AND session_id > {lastSessionId:String} - ORDER BY session_id - LIMIT {limit:UInt32}) - `; - while (true) { + const idsResult = await ch.query({ + query: ` + SELECT DISTINCT session_id + FROM ${TABLE_NAMES.events_imports} + WHERE ${baseWhere} + AND session_id > {lastSessionId:String} + ORDER BY session_id + LIMIT {limit:UInt32} + `, + query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE }, + format: 'JSONEachRow', + }); + + const idRows = (await idsResult.json()) as Array<{ session_id: string }>; + if (idRows.length === 0) { + break; + } + + const maxSessionId = idRows.at(-1)!.session_id; + const sessionEventsQuery = ` SELECT device_id, @@ -252,13 +263,14 @@ export async function createSessionsStartEndEvents( max(created_at) AS last_timestamp FROM ${TABLE_NAMES.events_imports} WHERE ${baseWhere} - AND session_id IN ${sessionBatchSubquery} + AND session_id > {lastSessionId:String} + AND session_id <= {maxSessionId:String} GROUP BY session_id, device_id, project_id `; const sessionEventsResult = await ch.query({ query: sessionEventsQuery, - query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE }, + query_params: { importId, lastSessionId, maxSessionId }, format: 'JSONEachRow', }); @@ -411,11 +423,8 @@ export async function createSessionsStartEndEvents( await insertImportBatch(sessionEvents, importId); } - if (sessionData.length === 0) { - break; - } - lastSessionId = sessionData.at(-1)!.session_id; - if (sessionData.length < SESSION_BATCH_SIZE) { + lastSessionId = maxSessionId; + if (idRows.length < SESSION_BATCH_SIZE) { break; } } @@ -476,15 +485,6 @@ export async function backfillSessionsToProduction( const SESSION_BATCH_SIZE = 5000; let lastSessionId = ''; - const baseWhere = 'import_id = {importId:String} AND session_id > {lastSessionId:String}'; - const sessionBatchSubquery = ` - (SELECT DISTINCT session_id - FROM ${TABLE_NAMES.events_imports} - WHERE ${baseWhere} - ORDER BY session_id - LIMIT {limit:UInt32}) - `; - while (true) { const idsResult = await ch.query({ query: ` @@ -504,6 +504,8 @@ export async function backfillSessionsToProduction( break; } + const maxSessionId = idRows.at(-1)!.session_id; + const sessionsInsertQuery = ` INSERT INTO ${TABLE_NAMES.sessions} ( id, project_id, profile_id, device_id, created_at, ended_at, @@ -560,13 +562,14 @@ export async function backfillSessionsToProduction( FROM ${TABLE_NAMES.events_imports} e WHERE e.import_id = {importId:String} - AND e.session_id IN ${sessionBatchSubquery} + AND e.session_id > {lastSessionId:String} + AND e.session_id <= {maxSessionId:String} GROUP BY e.session_id `; await ch.command({ query: sessionsInsertQuery, - query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE }, + query_params: { importId, lastSessionId, maxSessionId }, clickhouse_settings: { wait_end_of_query: 1, send_progress_in_http_headers: 1, @@ -574,7 +577,7 @@ export async function backfillSessionsToProduction( }, }); - lastSessionId = idRows.at(-1)!.session_id; + lastSessionId = maxSessionId; if (idRows.length < SESSION_BATCH_SIZE) { break; }