From 65c464a63c6fd67c292c55bb0925154db315c9e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Tue, 23 Jul 2024 22:58:00 +0200 Subject: [PATCH] update importer script --- packages/cli/src/importer/importer.ts | 103 +++++++++++++++++++++----- packages/cli/src/importer/index.ts | 40 +++++++++- 2 files changed, 121 insertions(+), 22 deletions(-) diff --git a/packages/cli/src/importer/importer.ts b/packages/cli/src/importer/importer.ts index 0fe61ae2..b8362965 100644 --- a/packages/cli/src/importer/importer.ts +++ b/packages/cli/src/importer/importer.ts @@ -1,15 +1,16 @@ import { randomUUID } from 'crypto'; import fs from 'fs'; import readline from 'readline'; -import { glob } from 'glob'; +import zlib from 'zlib'; import Progress from 'progress'; import { assocPath, prop, uniqBy } from 'ramda'; -import { parsePath } from '@openpanel/common'; +import { isSameDomain, parsePath } from '@openpanel/common'; import type { IImportedEvent } from '@openpanel/db'; -const BATCH_SIZE = 1000; +const BATCH_SIZE = 30_000; const SLEEP_TIME = 20; +const MAX_CONCURRENT_REQUESTS = 8; type IMixpanelEvent = { event: string; @@ -179,6 +180,21 @@ function generateSessionEvents(events: IImportedEvent[]): Session[] { } function createEventObject(event: IMixpanelEvent): IImportedEvent { + const getReferrer = (referrer: string | undefined) => { + if (!referrer) { + return ''; + } + + if (referrer === '$direct') { + return ''; + } + + if (isSameDomain(referrer, event.properties.$current_url)) { + return ''; + } + + return referrer; + }; const url = parsePath(event.properties.$current_url); return { profile_id: event.properties.distinct_id @@ -203,7 +219,7 @@ function createEventObject(event: IMixpanelEvent): IImportedEvent { browser_version: event.properties.$browser_version ? String(event.properties.$browser_version) : '', - referrer: event.properties.$initial_referrer ?? '', + referrer: getReferrer(event.properties.$initial_referrer), referrer_type: event.properties.$search_engine ? 'search' : '', referrer_name: event.properties.$search_engine ?? '', device_id: event.properties.$device_id ?? '', @@ -295,17 +311,33 @@ function processEvents(events: IImportedEvent[]): IImportedEvent[] { ]; } -async function sendBatchToAPI(batch: IImportedEvent[]) { +async function sendBatchToAPI( + batch: IImportedEvent[], + { + apiUrl, + clientId, + clientSecret, + }: { + apiUrl: string; + clientId: string; + clientSecret: string; + } +) { try { - const res = await fetch('http://localhost:3333/import/events', { + const res = await fetch(`${apiUrl}/import/events`, { method: 'POST', headers: { + 'Content-Encoding': 'gzip', 'Content-Type': 'application/json', - 'openpanel-client-id': 'dd3db204-dcf6-49e2-9e82-de01cba7e585', - 'openpanel-client-secret': 'sec_293b903816e327e10c9d', + 'openpanel-client-id': clientId, + 'openpanel-client-secret': clientSecret, }, - body: JSON.stringify(batch), + body: zlib.gzipSync(JSON.stringify(batch)), }); + if (!res.ok) { + console.log('Failed to send batch to API'); + console.log(await res.text()); + } await new Promise((resolve) => setTimeout(resolve, SLEEP_TIME)); } catch (e) { console.log('sendBatchToAPI failed'); @@ -313,7 +345,17 @@ async function sendBatchToAPI(batch: IImportedEvent[]) { } } -async function processFiles(files: string[]) { +async function processFiles({ + files, + apiUrl, + clientId, + clientSecret, +}: { + files: string[]; + apiUrl: string; + clientId: string; + clientSecret: string; +}) { const progress = new Progress( 'Processing (:current/:total) :file [:bar] :percent | :savedEvents saved events | :status', { @@ -347,34 +389,57 @@ async function processFiles(files: string[]) { currentBatch = []; } - if (apiBatching.length >= 10) { - await Promise.all(apiBatching.map(sendBatchToAPI)); + if (apiBatching.length >= MAX_CONCURRENT_REQUESTS) { + await Promise.all( + apiBatching.map((batch) => + sendBatchToAPI(batch, { + apiUrl, + clientId, + clientSecret, + }) + ) + ); apiBatching = []; } } } if (currentBatch.length > 0) { - await sendBatchToAPI(currentBatch); + await sendBatchToAPI(currentBatch, { + apiUrl, + clientId, + clientSecret, + }); savedEvents += currentBatch.length; progress.render({ file: 'Complete', savedEvents, status: 'Complete' }); } } -export async function importFiles(matcher: string) { - const files = await glob([matcher], { root: '/' }); - +export async function importFiles({ + files, + apiUrl, + clientId, + clientSecret, +}: { + files: string[]; + apiUrl: string; + clientId: string; + clientSecret: string; +}) { if (files.length === 0) { console.log('No files found'); return; } - files.sort((a, b) => a.localeCompare(b)); - console.log(`Found ${files.length} files to process`); const startTime = Date.now(); - await processFiles(files); + await processFiles({ + files, + apiUrl, + clientId, + clientSecret, + }); const endTime = Date.now(); console.log( diff --git a/packages/cli/src/importer/index.ts b/packages/cli/src/importer/index.ts index ea1ef9ac..3cc88e1f 100644 --- a/packages/cli/src/importer/index.ts +++ b/packages/cli/src/importer/index.ts @@ -1,12 +1,19 @@ import path from 'path'; import arg from 'arg'; +import { glob } from 'glob'; import { importFiles } from './importer'; -export default function importer() { +export default async function importer() { const args = arg( { '--glob': String, + '--api-url': String, + '--client-id': String, + '--client-secret': String, + '--dry-run': Boolean, + '--from': Number, + '--to': Number, }, { permissive: true, @@ -17,9 +24,36 @@ export default function importer() { throw new Error('Missing --glob argument'); } + if (!args['--client-id']) { + throw new Error('Missing --client-id argument'); + } + + if (!args['--client-secret']) { + throw new Error('Missing --client-secret argument'); + } + const cwd = process.cwd(); - const filePath = path.resolve(cwd, args['--glob']); + const fileMatcher = path.resolve(cwd, args['--glob']); + const allFiles = await glob([fileMatcher], { root: '/' }); + allFiles.sort((a, b) => a.localeCompare(b)); - return importFiles(filePath); + const files = allFiles.slice( + args['--from'] ?? 0, + args['--to'] ?? Number.MAX_SAFE_INTEGER + ); + + if (args['--dry-run']) { + files.forEach((file, index) => { + console.log(`Would import (index: ${index}): ${file}`); + }); + return; + } + + return importFiles({ + files, + clientId: args['--client-id'], + clientSecret: args['--client-secret'], + apiUrl: args['--api-url'] ?? 'https://api.openpanel.dev', + }); }