diff --git a/apps/worker/src/jobs/import.ts b/apps/worker/src/jobs/import.ts
index f03a053c..f390a72a 100644
--- a/apps/worker/src/jobs/import.ts
+++ b/apps/worker/src/jobs/import.ts
@@ -1,17 +1,16 @@
 import {
-  type IClickhouseEvent,
-  type ImportSteps,
-  type Prisma,
   backfillSessionsToProduction,
+  cleanupStagingData,
   createSessionsStartEndEvents,
   db,
-  formatClickhouseDate,
-  generateSessionIds,
+  generateGapBasedSessionIds,
   getImportDateBounds,
-  getImportProgress,
+  type IClickhouseEvent,
+  type IClickhouseProfile,
   insertImportBatch,
-  markImportComplete,
+  insertProfilesBatch,
   moveImportsToProduction,
+  type Prisma,
   updateImportStatus,
 } from '@openpanel/db';
 import { MixpanelProvider, UmamiProvider } from '@openpanel/importer';
@@ -22,294 +21,245 @@ import { logger } from '../utils/logger';
 
 const BATCH_SIZE = Number.parseInt(process.env.IMPORT_BATCH_SIZE || '5000', 10);
 
-/**
- * Yields control back to the event loop to prevent stalled jobs
- */
-async function yieldToEventLoop(): Promise<void> {
+function yieldToEventLoop(): Promise<void> {
   return new Promise((resolve) => {
     setTimeout(resolve, 100);
   });
 }
 
+const PRODUCTION_STEPS = ['moving', 'backfilling_sessions'];
+
 export async function importJob(job: Job) {
   const { importId } = job.data.payload;
 
   const record = await db.$primary().import.findUniqueOrThrow({
     where: { id: importId },
-    include: {
-      project: true,
-    },
+    include: { project: true },
   });
 
-  const jobLogger = logger.child({
-    importId,
-    config: record.config,
-  });
-
-  type ValidStep = Exclude<ImportSteps, 'completed' | 'failed'>;
-  const steps: Record<ValidStep, number> = {
-    loading: 0,
-    generating_session_ids: 1,
-    creating_sessions: 2,
-    moving: 3,
-    backfilling_sessions: 4,
-  };
-
+  const jobLogger = logger.child({ importId, config: record.config });
   jobLogger.info('Starting import job');
 
+  const providerInstance = createProvider(record, jobLogger);
+  const shouldGenerateSessionIds = providerInstance.shouldGenerateSessionIds();
 
   try {
-    // Check if this is a resume operation
-    const isNewImport = record.currentStep === null;
+    const isRetry = record.currentStep !== null;
+    const hasReachedProduction =
+      isRetry && PRODUCTION_STEPS.includes(record.currentStep as string);
 
-    if (isNewImport) {
-      await updateImportStatus(jobLogger, job, importId, {
-        step: 'loading',
-      });
-    } else {
-      jobLogger.info('Resuming import from previous state', {
-        currentStep: record.currentStep,
-        currentBatch: record.currentBatch,
-      });
-    }
-
-    // Try to get a precomputed total for better progress reporting
-    const totalEvents = await providerInstance
-      .getTotalEventsCount()
-      .catch(() => -1);
-    let processedEvents = record.processedEvents;
-
-    const resumeLoadingFrom =
-      (record.currentStep === 'loading' && record.currentBatch) || undefined;
-
-    const resumeGeneratingSessionIdsFrom =
-      (record.currentStep === 'generating_session_ids' &&
-        record.currentBatch) ||
-      undefined;
-
-    const resumeCreatingSessionsFrom =
-      (record.currentStep === 'creating_sessions' && record.currentBatch) ||
-      undefined;
-
-    const resumeMovingFrom =
-      (record.currentStep === 'moving' && record.currentBatch) || undefined;
-
-    const resumeBackfillingSessionsFrom =
-      (record.currentStep === 'backfilling_sessions' && record.currentBatch) ||
-      undefined;
-
-    // Example:
-    // shouldRunStep(0) // currStep = 2 (should not run)
-    // shouldRunStep(1) // currStep = 2 (should not run)
-    // shouldRunStep(2) // currStep = 2 (should run)
-    // shouldRunStep(3) // currStep = 2 (should run)
-    const shouldRunStep = (step: ValidStep) => {
-      if (isNewImport) {
-        return true;
+    // -------------------------------------------------------
+    // STAGING PHASE: clean slate on failure, run from scratch
+    // -------------------------------------------------------
+    if (!hasReachedProduction) {
+      if (isRetry) {
+        jobLogger.info(
+          'Retry detected before production phase — cleaning staging data'
+        );
+        await cleanupStagingData(importId);
       }
-      const stepToRunIndex = steps[step];
-      const currentStepIndex = steps[record.currentStep as ValidStep];
-      return stepToRunIndex >= currentStepIndex;
-    };
 
+      // Phase 1: Load events into staging
+      await updateImportStatus(jobLogger, job, importId, { step: 'loading' });
 
-    async function whileBounds(
-      from: string | undefined,
-      callback: (from: string, to: string) => Promise<void>,
-    ) {
-      const bounds = await getImportDateBounds(importId, from);
-      if (bounds.min && bounds.max) {
-        const start = new Date(bounds.min);
-        const end = new Date(bounds.max);
-        let cursor = new Date(start);
-        while (cursor < end) {
-          const next = new Date(cursor);
-          next.setDate(next.getDate() + 1);
-          await callback(
-            formatClickhouseDate(cursor, true),
-            formatClickhouseDate(next, true),
-          );
-          cursor = next;
+      const totalEvents = await providerInstance
+        .getTotalEventsCount()
+        .catch(() => -1);
+      let processedEvents = 0;
+      const eventBatch: IClickhouseEvent[] = [];
 
-          // Yield control back to event loop after processing each day
-          await yieldToEventLoop();
-        }
-      }
-    }
-
-    // Phase 1: Fetch & Transform - Process events in batches
-    if (shouldRunStep('loading')) {
-      const eventBatch: any = [];
-      for await (const rawEvent of providerInstance.parseSource(
-        resumeLoadingFrom,
-      )) {
-        // Validate event
+      for await (const rawEvent of providerInstance.parseSource()) {
         if (
           !providerInstance.validate(
-            // @ts-expect-error
-            rawEvent,
+            // @ts-expect-error -- provider-specific raw type
+            rawEvent
          )
        ) {
          jobLogger.warn('Skipping invalid event', { rawEvent });
          continue;
        }
 
-        eventBatch.push(rawEvent);
+        const transformed: IClickhouseEvent = providerInstance.transformEvent(
+          // @ts-expect-error -- provider-specific raw type
+          rawEvent
+        );
+
+        // Session IDs for providers that need them (e.g. Mixpanel) are generated
+        // in generateGapBasedSessionIds after loading, using gap-based logic.
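+        // Worked example (hypothetical timestamps, 30 min gap per
+        // SESSION_GAP_MS): events on one device at 10:00, 10:20 and 11:05
+        // form two sessions, {10:00, 10:20} and {11:05}, because only the
+        // 10:20 -> 11:05 gap exceeds the threshold.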
+
+        eventBatch.push(transformed);
 
-        // Process batch when it reaches the batch size
         if (eventBatch.length >= BATCH_SIZE) {
-          jobLogger.info('Processing batch', { batchSize: eventBatch.length });
-
-          const transformedEvents: IClickhouseEvent[] = eventBatch.map(
-            (
-              // @ts-expect-error
-              event,
-            ) => providerInstance!.transformEvent(event),
-          );
-
-          await insertImportBatch(transformedEvents, importId);
-
+          await insertImportBatch(eventBatch, importId);
           processedEvents += eventBatch.length;
-          eventBatch.length = 0;
 
-          const createdAt = new Date(transformedEvents[0]?.created_at || '')
+          const batchDate = new Date(eventBatch[0]?.created_at || '')
            .toISOString()
            .split('T')[0];
 
          await updateImportStatus(jobLogger, job, importId, {
            step: 'loading',
-            batch: createdAt,
+            batch: batchDate,
            totalEvents,
            processedEvents,
          });
 
-          // Yield control back to event loop after processing each batch
+          eventBatch.length = 0;
          await yieldToEventLoop();
        }
      }
 
-      // Process remaining events in the last batch
      if (eventBatch.length > 0) {
-        const transformedEvents = eventBatch.map(
-          (
-            // @ts-expect-error
-            event,
-          ) => providerInstance!.transformEvent(event),
-        );
-
-        await insertImportBatch(transformedEvents, importId);
-
+        await insertImportBatch(eventBatch, importId);
        processedEvents += eventBatch.length;
-        eventBatch.length = 0;
 
-        const createdAt = new Date(transformedEvents[0]?.created_at || '')
+        const batchDate = new Date(eventBatch[0]?.created_at || '')
          .toISOString()
          .split('T')[0];
 
        await updateImportStatus(jobLogger, job, importId, {
          step: 'loading',
-          batch: createdAt,
+          batch: batchDate,
          totalEvents,
          processedEvents,
        });
+        eventBatch.length = 0;
      }
 
-      // Yield control back to event loop after processing final batch
+      jobLogger.info('Loading complete', { processedEvents });
+
+      // Phase 1b: Load user profiles (Mixpanel only)
+      const profileBatchSize = 5000;
+      if (
+        'streamProfiles' in providerInstance &&
+        typeof (providerInstance as MixpanelProvider).streamProfiles ===
+          'function'
+      ) {
+        await updateImportStatus(jobLogger, job, importId, {
+          step: 'loading_profiles',
+        });
+
+        const profileBatch: IClickhouseProfile[] = [];
+        let processedProfiles = 0;
+
+        for await (const rawProfile of (
+          providerInstance as MixpanelProvider
+        ).streamProfiles()) {
+          const profile = (
+            providerInstance as MixpanelProvider
+          ).transformProfile(rawProfile);
+          profileBatch.push(profile);
+
+          if (profileBatch.length >= profileBatchSize) {
+            await insertProfilesBatch(profileBatch, record.projectId);
+            processedProfiles += profileBatch.length;
+            await updateImportStatus(jobLogger, job, importId, {
+              step: 'loading_profiles',
+              processedProfiles,
+            });
+            profileBatch.length = 0;
+            await yieldToEventLoop();
+          }
+        }
+
+        if (profileBatch.length > 0) {
+          await insertProfilesBatch(profileBatch, record.projectId);
+          processedProfiles += profileBatch.length;
+          await updateImportStatus(jobLogger, job, importId, {
+            step: 'loading_profiles',
+            processedProfiles,
+            totalProfiles: processedProfiles,
+          });
+        }
+
+        jobLogger.info('Profile loading complete', { processedProfiles });
+      }
+
+      // Phase 2: Generate gap-based session IDs (Mixpanel etc.)
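+      // Providers whose source events already carry session IDs are expected
+      // to return false from shouldGenerateSessionIds() and skip this phase
+      // (assumption: Umami exports fall in this bucket, Mixpanel does not).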
+      if (shouldGenerateSessionIds) {
+        await updateImportStatus(jobLogger, job, importId, {
+          step: 'generating_sessions',
+        });
+        await generateGapBasedSessionIds(importId);
        await yieldToEventLoop();
+        jobLogger.info('Session ID generation complete');
+      }
+
+      // Phase 3: Create session_start / session_end events
+      await updateImportStatus(jobLogger, job, importId, {
+        step: 'creating_sessions',
+        batch: 'all sessions',
+      });
+      await createSessionsStartEndEvents(importId);
+      await yieldToEventLoop();
+
+      jobLogger.info('Session event creation complete');
+    }
+
+    // -------------------------------------------------------
+    // PRODUCTION PHASE: resume-safe, track progress per batch
+    // -------------------------------------------------------
+
+    // Phase 4: Move staging events to production (per-day)
+    const resumeMovingFrom =
+      hasReachedProduction && record.currentStep === 'moving'
+        ? (record.currentBatch ?? undefined)
+        : undefined;
+
+    // currentBatch is the last successfully completed day — resume from the next day to avoid re-inserting it
+    const moveFromDate = (() => {
+      if (!resumeMovingFrom) return undefined;
+      const next = new Date(`${resumeMovingFrom}T12:00:00Z`);
+      next.setUTCDate(next.getUTCDate() + 1);
+      return next.toISOString().split('T')[0]!;
+    })();
+
+    const bounds = await getImportDateBounds(importId, moveFromDate);
+    if (bounds.min && bounds.max) {
+      const startDate = bounds.min.split(' ')[0]!;
+      const endDate = bounds.max.split(' ')[0]!;
+      const cursor = new Date(`${startDate}T12:00:00Z`);
+      const end = new Date(`${endDate}T12:00:00Z`);
+
+      while (cursor <= end) {
+        const dateStr = cursor.toISOString().split('T')[0]!;
+
+        await moveImportsToProduction(importId, dateStr);
+        await updateImportStatus(jobLogger, job, importId, {
+          step: 'moving',
+          batch: dateStr,
+        });
+
+        await yieldToEventLoop();
+        cursor.setUTCDate(cursor.getUTCDate() + 1);
      }
    }
 
-    // Phase 2: Generate session IDs if provider requires it
-    if (
-      shouldRunStep('generating_session_ids') &&
-      providerInstance.shouldGenerateSessionIds()
-    ) {
-      await whileBounds(resumeGeneratingSessionIdsFrom, async (from) => {
-        console.log('Generating session IDs', { from });
-        await generateSessionIds(importId, from);
-        await updateImportStatus(jobLogger, job, importId, {
-          step: 'generating_session_ids',
-          batch: from,
-        });
+    jobLogger.info('Move to production complete');
 
-        // Yield control back to event loop after processing each day
-        await yieldToEventLoop();
-      });
-
-      jobLogger.info('Session ID generation complete');
-    }
-
-    // Phase 3-5: Process in daily batches for robustness
-
-    if (shouldRunStep('creating_sessions')) {
-      await whileBounds(resumeCreatingSessionsFrom, async (from) => {
-        await createSessionsStartEndEvents(importId, from);
-        await updateImportStatus(jobLogger, job, importId, {
-          step: 'creating_sessions',
-          batch: from,
-        });
-
-        // Yield control back to event loop after processing each day
-        await yieldToEventLoop();
-      });
-    }
-
-    if (shouldRunStep('moving')) {
-      await whileBounds(resumeMovingFrom, async (from) => {
-        await moveImportsToProduction(importId, from);
-        await updateImportStatus(jobLogger, job, importId, {
-          step: 'moving',
-          batch: from,
-        });
-
-        // Yield control back to event loop after processing each day
-        await yieldToEventLoop();
-      });
-    }
-
-    if (shouldRunStep('backfilling_sessions')) {
-      await whileBounds(resumeBackfillingSessionsFrom, async (from) => {
-        await backfillSessionsToProduction(importId, from);
-        await updateImportStatus(jobLogger, job, importId, {
-          step: 'backfilling_sessions',
-          batch: from,
-        });
-
-        // Yield control back to event loop after processing each day
-        await yieldToEventLoop();
-      });
-    }
-
-    await markImportComplete(importId);
+    // Phase 5: Backfill sessions table
    await updateImportStatus(jobLogger, job, importId, {
-      step: 'completed',
+      step: 'backfilling_sessions',
+      batch: 'all sessions',
    });
-    jobLogger.info('Import marked as complete');
+    await backfillSessionsToProduction(importId);
+    await yieldToEventLoop();
 
-    // Get final progress
-    const finalProgress = await getImportProgress(importId);
+    jobLogger.info('Session backfill complete');
 
-    jobLogger.info('Import job completed successfully', {
-      totalEvents: finalProgress.totalEvents,
-      insertedEvents: finalProgress.insertedEvents,
-      status: finalProgress.status,
-    });
+    // Done
+    await updateImportStatus(jobLogger, job, importId, { step: 'completed' });
+    jobLogger.info('Import completed');
 
-    return {
-      success: true,
-      totalEvents: finalProgress.totalEvents,
-      processedEvents: finalProgress.insertedEvents,
-    };
+    return { success: true };
  } catch (error) {
    jobLogger.error('Import job failed', { error });
 
-    // Mark import as failed
    try {
      const errorMsg =
        error instanceof Error ? error.message : 'Unknown error';
      await updateImportStatus(jobLogger, job, importId, {
        step: 'failed',
        errorMessage: errorMsg,
      });
-      jobLogger.warn('Import marked as failed', { error: errorMsg });
    } catch (markError) {
      jobLogger.error('Failed to mark import as failed', { error, markError });
    }
@@ -320,7 +270,7 @@ export async function importJob(job: Job) {
 
 function createProvider(
   record: Prisma.ImportGetPayload<{ include: { project: true } }>,
-  jobLogger: ILogger,
+  jobLogger: ILogger
 ) {
   const config = record.config;
   switch (config.provider) {
diff --git a/packages/db/index.ts b/packages/db/index.ts
index 06c41720..f0e461c3 100644
--- a/packages/db/index.ts
+++ b/packages/db/index.ts
@@ -1,6 +1,5 @@
 export * from './src/prisma-client';
 export * from './src/clickhouse/client';
-export * from './src/clickhouse/csv';
 export * from './src/sql-builder';
 export * from './src/services/chart.service';
 export * from './src/engine';
diff --git a/packages/db/src/clickhouse/client.ts b/packages/db/src/clickhouse/client.ts
index bcba30be..c1d306a2 100644
--- a/packages/db/src/clickhouse/client.ts
+++ b/packages/db/src/clickhouse/client.ts
@@ -1,11 +1,9 @@
-import { Readable } from 'node:stream';
 import type { ClickHouseSettings, ResponseJSON } from '@clickhouse/client';
 import { ClickHouseLogLevel, createClient } from '@clickhouse/client';
-import sqlstring from 'sqlstring';
-
 import type { NodeClickHouseClientConfigOptions } from '@clickhouse/client/dist/config';
 import { createLogger } from '@openpanel/logger';
 import type { IInterval } from '@openpanel/validation';
+import sqlstring from 'sqlstring';
 
 export { createClient };
 
@@ -68,8 +66,11 @@ export const TABLE_NAMES = {
  * Non-clustered mode = self-hosted environments
  */
 export function isClickhouseClustered(): boolean {
-  if (process.env.CLICKHOUSE_CLUSTER === 'true' || process.env.CLICKHOUSE_CLUSTER === '1') {
-    return true
+  if (
+    process.env.CLICKHOUSE_CLUSTER === 'true' ||
+    process.env.CLICKHOUSE_CLUSTER === '1'
+  ) {
+    return true;
  }
 
  return !(
@@ -97,21 +98,21 @@ function getClickhouseSettings(): ClickHouseSettings {
  return {
    distributed_product_mode: 'allow',
    date_time_input_format: 'best_effort',
-    ...(!process.env.CLICKHOUSE_SETTINGS_REMOVE_CONVERT_ANY_JOIN
-      ? {
+    ...(process.env.CLICKHOUSE_SETTINGS_REMOVE_CONVERT_ANY_JOIN
+      ? {}
+      : {
          query_plan_convert_any_join_to_semi_or_anti_join: 0,
-        }
-      : {}),
+        }),
    ...additionalSettings,
  };
 }
 
 export const CLICKHOUSE_OPTIONS: NodeClickHouseClientConfigOptions = {
   max_open_connections: 30,
-  request_timeout: 300000,
+  request_timeout: 300_000,
   keep_alive: {
     enabled: true,
-    idle_socket_ttl: 60000,
+    idle_socket_ttl: 60_000,
   },
   compression: {
     request: true,
@@ -138,7 +139,7 @@ const cleanQuery = (query?: string) =>
 export async function withRetry<T>(
   operation: () => Promise<T>,
   maxRetries = 3,
-  baseDelay = 500,
+  baseDelay = 500
 ): Promise<T> {
   let lastError: Error | undefined;
 
@@ -162,7 +163,7 @@ export async function withRetry<T>(
         `Attempt ${attempt + 1}/${maxRetries} failed, retrying in ${delay}ms`,
         {
           error: error.message,
-        },
+        }
       );
       await new Promise((resolve) => setTimeout(resolve, delay));
       continue;
@@ -213,7 +214,7 @@ export const ch = new Proxy(originalCh, {
 
 export async function chQueryWithMeta<T extends Record<string, unknown>>(
   query: string,
-  clickhouseSettings?: ClickHouseSettings,
+  clickhouseSettings?: ClickHouseSettings
 ): Promise<ResponseJSON<T>> {
   const start = Date.now();
   const res = await ch.query({
@@ -249,44 +250,16 @@ export async function chQueryWithMeta<T extends Record<string, unknown>>(
   return response;
 }
 
-export async function chInsertCSV(tableName: string, rows: string[]) {
-  try {
-    const now = performance.now();
-    // Create a readable stream in binary mode for CSV (similar to EventBuffer)
-    const csvStream = Readable.from(rows.join('\n'), {
-      objectMode: false,
-    });
-
-    await ch.insert({
-      table: tableName,
-      values: csvStream,
-      format: 'CSV',
-      clickhouse_settings: {
-        format_csv_allow_double_quotes: 1,
-        format_csv_allow_single_quotes: 0,
-      },
-    });
-
-    logger.info('CSV Insert successful', {
-      elapsed: performance.now() - now,
-      rows: rows.length,
-    });
-  } catch (error) {
-    logger.error('CSV Insert failed:', error);
-    throw error;
-  }
-}
-
 export async function chQuery<T extends Record<string, unknown>>(
   query: string,
-  clickhouseSettings?: ClickHouseSettings,
+  clickhouseSettings?: ClickHouseSettings
 ): Promise<T[]> {
   return (await chQueryWithMeta<T>(query, clickhouseSettings)).data;
 }
 
 export function formatClickhouseDate(
   date: Date | string,
-  skipTime = false,
+  skipTime = false
 ): string {
   if (skipTime) {
     return new Date(date).toISOString().split('T')[0]!;
diff --git a/packages/db/src/clickhouse/csv.ts b/packages/db/src/clickhouse/csv.ts
deleted file mode 100644
index 21802f04..00000000
--- a/packages/db/src/clickhouse/csv.ts
+++ /dev/null
@@ -1,53 +0,0 @@
-// ClickHouse Map(String, String) format in CSV uses single quotes, not JSON double quotes
-// Format: '{'key1':'value1','key2':'value2'}'
-// Single quotes inside values must be escaped with backslash: \'
-// We also need to escape newlines and control characters to prevent CSV parsing issues
-const escapeMapValue = (str: string) => {
-  return str
-    .replace(/\\/g, '\\\\') // Escape backslashes first
-    .replace(/'/g, "\\'") // Escape single quotes
-    .replace(/\n/g, '\\n') // Escape newlines
-    .replace(/\r/g, '\\r') // Escape carriage returns
-    .replace(/\t/g, '\\t') // Escape tabs
-    .replace(/\0/g, '\\0'); // Escape null bytes
-};
-
-export const csvEscapeJson = (
-  value: Record<string, unknown> | null | undefined,
-): string => {
-  if (value == null) return '';
-
-  // Normalize to strings if your column is Map(String,String)
-  const normalized: Record<string, string> = Object.fromEntries(
-    Object.entries(value).map(([k, v]) => [
-      String(k),
-      v == null ? '' : String(v),
-    ]),
-  );
-
-  // Empty object should return empty Map (without quotes, csvEscapeField will handle if needed)
-  if (Object.keys(normalized).length === 0) return '{}';
-
-  const pairs = Object.entries(normalized)
-    .map(([k, v]) => `'${escapeMapValue(k)}':'${escapeMapValue(v)}'`)
-    .join(',');
-
-  // Return Map format without outer quotes - csvEscapeField will handle CSV escaping
-  // This allows csvEscapeField to properly wrap/escape the entire field if it contains newlines/quotes
-  return csvEscapeField(`{${pairs}}`);
-};
-
-// Escape a CSV field - wrap in double quotes if it contains commas, quotes, or newlines
-// Double quotes inside must be doubled (""), per CSV standard
-export const csvEscapeField = (value: string | number): string => {
-  const str = String(value);
-
-  // If field contains commas, quotes, or newlines, it must be quoted
-  if (/[,"\n\r]/.test(str)) {
-    // Escape double quotes by doubling them
-    const escaped = str.replace(/"/g, '""');
-    return `"${escaped}"`;
-  }
-
-  return str;
-};
diff --git a/packages/db/src/services/import.service.ts b/packages/db/src/services/import.service.ts
index 36d37169..19132ab6 100644
--- a/packages/db/src/services/import.service.ts
+++ b/packages/db/src/services/import.service.ts
@@ -1,15 +1,14 @@
+import { createHash } from 'node:crypto';
 import type { ILogger } from '@openpanel/logger';
-import sqlstring from 'sqlstring';
 import {
-  TABLE_NAMES,
   ch,
-  chInsertCSV,
   convertClickhouseDateToJs,
   formatClickhouseDate,
   getReplicatedTableName,
+  TABLE_NAMES,
 } from '../clickhouse/client';
-import { csvEscapeField, csvEscapeJson } from '../clickhouse/csv';
-import { type Prisma, db } from '../prisma-client';
+import { db, type Prisma } from '../prisma-client';
+import type { IClickhouseProfile } from './profile.service';
 import type { IClickhouseEvent } from './event.service';
 
 export interface ImportStageResult {
@@ -18,11 +17,89 @@ export interface ImportStageResult {
   insertedEvents: number;
 }
 
-export interface ImportProgress {
-  importId: string;
-  totalEvents: number;
-  insertedEvents: number;
-  status: 'pending' | 'processing' | 'processed' | 'failed';
+const SESSION_GAP_MS = 30 * 60 * 1000; // 30 minutes
+
+/**
+ * Generate gap-based session IDs for events that have none.
+ * Streams events from staging (sorted by device_id, created_at), assigns a new
+ * session when gap > 30 min, re-inserts with session_id, then deletes old rows.
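+ * Sketch of the ID scheme implemented below (device id and timing are
+ * hypothetical): a device 'dev-1' with two runs of events more than
+ * 30 minutes apart gets session_id md5('dev-1-0') for the first run and
+ * md5('dev-1-1') for the second.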
+ */
+export async function generateGapBasedSessionIds(
+  importId: string
+): Promise<void> {
+  let currentDeviceId = '';
+  let currentSessionId = '';
+  let currentLastTime = 0;
+  let currentCounter = -1;
+  const BATCH_SIZE = 5000;
+  const batch: IClickhouseEvent[] = [];
+
+  const result = await ch.query({
+    query: `
+      SELECT id, name, sdk_name, sdk_version, device_id, profile_id, project_id,
+             session_id, path, origin, referrer, referrer_name, referrer_type,
+             duration, properties, created_at, country, city, region,
+             longitude, latitude, os, os_version, browser, browser_version,
+             device, brand, model, imported_at
+      FROM ${TABLE_NAMES.events_imports}
+      WHERE import_id = {importId:String}
+        AND session_id = ''
+        AND device != 'server'
+      ORDER BY device_id, created_at
+    `,
+    query_params: { importId },
+    format: 'JSONEachRow',
+  });
+
+  const stream = result.stream();
+  for await (const rows of stream) {
+    for (const row of rows) {
+      const event = row.json() as IClickhouseEvent;
+      const time = new Date(event.created_at).getTime();
+
+      if (event.device_id !== currentDeviceId) {
+        currentDeviceId = event.device_id;
+        currentSessionId = '';
+        currentLastTime = 0;
+        currentCounter = -1;
+      }
+
+      if (!currentSessionId || time - currentLastTime > SESSION_GAP_MS) {
+        currentCounter++;
+        currentSessionId = createHash('md5')
+          .update(`${event.device_id}-${currentCounter}`)
+          .digest('hex')
+          .toLowerCase();
+      }
+      currentLastTime = time;
+      event.session_id = currentSessionId;
+
+      batch.push(event);
+      if (batch.length >= BATCH_SIZE) {
+        await insertImportBatch(batch, importId);
+        batch.length = 0;
+      }
+    }
+  }
+
+  if (batch.length > 0) {
+    await insertImportBatch(batch, importId);
+  }
+
+  const mutationTable = getReplicatedTableName(TABLE_NAMES.events_imports);
+  await ch.command({
+    query: `ALTER TABLE ${mutationTable} DELETE
+            WHERE import_id = {importId:String}
+              AND session_id = ''
+              AND device != 'server'`,
+    query_params: { importId },
+    clickhouse_settings: {
+      wait_end_of_query: 1,
+      mutations_sync: '2',
+      send_progress_in_http_headers: 1,
+      http_headers_progress_interval_ms: '50000',
+    },
+  });
 }
 
 /**
@@ -30,55 +107,26 @@
  */
 export async function insertImportBatch(
   events: IClickhouseEvent[],
-  importId: string,
+  importId: string
 ): Promise<ImportStageResult> {
   if (events.length === 0) {
     return { importId, totalEvents: 0, insertedEvents: 0 };
   }
 
-  // Important to have same order as events_imports table
-  // CSV format: properly quotes fields that need it
-  const csvRows = events.map((event) => {
-    // Properties need to be converted to JSON for Map(String, String)
-    // All fields must be CSV-escaped when joining with commas
-    const fields = [
-      csvEscapeField(event.id || ''),
-      csvEscapeField(event.name),
-      csvEscapeField(event.sdk_name || ''),
-      csvEscapeField(event.sdk_version || ''),
-      csvEscapeField(event.device_id || ''),
-      csvEscapeField(event.profile_id || ''),
-      csvEscapeField(event.project_id || ''),
-      csvEscapeField(event.session_id || ''),
-      csvEscapeField(event.path),
-      csvEscapeField(event.origin || ''),
-      csvEscapeField(event.referrer || ''),
-      csvEscapeField(event.referrer_name || ''),
-      csvEscapeField(event.referrer_type || ''),
-      csvEscapeField(event.duration ?? 0),
-      csvEscapeJson(event.properties),
-      csvEscapeField(event.created_at),
-      csvEscapeField(event.country || ''),
-      csvEscapeField(event.city || ''),
-      csvEscapeField(event.region || ''),
-      csvEscapeField(event.longitude != null ? event.longitude : '\\N'),
-      csvEscapeField(event.latitude != null ? event.latitude : '\\N'),
-      csvEscapeField(event.os || ''),
-      csvEscapeField(event.os_version || ''),
-      csvEscapeField(event.browser || ''),
-      csvEscapeField(event.browser_version || ''),
-      csvEscapeField(event.device || ''),
-      csvEscapeField(event.brand || ''),
-      csvEscapeField(event.model || ''),
-      csvEscapeField('\\N'), // imported_at (Nullable)
-      csvEscapeField(importId),
-      csvEscapeField('pending'), // import_status
-      csvEscapeField(formatClickhouseDate(new Date())), // imported_at_meta (DateTime, not DateTime64, so no milliseconds)
-    ];
-    return fields.join(',');
-  });
+  const now = formatClickhouseDate(new Date());
+  const rows = events.map((event) => ({
+    ...event,
+    import_id: importId,
+    import_status: 'pending',
+    imported_at: event.imported_at || now,
+    imported_at_meta: now,
+  }));
 
-  await chInsertCSV(TABLE_NAMES.events_imports, csvRows);
+  await ch.insert({
+    table: TABLE_NAMES.events_imports,
+    values: rows,
+    format: 'JSONEachRow',
+  });
 
   return {
     importId,
@@ -88,44 +136,86 @@
 }
 
 /**
- * Generate deterministic session IDs for events that don't have them
- * Uses 30-minute time windows to create consistent session IDs across imports
- * Only processes events where device != 'server' and session_id = ''
+ * Insert a batch of profiles into the production profiles table.
+ * Used by Mixpanel (and other providers) to import user profiles during an import job.
 */
-export async function generateSessionIds(
-  importId: string,
-  from: string,
-): Promise<void> {
-  const rangeWhere = [
-    'import_id = {importId:String}',
-    "import_status = 'pending'",
-    "device != 'server'",
-    "session_id = ''",
-    from ? 'toDate(created_at) = {from:String}' : '',
-  ]
-    .filter(Boolean)
-    .join(' AND ');
+export async function insertProfilesBatch(
+  profiles: IClickhouseProfile[],
+  projectId: string
+): Promise<{ inserted: number }> {
+  if (profiles.length === 0) {
+    return { inserted: 0 };
+  }
 
-  // Use SQL to generate deterministic session IDs based on device_id + 30-min time windows
-  // This ensures same events always get same session IDs regardless of import order
-  // In clustered mode, we must use the replicated table for mutations
+  const normalized = profiles.map((p) => ({
+    id: p.id,
+    project_id: projectId,
+    first_name: p.first_name ?? '',
+    last_name: p.last_name ?? '',
+    email: p.email ?? '',
+    avatar: p.avatar ?? '',
+    is_external: p.is_external ?? true,
+    properties: Object.fromEntries(
+      Object.entries(p.properties || {}).filter(
+        (kv): kv is [string, string] => kv[1] != null && kv[1] !== ''
+      )
+    ) as Record<string, string>,
+    created_at: p.created_at,
+  }));
+
+  await ch.insert({
+    table: TABLE_NAMES.profiles,
+    values: normalized,
+    format: 'JSONEachRow',
+  });
+
+  return { inserted: normalized.length };
+}
+
+/**
+ * Delete all staging data for an import. Used to get a clean slate on retry
+ * when the failure happened before moving data to production.
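+ * Staging rows live only in events_imports and this path runs before any
+ * move to production, so deleting by import_id cannot touch production data.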
+ */
+export async function cleanupStagingData(importId: string): Promise<void> {
   const mutationTableName = getReplicatedTableName(TABLE_NAMES.events_imports);
-  const updateQuery = `
-    ALTER TABLE ${mutationTableName}
-    UPDATE session_id = lower(hex(MD5(concat(
-      device_id,
-      '-',
-      toString(toInt64(toUnixTimestamp(created_at) / 1800))
-    ))))
-    WHERE ${rangeWhere}
-  `;
-
   await ch.command({
-    query: updateQuery,
-    query_params: { importId, from },
+    query: `ALTER TABLE ${mutationTableName} DELETE WHERE import_id = {importId:String}`,
+    query_params: { importId },
     clickhouse_settings: {
       wait_end_of_query: 1,
-      mutations_sync: '2', // Wait for mutation to complete on all replicas (critical!)
+      mutations_sync: '2',
       send_progress_in_http_headers: 1,
       http_headers_progress_interval_ms: '50000',
     },
   });
@@ -133,315 +223,256 @@
 }
 
 /**
- * Reconstruct sessions using SQL-based logic
- * This identifies session boundaries and creates session_start/session_end events
- * session_start inherits all properties from the first event in the session
- * session_end inherits all properties from the last event in the session and calculates duration
+ * Reconstruct sessions across ALL dates for the import.
+ * Each session_id gets exactly one session_start and one session_end,
+ * even if the session spans midnight.
+ *
+ * Batches by fetching distinct session_ids first, then running the
+ * heavy aggregation only for that batch of IDs.
 */
 export async function createSessionsStartEndEvents(
-  importId: string,
-  from: string,
+  importId: string
 ): Promise<void> {
-  // First, let's identify session boundaries and get first/last events for each session
-  const rangeWhere = [
+  const SESSION_BATCH_SIZE = 5000;
+  let lastSessionId = '';
+
+  const baseWhere = [
     'import_id = {importId:String}',
-    "import_status = 'pending'",
-    "session_id != ''", // Only process events that have session IDs
-    'toDate(created_at) = {from:String}',
-  ]
-    .filter(Boolean)
-    .join(' AND ');
+    "session_id != ''",
+    "name NOT IN ('session_start', 'session_end')",
+  ].join(' AND ');
 
-  // Use window functions to efficiently get first event (all fields) and last event (only changing fields)
-  // session_end only needs: properties, path, origin, created_at - the rest can be inherited from session_start
-  const sessionEventsQuery = `
-    SELECT
-      device_id,
-      session_id,
-      project_id,
-      profile_id,
-      argMin((path, origin, referrer, referrer_name, referrer_type, properties, created_at, country, city, region, longitude, latitude, os, os_version, browser, browser_version, device, brand, model), created_at) AS first_event,
-      argMax((path, origin, properties, created_at), created_at) AS last_event_fields,
-      min(created_at) AS first_timestamp,
-      max(created_at) AS last_timestamp
-    FROM ${TABLE_NAMES.events_imports}
-    WHERE ${rangeWhere}
-      AND name NOT IN ('session_start', 'session_end')
-    GROUP BY session_id, device_id, project_id, profile_id
-  `;
 
-  const sessionEventsResult = await ch.query({
-    query: sessionEventsQuery,
-    query_params: { importId, from },
-    format: 'JSONEachRow',
-  });
+  while (true) {
+    const idsResult = await ch.query({
+      query: `
+        SELECT DISTINCT session_id
+        FROM ${TABLE_NAMES.events_imports}
+        WHERE ${baseWhere}
+          AND session_id > {lastSessionId:String}
+        ORDER BY session_id
+        LIMIT {limit:UInt32}
+      `,
+      query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE },
+      format: 'JSONEachRow',
+    });
 
+    const idRows = (await idsResult.json()) as Array<{ session_id: string }>;
+    if (idRows.length === 0) {
+      break;
+    }
 
-  const sessionData = (await sessionEventsResult.json()) as Array<{
-    device_id: string;
-    session_id: string;
-    project_id: string;
-    profile_id: string;
-    first_event: [
-      // string, // id
-      // string, // name
-      string, // path
-      string, // origin
-      string, // referrer
-      string, // referrer_name
-      string, // referrer_type
-      // number, // duration
-      Record<string, string>, // properties
-      string, // created_at
-      string, // country
-      string, // city
-      string, // region
-      number | null, // longitude
-      number | null, // latitude
-      string, // os
-      string, // os_version
-      string, // browser
-      string, // browser_version
-      string, // device
-      string, // brand
-      string, // model
-      // string, // sdk_name
-      // string, // sdk_version
-      // string, // imported_at
-    ];
-    last_event_fields: [
-      string, // path
-      string, // origin
-      Record<string, string>, // properties
-      string, // created_at
-    ];
-    first_timestamp: string;
-    last_timestamp: string;
-  }>;
 
-  // Create session_start and session_end events
-  const sessionEvents: IClickhouseEvent[] = [];
 
+    const sessionIds = idRows.map((r) => r.session_id);
 
+    const sessionEventsQuery = `
+      SELECT
+        device_id,
+        session_id,
+        project_id,
+        if(
+          any(nullIf(profile_id, device_id)) IS NULL,
+          any(profile_id),
+          any(nullIf(profile_id, device_id))
+        ) AS profile_id,
+        argMin((path, origin, referrer, referrer_name, referrer_type, properties, created_at, country, city, region, longitude, latitude, os, os_version, browser, browser_version, device, brand, model), created_at) AS first_event,
+        argMax((path, origin, properties, created_at), created_at) AS last_event_fields,
+        min(created_at) AS first_timestamp,
+        max(created_at) AS last_timestamp
+      FROM ${TABLE_NAMES.events_imports}
+      WHERE ${baseWhere}
+        AND session_id IN ({sessionIds:Array(String)})
+      GROUP BY session_id, device_id, project_id
+    `;
 
+    const sessionEventsResult = await ch.query({
+      query: sessionEventsQuery,
+      query_params: { importId, sessionIds },
+      format: 'JSONEachRow',
+    });
 
+    const sessionData = (await sessionEventsResult.json()) as Array<{
+      device_id: string;
+      session_id: string;
+      project_id: string;
+      profile_id: string;
+      first_event: [
+        string, // path
+        string, // origin
+        string, // referrer
+        string, // referrer_name
+        string, // referrer_type
+        Record<string, string>, // properties
+        string, // created_at
+        string, // country
+        string, // city
+        string, // region
+        number | null, // longitude
+        number | null, // latitude
+        string, // os
+        string, // os_version
+        string, // browser
+        string, // browser_version
+        string, // device
+        string, // brand
+        string, // model
+      ];
+      last_event_fields: [
+        string, // path
+        string, // origin
+        Record<string, string>, // properties
+        string, // created_at
+      ];
+      first_timestamp: string;
+      last_timestamp: string;
+    }>;
 
-  for (const session of sessionData) {
-    // Destructure first event tuple (all fields)
-    const [
-      // firstId,
-      // firstName,
-      firstPath,
-      firstOrigin,
-      firstReferrer,
-      firstReferrerName,
-      firstReferrerType,
-      // firstDuration,
-      firstProperties,
-      firstCreatedAt,
-      firstCountry,
-      firstCity,
-      firstRegion,
-      firstLongitude,
-      firstLatitude,
-      firstOs,
-      firstOsVersion,
-      firstBrowser,
-      firstBrowserVersion,
-      firstDevice,
-      firstBrand,
-      firstModel,
-      // firstSdkName,
-      // firstSdkVersion,
-      // firstImportedAt,
-    ] = session.first_event;
 
-    // Destructure last event fields (only the changing ones)
-    const [lastPath, lastOrigin, lastProperties, lastCreatedAt] =
-      session.last_event_fields;
 
-    // Calculate duration in milliseconds
-    // Parse timestamps as Date objects to calculate duration
-    const firstTime = new Date(session.first_timestamp).getTime();
-    const lastTime = new Date(session.last_timestamp).getTime();
-    const durationMs = Math.max(0, lastTime - firstTime); // Ensure non-negative duration
+    const sessionEvents: IClickhouseEvent[] = [];
 
-    // Helper function to adjust timestamp by milliseconds without timezone conversion
     const adjustTimestamp = (timestamp: string, offsetMs: number): string => {
-      // Parse the timestamp, adjust it, and format back to ClickHouse format
       const date = convertClickhouseDateToJs(timestamp);
       date.setTime(date.getTime() + offsetMs);
      return formatClickhouseDate(date);
    };
 
-    // Create session_start event - inherit everything from first event but change name
-    // Set created_at to 1 second before the first event
-    sessionEvents.push({
-      id: crypto.randomUUID(),
-      name: 'session_start',
-      device_id: session.device_id,
-      profile_id: session.profile_id,
-      project_id: session.project_id,
-      session_id: session.session_id,
-      path: firstPath,
-      origin: firstOrigin,
-      referrer: firstReferrer,
-      referrer_name: firstReferrerName,
-      referrer_type: firstReferrerType,
-      duration: 0, // session_start always has 0 duration
-      properties: firstProperties as Record<
-        string,
-        string | number | boolean | null | undefined
-      >,
-      created_at: adjustTimestamp(session.first_timestamp, -1000), // 1 second before first event
-      country: firstCountry,
-      city: firstCity,
-      region: firstRegion,
-      longitude: firstLongitude,
-      latitude: firstLatitude,
-      os: firstOs,
-      os_version: firstOsVersion,
-      browser: firstBrowser,
-      browser_version: firstBrowserVersion,
-      device: firstDevice,
-      brand: firstBrand,
-      model: firstModel,
-      imported_at: new Date().toISOString(),
-      sdk_name: 'import-session-reconstruction',
-      sdk_version: '1.0.0',
-    });
 
-    // Create session_end event - inherit most from session_start, but use last event's path, origin, properties
-    // Set created_at to 1 second after the last event
-    sessionEvents.push({
-      id: crypto.randomUUID(),
-      name: 'session_end',
-      device_id: session.device_id,
-      profile_id: session.profile_id,
-      project_id: session.project_id,
-      session_id: session.session_id,
-      path: lastPath, // From last event
-      origin: lastOrigin, // From last event
-      referrer: firstReferrer, // Same as session_start
-      referrer_name: firstReferrerName, // Same as session_start
-      referrer_type: firstReferrerType, // Same as session_start
-      duration: durationMs,
-      properties: lastProperties as Record<
-        string,
-        string | number | boolean | null | undefined
-      >, // From last event
-      created_at: adjustTimestamp(session.last_timestamp, 500), // 1 second after last event
-      country: firstCountry, // Same as session_start
-      city: firstCity, // Same as session_start
-      region: firstRegion, // Same as session_start
-      longitude: firstLongitude, // Same as session_start
-      latitude: firstLatitude, // Same as session_start
-      os: firstOs, // Same as session_start
-      os_version: firstOsVersion, // Same as session_start
-      browser: firstBrowser, // Same as session_start
-      browser_version: firstBrowserVersion, // Same as session_start
-      device: firstDevice, // Same as session_start
-      brand: firstBrand, // Same as session_start
-      model: firstModel, // Same as session_start
-      imported_at: new Date().toISOString(),
-      sdk_name: 'import-session-reconstruction',
-      sdk_version: '1.0.0',
-    });
-  }
 
-  // Insert session events into imports table
-  if (sessionEvents.length > 0) {
-    await insertImportBatch(sessionEvents, importId);
+    for (const session of sessionData) {
+      const [
+        firstPath,
+        firstOrigin,
+        firstReferrer,
+        firstReferrerName,
+        firstReferrerType,
+        firstProperties,
+        _firstCreatedAt,
+        firstCountry,
+        firstCity,
+        firstRegion,
+        firstLongitude,
+        firstLatitude,
+        firstOs,
+        firstOsVersion,
+        firstBrowser,
+        firstBrowserVersion,
+        firstDevice,
+        firstBrand,
+        firstModel,
+      ] = session.first_event;
 
+      const [lastPath, lastOrigin, lastProperties, _lastCreatedAt] =
+        session.last_event_fields;
 
+      const firstTime = new Date(session.first_timestamp).getTime();
+      const lastTime = new Date(session.last_timestamp).getTime();
+      const durationMs = Math.max(0, lastTime - firstTime);
 
+      sessionEvents.push({
+        id: crypto.randomUUID(),
+        name: 'session_start',
+        device_id: session.device_id,
+        profile_id: session.profile_id,
+        project_id: session.project_id,
+        session_id: session.session_id,
+        path: firstPath,
+        origin: firstOrigin,
+        referrer: firstReferrer,
+        referrer_name: firstReferrerName,
+        referrer_type: firstReferrerType,
+        duration: 0,
+        properties: firstProperties as Record<
+          string,
+          string | number | boolean | null | undefined
+        >,
+        created_at: adjustTimestamp(session.first_timestamp, -1000),
+        country: firstCountry,
+        city: firstCity,
+        region: firstRegion,
+        longitude: firstLongitude,
+        latitude: firstLatitude,
+        os: firstOs,
+        os_version: firstOsVersion,
+        browser: firstBrowser,
+        browser_version: firstBrowserVersion,
+        device: firstDevice,
+        brand: firstBrand,
+        model: firstModel,
+        imported_at: new Date().toISOString(),
+        sdk_name: 'import-session-reconstruction',
+        sdk_version: '1.0.0',
+      });
 
+      sessionEvents.push({
+        id: crypto.randomUUID(),
+        name: 'session_end',
+        device_id: session.device_id,
+        profile_id: session.profile_id,
+        project_id: session.project_id,
+        session_id: session.session_id,
+        path: lastPath,
+        origin: lastOrigin,
+        referrer: firstReferrer,
+        referrer_name: firstReferrerName,
+        referrer_type: firstReferrerType,
+        duration: durationMs,
+        properties: lastProperties as Record<
+          string,
+          string | number | boolean | null | undefined
+        >,
+        created_at: adjustTimestamp(session.last_timestamp, 1000),
+        country: firstCountry,
+        city: firstCity,
+        region: firstRegion,
+        longitude: firstLongitude,
+        latitude: firstLatitude,
+        os: firstOs,
+        os_version: firstOsVersion,
+        browser: firstBrowser,
+        browser_version: firstBrowserVersion,
+        device: firstDevice,
+        brand: firstBrand,
+        model: firstModel,
+        imported_at: new Date().toISOString(),
+        sdk_name: 'import-session-reconstruction',
+        sdk_version: '1.0.0',
+      });
+    }
 
+    if (sessionEvents.length > 0) {
+      await insertImportBatch(sessionEvents, importId);
+    }
 
+    lastSessionId = idRows[idRows.length - 1]!.session_id;
+    if (idRows.length < SESSION_BATCH_SIZE) {
+      break;
+    }
   }
 }
 
 /**
- * Migrate all events from imports table to production events table
- * This includes both original events and generated session events
+ * Move events from staging to production events table.
+ * Batched per-day using a simple date filter.
 */
 export async function moveImportsToProduction(
   importId: string,
-  from: string,
+  from: string
 ): Promise<void> {
-  // Build the WHERE clause for migration
-  // For session events (session_start/session_end), we don't filter by their created_at
-  // because they're created with adjusted timestamps (±1 second) that might fall outside
-  // the date range. Instead, we include them if their session_id has events in this range.
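+  // Session events are written to staging before the move and their adjusted
+  // (±1s) timestamps may fall on an adjacent day; since every day inside the
+  // staging bounds gets moved, they are still picked up without the old
+  // session_id subquery.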
let whereClause = 'import_id = {importId:String}'; if (from) { - whereClause += ` AND ( - (toDate(created_at) = {from:String}) OR - ( - name IN ('session_start', 'session_end') AND - session_id IN ( - SELECT DISTINCT session_id - FROM ${TABLE_NAMES.events_imports} - WHERE import_id = {importId:String} - AND toDate(created_at) = {from:String} - AND name NOT IN ('session_start', 'session_end') - ) - ) - )`; + whereClause += ' AND toDate(created_at) = {from:String}'; } const migrationQuery = ` INSERT INTO ${TABLE_NAMES.events} ( - id, - name, - sdk_name, - sdk_version, - device_id, - profile_id, - project_id, - session_id, - path, - origin, - referrer, - referrer_name, - referrer_type, - duration, - properties, - created_at, - country, - city, - region, - longitude, - latitude, - os, - os_version, - browser, - browser_version, - device, - brand, - model, - imported_at + id, name, sdk_name, sdk_version, device_id, profile_id, project_id, + session_id, path, origin, referrer, referrer_name, referrer_type, + duration, properties, created_at, country, city, region, + longitude, latitude, os, os_version, browser, browser_version, + device, brand, model, imported_at ) SELECT - id, - name, - sdk_name, - sdk_version, - device_id, - profile_id, - project_id, - session_id, - path, - origin, - referrer, - referrer_name, - referrer_type, - duration, - properties, - created_at, - country, - city, - region, - longitude, - latitude, - os, - os_version, - browser, - browser_version, - device, - brand, - model, - imported_at + id, name, sdk_name, sdk_version, device_id, profile_id, project_id, + session_id, path, origin, referrer, referrer_name, referrer_type, + duration, properties, created_at, country, city, region, + longitude, latitude, os, os_version, browser, browser_version, + device, brand, model, imported_at FROM ${TABLE_NAMES.events_imports} WHERE ${whereClause} ORDER BY created_at ASC @@ -452,227 +483,127 @@ export async function moveImportsToProduction( query_params: { importId, from }, clickhouse_settings: { wait_end_of_query: 1, - // Ask ClickHouse to periodically send query execution progress in HTTP headers, creating some activity in the connection. send_progress_in_http_headers: 1, - // The interval of sending these progress headers. Here it is less than 60s, http_headers_progress_interval_ms: '50000', }, }); } +/** + * Aggregate sessions from staging into the sessions table. + * Runs across all dates so cross-midnight sessions become one row. + * Batches by session_ids to bound ClickHouse memory. + */ export async function backfillSessionsToProduction( - importId: string, - from: string, + importId: string ): Promise { - // After migrating events, populate the sessions table based on the migrated sessions - // We detect all session_ids involved in this import from the imports table, - // then aggregate over the production events to construct session rows. 
- const sessionsInsertQuery = ` - INSERT INTO ${TABLE_NAMES.sessions} ( - id, - project_id, - profile_id, - device_id, - created_at, - ended_at, - is_bounce, - entry_origin, - entry_path, - exit_origin, - exit_path, - screen_view_count, - revenue, - event_count, - duration, - country, - region, - city, - longitude, - latitude, - device, - brand, - model, - browser, - browser_version, - os, - os_version, - sign, - version, - utm_medium, - utm_source, - utm_campaign, - utm_content, - utm_term, - referrer, - referrer_name, - referrer_type - ) - SELECT - any(e.session_id) as id, - any(e.project_id) as project_id, - if(any(nullIf(e.profile_id, e.device_id)) IS NULL, any(e.profile_id), any(nullIf(e.profile_id, e.device_id))) as profile_id, - any(e.device_id) as device_id, - argMin(e.created_at, e.created_at) as created_at, - argMax(e.created_at, e.created_at) as ended_at, - if( - argMaxIf(e.properties['__bounce'], e.created_at, e.name = 'session_end') = '', - if(countIf(e.name = 'screen_view') > 1, false, true), - argMaxIf(e.properties['__bounce'], e.created_at, e.name = 'session_end') = 'true' - ) as is_bounce, - argMinIf(e.origin, e.created_at, e.name = 'session_start') as entry_origin, - argMinIf(e.path, e.created_at, e.name = 'session_start') as entry_path, - argMaxIf(e.origin, e.created_at, e.name = 'session_end' OR e.name = 'screen_view') as exit_origin, - argMaxIf(e.path, e.created_at, e.name = 'session_end' OR e.name = 'screen_view') as exit_path, - countIf(e.name = 'screen_view') as screen_view_count, - 0 as revenue, - countIf(e.name != 'screen_view' AND e.name != 'session_start' AND e.name != 'session_end') as event_count, - sumIf(e.duration, name = 'session_end') AS duration, - argMinIf(e.country, e.created_at, e.name = 'session_start') as country, - argMinIf(e.region, e.created_at, e.name = 'session_start') as region, - argMinIf(e.city, e.created_at, e.name = 'session_start') as city, - argMinIf(e.longitude, e.created_at, e.name = 'session_start') as longitude, - argMinIf(e.latitude, e.created_at, e.name = 'session_start') as latitude, - argMinIf(e.device, e.created_at, e.name = 'session_start') as device, - argMinIf(e.brand, e.created_at, e.name = 'session_start') as brand, - argMinIf(e.model, e.created_at, e.name = 'session_start') as model, - argMinIf(e.browser, e.created_at, e.name = 'session_start') as browser, - argMinIf(e.browser_version, e.created_at, e.name = 'session_start') as browser_version, - argMinIf(e.os, e.created_at, e.name = 'session_start') as os, - argMinIf(e.os_version, e.created_at, e.name = 'session_start') as os_version, - 1 as sign, - 1 as version, - argMinIf(e.properties['__query.utm_medium'], e.created_at, e.name = 'session_start') as utm_medium, - argMinIf(e.properties['__query.utm_source'], e.created_at, e.name = 'session_start') as utm_source, - argMinIf(e.properties['__query.utm_campaign'], e.created_at, e.name = 'session_start') as utm_campaign, - argMinIf(e.properties['__query.utm_content'], e.created_at, e.name = 'session_start') as utm_content, - argMinIf(e.properties['__query.utm_term'], e.created_at, e.name = 'session_start') as utm_term, - argMinIf(e.referrer, e.created_at, e.name = 'session_start') as referrer, - argMinIf(e.referrer_name, e.created_at, e.name = 'session_start') as referrer_name, - argMinIf(e.referrer_type, e.created_at, e.name = 'session_start') as referrer_type - FROM ${TABLE_NAMES.events_imports} e - WHERE - e.import_id = ${sqlstring.escape(importId)} - AND e.session_id != '' - AND ( - (toDate(e.created_at) = 
${sqlstring.escape(from)}) OR - ( - e.name IN ('session_start', 'session_end') AND - e.session_id IN ( - SELECT DISTINCT session_id - FROM ${TABLE_NAMES.events_imports} - WHERE import_id = ${sqlstring.escape(importId)} - AND toDate(created_at) = ${sqlstring.escape(from)} - AND name NOT IN ('session_start', 'session_end') - ) - ) + const SESSION_BATCH_SIZE = 5000; + let lastSessionId = ''; + + while (true) { + const idsResult = await ch.query({ + query: ` + SELECT DISTINCT session_id + FROM ${TABLE_NAMES.events_imports} + WHERE import_id = {importId:String} + AND session_id > {lastSessionId:String} + ORDER BY session_id + LIMIT {limit:UInt32} + `, + query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE }, + format: 'JSONEachRow', + }); + + const idRows = (await idsResult.json()) as Array<{ session_id: string }>; + if (idRows.length === 0) { + break; + } + + const sessionIds = idRows.map((r) => r.session_id); + + const sessionsInsertQuery = ` + INSERT INTO ${TABLE_NAMES.sessions} ( + id, project_id, profile_id, device_id, created_at, ended_at, + is_bounce, entry_origin, entry_path, exit_origin, exit_path, + screen_view_count, revenue, event_count, duration, + country, region, city, longitude, latitude, + device, brand, model, browser, browser_version, os, os_version, + sign, version, + utm_medium, utm_source, utm_campaign, utm_content, utm_term, + referrer, referrer_name, referrer_type ) - GROUP BY e.session_id - `; + SELECT + any(e.session_id) as id, + any(e.project_id) as project_id, + if(any(nullIf(e.profile_id, e.device_id)) IS NULL, any(e.profile_id), any(nullIf(e.profile_id, e.device_id))) as profile_id, + any(e.device_id) as device_id, + argMin(e.created_at, e.created_at) as created_at, + argMax(e.created_at, e.created_at) as ended_at, + if( + argMaxIf(e.properties['__bounce'], e.created_at, e.name = 'session_end') = '', + if(countIf(e.name = 'screen_view') > 1, false, true), + argMaxIf(e.properties['__bounce'], e.created_at, e.name = 'session_end') = 'true' + ) as is_bounce, + argMinIf(e.origin, e.created_at, e.name = 'session_start') as entry_origin, + argMinIf(e.path, e.created_at, e.name = 'session_start') as entry_path, + argMaxIf(e.origin, e.created_at, e.name = 'session_end' OR e.name = 'screen_view') as exit_origin, + argMaxIf(e.path, e.created_at, e.name = 'session_end' OR e.name = 'screen_view') as exit_path, + countIf(e.name = 'screen_view') as screen_view_count, + 0 as revenue, + countIf(e.name != 'screen_view' AND e.name != 'session_start' AND e.name != 'session_end') as event_count, + sumIf(e.duration, name = 'session_end') AS duration, + argMinIf(e.country, e.created_at, e.name = 'session_start') as country, + argMinIf(e.region, e.created_at, e.name = 'session_start') as region, + argMinIf(e.city, e.created_at, e.name = 'session_start') as city, + argMinIf(e.longitude, e.created_at, e.name = 'session_start') as longitude, + argMinIf(e.latitude, e.created_at, e.name = 'session_start') as latitude, + argMinIf(e.device, e.created_at, e.name = 'session_start') as device, + argMinIf(e.brand, e.created_at, e.name = 'session_start') as brand, + argMinIf(e.model, e.created_at, e.name = 'session_start') as model, + argMinIf(e.browser, e.created_at, e.name = 'session_start') as browser, + argMinIf(e.browser_version, e.created_at, e.name = 'session_start') as browser_version, + argMinIf(e.os, e.created_at, e.name = 'session_start') as os, + argMinIf(e.os_version, e.created_at, e.name = 'session_start') as os_version, + 1 as sign, + 1 as version, + 
argMinIf(e.properties['__query.utm_medium'], e.created_at, e.name = 'session_start') as utm_medium, + argMinIf(e.properties['__query.utm_source'], e.created_at, e.name = 'session_start') as utm_source, + argMinIf(e.properties['__query.utm_campaign'], e.created_at, e.name = 'session_start') as utm_campaign, + argMinIf(e.properties['__query.utm_content'], e.created_at, e.name = 'session_start') as utm_content, + argMinIf(e.properties['__query.utm_term'], e.created_at, e.name = 'session_start') as utm_term, + argMinIf(e.referrer, e.created_at, e.name = 'session_start') as referrer, + argMinIf(e.referrer_name, e.created_at, e.name = 'session_start') as referrer_name, + argMinIf(e.referrer_type, e.created_at, e.name = 'session_start') as referrer_type + FROM ${TABLE_NAMES.events_imports} e + WHERE + e.import_id = {importId:String} + AND e.session_id IN ({sessionIds:Array(String)}) + GROUP BY e.session_id + `; - await ch.command({ - query: sessionsInsertQuery, - clickhouse_settings: { - wait_end_of_query: 1, - // Ask ClickHouse to periodically send query execution progress in HTTP headers, creating some activity in the connection. - send_progress_in_http_headers: 1, - // The interval of sending these progress headers. Here it is less than 60s, - http_headers_progress_interval_ms: '50000', - }, - }); -} + await ch.command({ + query: sessionsInsertQuery, + query_params: { importId, sessionIds }, + clickhouse_settings: { + wait_end_of_query: 1, + send_progress_in_http_headers: 1, + http_headers_progress_interval_ms: '50000', + }, + }); -/** - * Mark import as complete by updating status - */ -export async function markImportComplete(importId: string): Promise { - // In clustered mode, we must use the replicated table for mutations - const mutationTableName = getReplicatedTableName(TABLE_NAMES.events_imports); - const updateQuery = ` - ALTER TABLE ${mutationTableName} - UPDATE import_status = 'processed' - WHERE import_id = {importId:String} - `; - - await ch.command({ - query: updateQuery, - query_params: { importId }, - clickhouse_settings: { - wait_end_of_query: 1, - mutations_sync: '2', // Wait for mutation to complete - // Ask ClickHouse to periodically send query execution progress in HTTP headers, creating some activity in the connection. - send_progress_in_http_headers: 1, - // The interval of sending these progress headers. 
Here it is less than 60s, - http_headers_progress_interval_ms: '50000', - }, - }); -} - -/** - * Get import progress and status - */ -export async function getImportProgress( - importId: string, -): Promise { - const progressQuery = ` - SELECT - import_id, - COUNT(*) as total_events, - COUNTIf(import_status = 'pending') as pending_events, - COUNTIf(import_status = 'processed') as processed_events, - any(import_status) as status - FROM ${TABLE_NAMES.events_imports} - WHERE import_id = {importId:String} - AND name NOT IN ('session_start', 'session_end') - GROUP BY import_id - `; - - const result = await ch.query({ - query: progressQuery, - query_params: { importId }, - format: 'JSONEachRow', - }); - - const data = (await result.json()) as Array<{ - import_id: string; - total_events: number; - pending_events: number; - processed_events: number; - status: string; - }>; - - if (data.length === 0) { - return { - importId, - totalEvents: 0, - insertedEvents: 0, - status: 'pending', - }; + lastSessionId = idRows[idRows.length - 1]!.session_id; + if (idRows.length < SESSION_BATCH_SIZE) { + break; + } } - - const row = data[0]; - if (!row) { - return { - importId, - totalEvents: 0, - insertedEvents: 0, - status: 'pending', - }; - } - - return { - importId, - totalEvents: row.total_events, - insertedEvents: row.processed_events, - status: row.status as 'pending' | 'processing' | 'processed' | 'failed', - }; } /** - * Utility: get min/max created_at for an import + * Get min/max created_at for an import's staging data. */ export async function getImportDateBounds( importId: string, - fromCreatedAt?: string, + fromCreatedAt?: string ): Promise<{ min: string | null; max: string | null }> { const res = await ch.query({ query: ` @@ -697,10 +628,6 @@ export async function getImportDateBounds( : { min: null, max: null }; } -/** - * Unified method to update all import status information - * Combines step, batch, progress, and status message updates - */ export type UpdateImportStatusOptions = | { step: 'loading'; @@ -709,13 +636,17 @@ export type UpdateImportStatusOptions = processedEvents?: number; } | { - step: 'generating_session_ids'; - batch?: string; + step: 'loading_profiles'; + processedProfiles?: number; + totalProfiles?: number; } | { step: 'creating_sessions'; batch?: string; } + | { + step: 'generating_sessions'; + } | { step: 'moving'; batch?: string; @@ -740,7 +671,7 @@ export async function updateImportStatus( updateProgress: (progress: Record) => void; }, importId: string, - options: UpdateImportStatusOptions, + options: UpdateImportStatusOptions ): Promise { const data: Prisma.ImportUpdateInput = {}; switch (options.step) { @@ -754,27 +685,35 @@ export async function updateImportStatus( data.totalEvents = options.totalEvents; data.processedEvents = options.processedEvents; break; - case 'generating_session_ids': - data.currentStep = 'generating_session_ids'; - data.currentBatch = options.batch; - data.statusMessage = options.batch - ? `Generating session IDs for ${options.batch}` - : 'Generating session IDs...'; + case 'loading_profiles': + data.currentStep = 'loading_profiles'; + data.statusMessage = + options.processedProfiles != null && options.totalProfiles != null + ? 
@@ -740,7 +671,7 @@ export async function updateImportStatus( updateProgress: (progress: Record<string, unknown>) => void; }, importId: string, - options: UpdateImportStatusOptions, + options: UpdateImportStatusOptions ): Promise<void> { const data: Prisma.ImportUpdateInput = {}; switch (options.step) { @@ -754,27 +685,35 @@ export async function updateImportStatus( data.totalEvents = options.totalEvents; data.processedEvents = options.processedEvents; break; - case 'generating_session_ids': - data.currentStep = 'generating_session_ids'; - data.currentBatch = options.batch; - data.statusMessage = options.batch - ? `Generating session IDs for ${options.batch}` - : 'Generating session IDs...'; + case 'loading_profiles': + data.currentStep = 'loading_profiles'; + data.statusMessage = + options.processedProfiles != null && options.totalProfiles != null + ? `Importing user profiles (${options.processedProfiles} / ${options.totalProfiles})` + : 'Importing user profiles...'; break; case 'creating_sessions': data.currentStep = 'creating_sessions'; data.currentBatch = options.batch; - data.statusMessage = `Creating sessions for ${options.batch}`; + data.statusMessage = options.batch + ? `Creating sessions (${options.batch})` + : 'Creating sessions...'; + break; + case 'generating_sessions': + data.currentStep = 'generating_sessions'; + data.statusMessage = 'Generating session IDs...'; + break; case 'moving': data.currentStep = 'moving'; data.currentBatch = options.batch; - data.statusMessage = `Moving imports to production for ${options.batch}`; + data.statusMessage = `Moving events to production (${options.batch})`; break; case 'backfilling_sessions': data.currentStep = 'backfilling_sessions'; data.currentBatch = options.batch; - data.statusMessage = `Aggregating sessions for ${options.batch}`; + data.statusMessage = options.batch + ? `Aggregating sessions (${options.batch})` + : 'Aggregating sessions...'; break; case 'completed': data.status = 'completed'; @@ -787,6 +726,8 @@ data.statusMessage = 'Import failed'; data.errorMessage = options.errorMessage; break; + default: + break; } jobLogger.info('Import status update', data);
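One note on the test updates below: `created_at` expectations change from ISO-8601 strings ('2025-05-01T11:12:50.000Z') to ClickHouse DateTime strings ('2025-05-01 11:12:50') because the providers now run timestamps through `formatClickhouseDate`. A rough sketch of the assumed conversion (the real helper lives in `@openpanel/db` and is not shown in this diff):

// Assumed equivalent of formatClickhouseDate for UTC DateTime columns:
const toClickhouseDateTime = (d: Date): string =>
  d.toISOString().replace('T', ' ').replace(/\.\d{3}Z$/, '');
// toClickhouseDateTime(new Date(1_746_097_970 * 1000)) === '2025-05-01 11:12:50'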
diff --git a/packages/importer/src/providers/mixpanel.test.ts index c3f1d052..da63b9d3 100644 --- a/packages/importer/src/providers/mixpanel.test.ts +++ b/packages/importer/src/providers/mixpanel.test.ts @@ -39,7 +39,7 @@ describe('mixpanel', () => { const rawEvent = { event: '$mp_web_page_view', properties: { - time: 1746097970, + time: 1_746_097_970, distinct_id: '$device:123', $browser: 'Chrome', $browser_version: 135, @@ -53,7 +53,7 @@ describe('mixpanel', () => { $insert_id: 'source_id', $lib_version: '2.60.0', $mp_api_endpoint: 'api-js.mixpanel.com', - $mp_api_timestamp_ms: 1746078175363, + $mp_api_timestamp_ms: 1_746_078_175_363, $mp_autocapture: true, $os: 'Android', $referrer: 'https://google.com/', @@ -71,7 +71,7 @@ describe('mixpanel', () => { gclid: 'oqneoqow', mp_country_code: 'IN', mp_lib: 'web', - mp_processing_time_ms: 1746078175546, + mp_processing_time_ms: 1_746_078_175_546, mp_sent_by_lib_version: '2.60.0', utm_medium: 'cpc', utm_source: 'google', @@ -101,7 +101,7 @@ describe('mixpanel', () => { __title: 'Landeed: Satbara Utara, 7/12 Extract, Property Card & Index 2', }, - created_at: '2025-05-01T11:12:50.000Z', + created_at: '2025-05-01 11:12:50', country: 'IN', city: 'Mumbai', region: 'Maharashtra', @@ -110,7 +110,7 @@ describe('mixpanel', () => { os: 'Android', os_version: undefined, browser: 'Chrome', - browser_version: '', + browser_version: '135', device: 'mobile', brand: '', model: '', @@ -141,7 +141,7 @@ describe('mixpanel', () => { const rawEvent = { event: 'custom_event', properties: { - time: 1746097970, + time: 1_746_097_970, distinct_id: '$device:123', $device_id: '123', $user_id: 'user123', @@ -192,7 +192,7 @@ describe('mixpanel', () => { const rawEvent = { event: 'ec_search_error', properties: { - time: 1759947367, + time: 1_759_947_367, distinct_id: '3385916', $browser: 'Mobile Safari', $browser_version: null, @@ -207,7 +207,7 @@ describe('mixpanel', () => { $insert_id: 'bclkaepeqcfuzt4v', $lib_version: '2.60.0', $mp_api_endpoint: 'api-js.mixpanel.com', - $mp_api_timestamp_ms: 1759927570699, + $mp_api_timestamp_ms: 1_759_927_570_699, $os: 'iOS', $region: 'Karnataka', $screen_height: 852, @@ -225,7 +225,7 @@ describe('mixpanel', () => { language: 'english', mp_country_code: 'IN', mp_lib: 'web', - mp_processing_time_ms: 1759927592421, + mp_processing_time_ms: 1_759_927_592_421, mp_sent_by_lib_version: '2.60.0', os: 'web', osVersion: @@ -249,15 +249,15 @@ describe('mixpanel', () => { expect(res.id.length).toBeGreaterThan(30); expect(res.imported_at).toMatch( - /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/, + /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/ ); expect(omit(['id', 'imported_at'], res)).toEqual({ brand: 'Apple', browser: 'GSA', - browser_version: 'null', + browser_version: '388.0.811331708', city: 'Bengaluru', country: 'IN', - created_at: '2025-10-08T18:16:07.000Z', + created_at: '2025-10-08 18:16:07', device: 'mobile', device_id: '199b498af1036c-0e943279a1292e-5c0f4368-51bf4-199b498af1036c', duration: 0, diff --git a/packages/importer/src/providers/mixpanel.ts index 42e41c0a..c3676b4e 100644 --- a/packages/importer/src/providers/mixpanel.ts +++ b/packages/importer/src/providers/mixpanel.ts @@ -1,8 +1,13 @@ import { randomUUID } from 'node:crypto'; import { isSameDomain, parsePath, toDots } from '@openpanel/common'; -import { type UserAgentInfo, parseUserAgent } from '@openpanel/common/server'; -import { getReferrerWithQuery, parseReferrer } from '@openpanel/common/server'; -import type { IClickhouseEvent } from '@openpanel/db'; +import { + getReferrerWithQuery, + parseReferrer, + parseUserAgent, + type UserAgentInfo, +} from '@openpanel/common/server'; +import { formatClickhouseDate, type IClickhouseEvent } from '@openpanel/db'; +import type { IClickhouseProfile } from '@openpanel/db'; import type { ILogger } from '@openpanel/logger'; import type { IMixpanelImportConfig } from '@openpanel/validation'; import { z } from 'zod'; @@ -15,22 +20,88 @@ export const zMixpanelRawEvent = z.object({ export type MixpanelRawEvent = z.infer<typeof zMixpanelRawEvent>; +/** Engage API profile: https://docs.mixpanel.com/docs/export-methods#exporting-profiles */ +export const zMixpanelRawProfile = z.object({ + $distinct_id: z.union([z.string(), z.number()]), + $properties: z.record(z.unknown()).optional().default({}), +}); +export type MixpanelRawProfile = z.infer<typeof zMixpanelRawProfile>; + +class MixpanelRateLimitError extends Error { + readonly retryAfterMs?: number; + + constructor(message: string, retryAfterMs?: number) { + super(message); + this.name = 'MixpanelRateLimitError'; + this.retryAfterMs = retryAfterMs; + } +} + export class MixpanelProvider extends BaseImportProvider { provider = 'mixpanel'; version = '1.0.0'; + private static readonly MAX_REQUESTS_PER_HOUR = 100; + private static readonly MIN_REQUEST_INTERVAL_MS = 334; // 3 QPS limit + private requestTimestamps: number[] = []; + private lastRequestTime = 0; + constructor( private readonly projectId: string, private readonly config: IMixpanelImportConfig, - private readonly logger?: ILogger, + private readonly logger?: ILogger ) { super(); } - async getTotalEventsCount(): Promise<number> { + private async waitForRateLimit(): Promise<void> { + const now = Date.now(); + const oneHourAgo = now - 60 * 60 * 1000; + + // Prune timestamps older than 1 hour + this.requestTimestamps = this.requestTimestamps.filter( + (t) => t > oneHourAgo + ); + + // Enforce per-second limit (3 QPS → min 334ms gap) + const timeSinceLast = now - this.lastRequestTime; + if (timeSinceLast < MixpanelProvider.MIN_REQUEST_INTERVAL_MS) { + const delay = MixpanelProvider.MIN_REQUEST_INTERVAL_MS - timeSinceLast; + await new Promise((resolve) => setTimeout(resolve, delay)); 
} + + // Enforce hourly limit + if ( + this.requestTimestamps.length >= MixpanelProvider.MAX_REQUESTS_PER_HOUR + ) { + const oldestInWindow = this.requestTimestamps[0]!; + const waitUntil = oldestInWindow + 60 * 60 * 1000; + const waitMs = waitUntil - Date.now() + 1000; // +1s buffer + + if (waitMs > 0) { + this.logger?.info( + `Rate limit: ${this.requestTimestamps.length} requests in the last hour, waiting ${Math.ceil(waitMs / 1000)}s`, + { + requestsInWindow: this.requestTimestamps.length, + waitMs, + } + ); + await new Promise((resolve) => setTimeout(resolve, waitMs)); + // Prune again after waiting + this.requestTimestamps = this.requestTimestamps.filter( + (t) => t > Date.now() - 60 * 60 * 1000 + ); + } + } + + this.lastRequestTime = Date.now(); + this.requestTimestamps.push(Date.now()); + } + + getTotalEventsCount(): Promise<number> { // Mixpanel doesn't provide a good way to extract the total event count within a period // JQL would work, but it's inaccurate and will be deprecated at the end of 2025 - return -1; + return Promise.resolve(-1); } /** @@ -42,13 +113,13 @@ export class MixpanelProvider extends BaseImportProvider { } async *parseSource( - overrideFrom?: string, + overrideFrom?: string ): AsyncGenerator<MixpanelRawEvent> { yield* this.fetchEventsFromMixpanel(overrideFrom); } private async *fetchEventsFromMixpanel( - overrideFrom?: string, + overrideFrom?: string ): AsyncGenerator<MixpanelRawEvent> { const { serviceAccount, serviceSecret, projectId, from, to } = this.config; @@ -58,20 +129,24 @@ export class MixpanelProvider extends BaseImportProvider { for (const [chunkFrom, chunkTo] of dateChunks) { let retries = 0; - const maxRetries = 3; + const maxRetries = 6; while (retries <= maxRetries) { try { + await this.waitForRateLimit(); yield* this.fetchEventsForDateRange( serviceAccount, serviceSecret, projectId, chunkFrom, - chunkTo, + chunkTo ); break; // Success, move to next chunk } catch (error) { retries++; + const isRateLimit = + error instanceof MixpanelRateLimitError || + (error instanceof Error && error.message.includes('429')); const isLastRetry = retries > maxRetries; this.logger?.warn('Failed to fetch events for date range', { @@ -80,22 +155,31 @@ attempt: retries, maxRetries, error: (error as Error).message, + isRateLimit, willRetry: !isLastRetry, }); if (isLastRetry) { - // Final attempt failed, re-throw throw new Error( - `Failed to fetch Mixpanel events for ${chunkFrom} to ${chunkTo} after ${maxRetries} retries: ${(error as Error).message}`, + `Failed to fetch Mixpanel events for ${chunkFrom} to ${chunkTo} after ${maxRetries} retries: ${(error as Error).message}` ); } - // Exponential backoff: wait before retrying - const delay = Math.min(1000 * 2 ** (retries - 1), 60_000); // Cap at 1 minute + let delay: number; + if (error instanceof MixpanelRateLimitError && error.retryAfterMs) { + delay = error.retryAfterMs; + } else if (isRateLimit) { + // 5min → 10min → 15min → 15min → 15min → 15min = 75min total + delay = Math.min(300_000 * 2 ** (retries - 1), 900_000); + } else { + delay = Math.min(1000 * 2 ** (retries - 1), 60_000); + } + this.logger?.info('Retrying after delay', { delayMs: delay, chunkFrom, chunkTo, + isRateLimit, }); await new Promise((resolve) => setTimeout(resolve, delay)); } @@ -108,7 +192,7 @@ export class MixpanelProvider extends BaseImportProvider { serviceSecret: string, projectId: string, from: string, - to: string, + to: string ): AsyncGenerator<MixpanelRawEvent> { const url = 'https://data.mixpanel.com/api/2.0/export';
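Worth spelling out the retry budget these numbers imply: with `maxRetries = 6`, a chunk that keeps getting rate limited sleeps after each of the first six failed attempts, and the seventh failure throws. A quick sketch of the schedule the formula produces:

// Rate-limit backoff per retry: 300s, 600s, 900s, 900s, 900s, 900s (75 min total)
for (let retries = 1; retries <= 6; retries++) {
  console.log(Math.min(300_000 * 2 ** (retries - 1), 900_000) / 1000);
}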
@@ -134,9 +218,18 @@ export class MixpanelProvider extends BaseImportProvider { }, }); + if (response.status === 429) { + const retryAfter = response.headers.get('Retry-After'); + const retryAfterMs = retryAfter ? Number(retryAfter) * 1000 : undefined; + throw new MixpanelRateLimitError( + 'Mixpanel rate limit exceeded (429)', + retryAfterMs + ); + } + if (!response.ok) { throw new Error( - `Failed to fetch events from Mixpanel: ${response.status} ${response.statusText}`, + `Failed to fetch events from Mixpanel: ${response.status} ${response.statusText}` ); } @@ -153,7 +246,9 @@ export class MixpanelProvider extends BaseImportProvider { while (true) { const { done, value } = await reader.read(); - if (done) break; + if (done) { + break; + } buffer += decoder.decode(value, { stream: true }); @@ -187,7 +282,7 @@ export class MixpanelProvider extends BaseImportProvider { { line: buffer.substring(0, 100), error, - }, + } ); } } @@ -196,6 +291,114 @@ export class MixpanelProvider extends BaseImportProvider { } } + /** + * Stream user profiles from Mixpanel Engage API. + * Paginates with page/page_size (5k per page) and yields each profile. + */ + async *streamProfiles(): AsyncGenerator<MixpanelRawProfile> { + const { serviceAccount, serviceSecret, projectId } = this.config; + const pageSize = 5000; + let page = 0; + + while (true) { + await this.waitForRateLimit(); + + const url = `https://mixpanel.com/api/query/engage?project_id=${encodeURIComponent(projectId)}`; + const body = new URLSearchParams({ + page: String(page), + page_size: String(pageSize), + }); + + this.logger?.info('Fetching profiles from Mixpanel Engage', { + page, + page_size: pageSize, + projectId, + }); + + const response = await fetch(url, { + method: 'POST', + headers: { + Authorization: `Basic ${Buffer.from(`${serviceAccount}:${serviceSecret}`).toString('base64')}`, + Accept: 'application/json', + 'Content-Type': 'application/x-www-form-urlencoded', + }, + body: body.toString(), + }); + + if (response.status === 429) { + const retryAfter = response.headers.get('Retry-After'); + const retryAfterMs = retryAfter ? Number(retryAfter) * 1000 : undefined; + throw new MixpanelRateLimitError( + 'Mixpanel rate limit exceeded (429)', + retryAfterMs + ); + } + + if (!response.ok) { + const text = await response.text(); + throw new Error( + `Failed to fetch profiles from Mixpanel: ${response.status} ${response.statusText} - ${text}` + ); + } + + const data = (await response.json()) as { + results?: Array<{ $distinct_id: string | number; $properties?: Record<string, unknown> }>; + page?: number; + total?: number; + }; + + const results = data.results ?? []; + for (const row of results) { + const parsed = zMixpanelRawProfile.safeParse(row); + if (parsed.success) { + yield parsed.data; + } else { + this.logger?.warn('Skipping invalid Mixpanel profile', { + row: JSON.stringify(row).slice(0, 200), + }); + } + } + + if (results.length < pageSize) { + break; + } + page++; + } + }
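// How a caller is expected to drain this generator (a sketch, not part of this
// diff; the batch size and insertProfilesBatch wiring live in the worker job):
//
//   const batch: IClickhouseProfile[] = [];
//   for await (const raw of provider.streamProfiles()) {
//     batch.push(provider.transformProfile(raw));
//     if (batch.length >= 5000) {
//       await insertProfilesBatch(batch);
//       batch.length = 0;
//     }
//   }
//   if (batch.length > 0) await insertProfilesBatch(batch);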
+ + /** + * Map Mixpanel Engage profile to OpenPanel IClickhouseProfile. + */ + transformProfile(raw: MixpanelRawProfile): IClickhouseProfile { + const parsed = zMixpanelRawProfile.parse(raw); + const props = (parsed.$properties || {}) as Record<string, unknown>; + + const id = String(parsed.$distinct_id).replace(/^\$device:/, ''); + const createdAt = props.$created ? formatClickhouseDate(new Date(String(props.$created))) : formatClickhouseDate(new Date()); + + const properties: Record<string, string> = {}; + const stripPrefix = /^\$/; + for (const [key, value] of Object.entries(props)) { + if (stripPrefix.test(key)) continue; + if (value == null) continue; + properties[key] = typeof value === 'object' ? JSON.stringify(value) : String(value); + } + + return { + id, + project_id: this.projectId, + first_name: String(props.$first_name ?? ''), + last_name: String(props.$last_name ?? ''), + email: String(props.$email ?? ''), + avatar: String(props.$avatar ?? props.$image ?? ''), + properties, + created_at: createdAt, + is_external: true, + }; + } + validate(rawEvent: MixpanelRawEvent): boolean { const res = zMixpanelRawEvent.safeParse(rawEvent); return res.success; @@ -208,7 +411,7 @@ const deviceId = props.$device_id; const profileId = String(props.$user_id || props.distinct_id).replace( /^\$device:/, - '', + '' ); // Build full URL from current_url and current_url_search (web only) @@ -309,7 +512,7 @@ project_id: projectId, session_id: '', // Will be generated in SQL after import properties: toDots(properties), // Flatten nested objects/arrays to Map(String, String) - created_at: new Date(props.time * 1000).toISOString(), + created_at: formatClickhouseDate(new Date(props.time * 1000)), country, city, region, @@ -318,10 +521,7 @@ os: uaInfo.os || props.$os, os_version: uaInfo.osVersion || props.$osVersion, browser: uaInfo.browser || props.$browser, - browser_version: - uaInfo.browserVersion || props.$browserVersion ? String(props.$browser_version) : '', + browser_version: uaInfo.browserVersion || String(props.$browser_version ?? ''), device: this.getDeviceType(props.mp_lib, uaInfo, props), brand: uaInfo.brand || '', model: uaInfo.model || '',
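The `browser_version` change above fixes two bugs that the updated test expectations capture: `||` binds tighter than `?:`, so the old expression parsed as `(uaInfo.browserVersion || props.$browserVersion) ? String(props.$browser_version) : ''`, and the condition read a misspelled `$browserVersion` key while stringifying `$browser_version`. A parsed UA version was therefore discarded and `null` was stringified. A minimal repro of the precedence trap:

const uaVersion = '388.0.811331708'; // from parseUserAgent
const mpVersion = null; // props.$browser_version
const before = uaVersion || undefined ? String(mpVersion) : ''; // 'null'
const after = uaVersion || String(mpVersion ?? ''); // '388.0.811331708'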
@@ -338,14 +538,6 @@ export class MixpanelProvider extends BaseImportProvider { sdk_version: this.version, }; - // TODO: Remove this - // Temporary fix for a client - const isMightBeScreenView = this.getMightBeScreenView(rawEvent); - if (isMightBeScreenView && event.name === 'Loaded a Screen') { - event.name = 'screen_view'; - event.path = isMightBeScreenView; - } - // TODO: Remove this // This is a hack to get utm tags (not sure if this is just the testing project or all mixpanel projects) if (props.utm_source && !properties.__query?.utm_source) { @@ -371,13 +563,13 @@ export class MixpanelProvider extends BaseImportProvider { private getDeviceType( mp_lib: string, uaInfo: UserAgentInfo, - props: Record<string, any>, + props: Record<string, any> ) { // Normalize lib/os/browser data const lib = (mp_lib || '').toLowerCase(); const os = String(props.$os || uaInfo.os || '').toLowerCase(); const browser = String( - props.$browser || uaInfo.browser || '', + props.$browser || uaInfo.browser || '' ).toLowerCase(); const isTabletOs = os === 'ipados' || os === 'ipad os' || os === 'ipad'; @@ -431,11 +623,6 @@ export class MixpanelProvider extends BaseImportProvider { return !this.isWebEvent(mp_lib); } - private getMightBeScreenView(rawEvent: MixpanelRawEvent) { - const props = rawEvent.properties as Record<string, unknown>; - return Object.keys(props).find((key) => key.match(/^[A-Z1-9_]+$/)); - } - private parseServerDeviceInfo(props: Record<string, any>): UserAgentInfo { // For mobile events, extract device information from Mixpanel properties const os = props.$os || props.os || ''; @@ -446,19 +633,19 @@ export class MixpanelProvider extends BaseImportProvider { return { isServer: true, - os: os, - osVersion: osVersion, + os, + osVersion, browser: '', browserVersion: '', - device: device, - brand: brand, - model: model, + device, + brand, + model, }; } private stripMixpanelProperties( properties: Record<string, unknown>, - searchParams: Record<string, string>, + searchParams: Record<string, string> ): Record<string, unknown> { const strip = [ 'time', ]; const filtered = Object.fromEntries( Object.entries(properties).filter( - ([key]) => !key.match(/^(\$|mp_|utm_)/) && !strip.includes(key), - ), + ([key]) => !(key.match(/^(\$|mp_|utm_)/) || strip.includes(key)) + ) ); // Parse JSON strings back to objects/arrays so toDots() can flatten them
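The umami changes below mirror the mixpanel ones: `created_at` now goes through `formatClickhouseDate`, and `properties` are flattened with `toDots` before insert. The assumed shape of that flattening, per the inline Map(String, String) comments (illustrative; the helper lives in `@openpanel/common`):

import { toDots } from '@openpanel/common';

// Nested objects/arrays become dot-notation keys with string values:
// { __query: { utm_source: 'google' }, tags: ['a', 'b'] }
//   -> { '__query.utm_source': 'google', 'tags.0': 'a', 'tags.1': 'b' }
const flat = toDots({ __query: { utm_source: 'google' }, tags: ['a', 'b'] });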
diff --git a/packages/importer/src/providers/umami.ts index f232e1b8..c6947329 100644 --- a/packages/importer/src/providers/umami.ts +++ b/packages/importer/src/providers/umami.ts @@ -2,10 +2,13 @@ import { randomUUID } from 'node:crypto'; import { Readable } from 'node:stream'; import { pipeline } from 'node:stream/promises'; import { createBrotliDecompress, createGunzip } from 'node:zlib'; -import { isSameDomain, parsePath } from '@openpanel/common'; -import { generateDeviceId } from '@openpanel/common/server'; -import { getReferrerWithQuery, parseReferrer } from '@openpanel/common/server'; -import type { IClickhouseEvent } from '@openpanel/db'; +import { isSameDomain, parsePath, toDots } from '@openpanel/common'; +import { + generateDeviceId, + getReferrerWithQuery, + parseReferrer, +} from '@openpanel/common/server'; +import { formatClickhouseDate, type IClickhouseEvent } from '@openpanel/db'; import type { ILogger } from '@openpanel/logger'; import type { IUmamiImportConfig } from '@openpanel/validation'; import { parse } from 'csv-parse'; @@ -63,7 +66,7 @@ export class UmamiProvider extends BaseImportProvider { constructor( private readonly projectId: string, private readonly config: IUmamiImportConfig, - private readonly logger?: ILogger, + private readonly logger?: ILogger ) { super(); } @@ -82,7 +85,7 @@ export class UmamiProvider extends BaseImportProvider { signal?: AbortSignal; maxBytes?: number; maxRows?: number; - } = {}, + } = {} ): AsyncGenerator<UmamiRawEvent> { const { signal, maxBytes, maxRows } = opts; const controller = new AbortController(); @@ -95,9 +98,9 @@ export class UmamiProvider extends BaseImportProvider { } const res = await fetch(url, { signal: controller.signal }); - if (!res.ok || !res.body) { + if (!(res.ok && res.body)) { throw new Error( - `Failed to fetch remote file: ${res.status} ${res.statusText}`, + `Failed to fetch remote file: ${res.status} ${res.statusText}` ); } @@ -108,15 +111,15 @@ export class UmamiProvider extends BaseImportProvider { if ( contentType && !/text\/csv|text\/plain|application\/gzip|application\/octet-stream/i.test( - contentType, + contentType ) ) { - console.warn(`Warning: Content-Type is ${contentType}, expected CSV-ish`); + this.logger?.warn(`Content-Type is ${contentType}, expected CSV-ish`); } if (maxBytes && contentLen && contentLen > maxBytes) { throw new Error( - `Remote file exceeds size limit (${contentLen} > ${maxBytes})`, + `Remote file exceeds size limit (${contentLen} > ${maxBytes})` ); } @@ -137,9 +140,7 @@ if (seenBytes > maxBytes) { controller.abort(); body.destroy( - new Error( - `Stream exceeded size limit (${seenBytes} > ${maxBytes})`, - ), + new Error(`Stream exceeded size limit (${seenBytes} > ${maxBytes})`) ); } }); @@ -190,7 +191,7 @@ throw new Error( `Failed to parse remote file from ${url}: ${ err instanceof Error ? 
err.message : String(err) - }`, + }` ); } finally { controller.abort(); // ensure fetch stream is torn down @@ -205,7 +206,7 @@ export class UmamiProvider extends BaseImportProvider { transformEvent(_rawEvent: UmamiRawEvent): IClickhouseEvent { const projectId = this.config.projectMapper.find( - (mapper) => mapper.from === _rawEvent.website_id, + (mapper) => mapper.from === _rawEvent.website_id )?.to || this.projectId; const rawEvent = zUmamiRawEvent.parse(_rawEvent); @@ -261,39 +262,50 @@ export class UmamiProvider extends BaseImportProvider { } // Add useful properties from Umami data - if (rawEvent.page_title) properties.__title = rawEvent.page_title; - if (rawEvent.screen) properties.__screen = rawEvent.screen; - if (rawEvent.language) properties.__language = rawEvent.language; - if (rawEvent.utm_source) + if (rawEvent.page_title) { + properties.__title = rawEvent.page_title; + } + if (rawEvent.screen) { + properties.__screen = rawEvent.screen; + } + if (rawEvent.language) { + properties.__language = rawEvent.language; + } + if (rawEvent.utm_source) { properties = assocPath( ['__query', 'utm_source'], rawEvent.utm_source, - properties, + properties ); - if (rawEvent.utm_medium) + } + if (rawEvent.utm_medium) { properties = assocPath( ['__query', 'utm_medium'], rawEvent.utm_medium, - properties, + properties ); - if (rawEvent.utm_campaign) + } + if (rawEvent.utm_campaign) { properties = assocPath( ['__query', 'utm_campaign'], rawEvent.utm_campaign, - properties, + properties ); - if (rawEvent.utm_content) + } + if (rawEvent.utm_content) { properties = assocPath( ['__query', 'utm_content'], rawEvent.utm_content, - properties, + properties ); - if (rawEvent.utm_term) + } + if (rawEvent.utm_term) { properties = assocPath( ['__query', 'utm_term'], rawEvent.utm_term, - properties, + properties ); + } return { id: rawEvent.event_id || randomUUID(), @@ -302,8 +314,8 @@ export class UmamiProvider extends BaseImportProvider { profile_id: profileId, project_id: projectId, session_id: rawEvent.session_id || '', - properties, - created_at: rawEvent.created_at.toISOString(), + properties: toDots(properties), + created_at: formatClickhouseDate(rawEvent.created_at), country, city, region: this.mapRegion(region), @@ -329,7 +341,7 @@ export class UmamiProvider extends BaseImportProvider { } mapRegion(region: string): string { - return region.replace(/^[A-Z]{2}\-/, ''); + return region.replace(/^[A-Z]{2}-/, ''); } mapDevice(device: string): string {