From bf0c14cc88a70ffdaa59eb40d270b910de8d1cf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Mon, 15 Jul 2024 17:58:48 +0200 Subject: [PATCH] wip --- packages/cli/package.json | 3 + packages/cli/src/cli.ts | 2 - .../cli/src/importer/{load.ts => importer.ts} | 103 ++---- packages/cli/src/importer/importer_v2.ts | 339 ++++++++++++++++++ packages/cli/src/importer/index.ts | 129 +------ pnpm-lock.yaml | 27 ++ 6 files changed, 412 insertions(+), 191 deletions(-) rename packages/cli/src/importer/{load.ts => importer.ts} (89%) create mode 100644 packages/cli/src/importer/importer_v2.ts diff --git a/packages/cli/package.json b/packages/cli/package.json index fb6aa4b1..bebdb364 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -15,6 +15,8 @@ "arg": "^5.0.2", "glob": "^10.4.3", "inquirer": "^9.3.5", + "p-limit": "^6.1.0", + "progress": "^2.0.3", "ramda": "^0.29.1", "zod": "^3.22.4" }, @@ -25,6 +27,7 @@ "@openpanel/sdk": "workspace:*", "@openpanel/tsconfig": "workspace:*", "@types/node": "^20.14.10", + "@types/progress": "^2.0.7", "@types/ramda": "^0.30.1", "eslint": "^8.48.0", "prettier": "^3.0.3", diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 23d70f30..26d6c7d6 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -12,8 +12,6 @@ function cli() { } ); - console.log('cli args', args); - const [command] = args._; switch (command) { diff --git a/packages/cli/src/importer/load.ts b/packages/cli/src/importer/importer.ts similarity index 89% rename from packages/cli/src/importer/load.ts rename to packages/cli/src/importer/importer.ts index 064c71a6..3ce52736 100644 --- a/packages/cli/src/importer/load.ts +++ b/packages/cli/src/importer/importer.ts @@ -1,9 +1,7 @@ import { randomUUID } from 'crypto'; import fs from 'fs'; -import os from 'os'; -import v8 from 'v8'; import { glob } from 'glob'; -import { assocPath, clone, last, prop, uniqBy } from 'ramda'; +import { assocPath, clone, prop, uniqBy } from 'ramda'; import type { IClickhouseEvent } from '@openpanel/db'; @@ -12,29 +10,6 @@ import { parsePath } from './copy.url'; const BATCH_SIZE = 8000; // Define your batch size const SLEEP_TIME = 100; // Define your sleep time between batches -function checkNodeMemoryLimit() { - const totalHeapSize = v8.getHeapStatistics().total_available_size; - return totalHeapSize; -} - -function checkMemoryUsage() { - const _totalMemory = os.totalmem(); - const totalMemory = checkNodeMemoryLimit(); - console.log({ - totalMemory, - _totalMemory, - }); - - const usedMemory = process.memoryUsage().heapUsed; - const percentUsed = (usedMemory / totalMemory) * 100; - return percentUsed; -} - -function shouldLoadMoreFiles() { - const percentUsed = checkMemoryUsage(); - return percentUsed < 80; // or any other threshold you deem appropriate -} - function progress(value: string) { process.stdout.clearLine(0); process.stdout.cursorTo(0); @@ -204,37 +179,6 @@ function generateSessionId(): string { return randomUUID(); } -export async function loadFilesBatcher() { - const files = await glob(['../../../../Downloads/mp-data/*.txt'], { - root: '/', - }); - - function chunks(array: string[], size: number) { - const results = []; - while (array.length) { - results.push(array.splice(0, size)); - } - return results; - } - - const times = []; - const chunksArray = chunks(files, 5); - let chunkIndex = 0; - for (const chunk of chunksArray.slice(0, 1)) { - if (times.length > 0) { - // Print out how much time is approximately left - const average = times.reduce((a, b) => a + b) / times.length; - const remaining = (chunksArray.length - chunkIndex) * average; - console.log(`Estimated time left: ${remaining / 1000 / 60} minutes`); - } - console.log('Processing chunk:', chunkIndex); - chunkIndex++; - const d = Date.now(); - await loadFiles(chunk); - times.push(Date.now() - d); - } -} - async function loadFiles(files: string[] = []) { const data: any[] = []; const filesToParse = files.slice(0, 10); @@ -245,7 +189,6 @@ async function loadFiles(files: string[] = []) { const content: any[] = []; readStream.on('data', (chunk) => { - // console.log(`Received ${chunk.length} bytes of data.`); content.push(chunk.toString('utf-8')); }); @@ -324,13 +267,6 @@ async function loadFiles(files: string[] = []) { const sessions = generateSessionEvents(a); const events = sessions.flatMap((session) => { - if ( - session.profileId === '594447' || - session.deviceId === - '19081f09f2d666-082ba152fdf7548-7f7a3660-5a900-19081f09f2d666' - ) { - console.log(session); - } return [ session.firstEvent && { ...session.firstEvent, @@ -407,7 +343,6 @@ async function loadFiles(files: string[] = []) { await Promise.all(promises); } } - console.log(totalPages); // Trigger the batches try { @@ -417,4 +352,38 @@ async function loadFiles(files: string[] = []) { } } -loadFilesBatcher(); +export async function importFiles(matcher: string) { + const files = await glob([matcher], { + root: '/', + }); + + if (files.length === 0) { + console.log('No files found'); + return; + } + + function chunks(array: string[], size: number) { + const results = []; + while (array.length) { + results.push(array.splice(0, size)); + } + return results; + } + + const times = []; + const chunksArray = chunks(files, 3); + let chunkIndex = 0; + for (const chunk of chunksArray) { + if (times.length > 0) { + // Print out how much time is approximately left + const average = times.reduce((a, b) => a + b) / times.length; + const remaining = (chunksArray.length - chunkIndex) * average; + console.log(`\n\nEstimated time left: ${remaining / 1000 / 60} minutes`); + } + console.log('Processing chunk:', chunkIndex); + chunkIndex++; + const d = Date.now(); + await loadFiles(chunk); + times.push(Date.now() - d); + } +} diff --git a/packages/cli/src/importer/importer_v2.ts b/packages/cli/src/importer/importer_v2.ts new file mode 100644 index 00000000..e9dc05cb --- /dev/null +++ b/packages/cli/src/importer/importer_v2.ts @@ -0,0 +1,339 @@ +import { randomUUID } from 'crypto'; +import fs from 'fs'; +import readline from 'readline'; +import { glob } from 'glob'; +import Progress from 'progress'; +import { assocPath, prop, uniqBy } from 'ramda'; + +import type { IClickhouseEvent } from '@openpanel/db'; + +import { parsePath } from './copy.url'; + +const BATCH_SIZE = 8000; +const SLEEP_TIME = 100; + +function stripMixpanelProperties(obj: Record) { + return Object.fromEntries( + Object.entries(obj).filter( + ([key]) => + !key.match(/^(\$|mp_)/) && !['time', 'distinct_id'].includes(key) + ) + ); +} + +async function* parseJsonStream( + fileStream: fs.ReadStream +): AsyncGenerator { + const rl = readline.createInterface({ + input: fileStream, + crlfDelay: Infinity, + }); + + let buffer = ''; + let bracketCount = 0; + + for await (const line of rl) { + buffer += line; + bracketCount += + (line.match(/{/g) || []).length - (line.match(/}/g) || []).length; + + if (bracketCount === 0 && buffer.trim()) { + try { + const json = JSON.parse(buffer); + yield json; + } catch (error) { + console.log('Warning: Failed to parse JSON'); + console.log('Buffer:', buffer); + } + buffer = ''; + } + } + + if (buffer.trim()) { + try { + const json = JSON.parse(buffer); + yield json; + } catch (error) { + console.log('Warning: Failed to parse remaining JSON'); + console.log('Buffer:', buffer); + } + } +} + +interface Session { + start: number; + end: number; + profileId?: string; + deviceId?: string; + sessionId: string; + firstEvent?: IClickhouseEvent; + lastEvent?: IClickhouseEvent; + events: IClickhouseEvent[]; +} + +function generateSessionEvents(events: IClickhouseEvent[]): Session[] { + let sessionList: Session[] = []; + const lastSessionByDevice: Record = {}; + const lastSessionByProfile: Record = {}; + const thirtyMinutes = 30 * 60 * 1000; + + events.sort( + (a, b) => + new Date(a.created_at).getTime() - new Date(b.created_at).getTime() + ); + + for (const event of events) { + const eventTime = new Date(event.created_at).getTime(); + let deviceSession = event.device_id + ? lastSessionByDevice[event.device_id] + : undefined; + let profileSession = event.profile_id + ? lastSessionByProfile[event.profile_id] + : undefined; + + if ( + event.device_id && + event.device_id !== event.profile_id && + (!deviceSession || eventTime > deviceSession.end + thirtyMinutes) + ) { + deviceSession = { + start: eventTime, + end: eventTime, + deviceId: event.device_id, + sessionId: randomUUID(), + firstEvent: event, + events: [event], + }; + lastSessionByDevice[event.device_id] = deviceSession; + sessionList.push(deviceSession); + } else if (deviceSession) { + deviceSession.end = eventTime; + deviceSession.lastEvent = event; + deviceSession.events.push(event); + } + + if ( + event.profile_id && + event.device_id !== event.profile_id && + (!profileSession || eventTime > profileSession.end + thirtyMinutes) + ) { + profileSession = { + start: eventTime, + end: eventTime, + profileId: event.profile_id, + sessionId: randomUUID(), + firstEvent: event, + events: [event], + }; + lastSessionByProfile[event.profile_id] = profileSession; + sessionList.push(profileSession); + } else if (profileSession) { + profileSession.end = eventTime; + profileSession.lastEvent = event; + profileSession.events.push(event); + } + + if ( + deviceSession && + profileSession && + deviceSession.sessionId !== profileSession.sessionId + ) { + const unifiedSession = { + ...deviceSession, + ...profileSession, + events: [...deviceSession.events, ...profileSession.events], + start: Math.min(deviceSession.start, profileSession.start), + end: Math.max(deviceSession.end, profileSession.end), + sessionId: deviceSession.sessionId, + }; + lastSessionByDevice[event.device_id] = unifiedSession; + lastSessionByProfile[event.profile_id] = unifiedSession; + sessionList = sessionList.filter( + (session) => + session.sessionId !== deviceSession?.sessionId && + session.sessionId !== profileSession?.sessionId + ); + sessionList.push(unifiedSession); + } + } + + return sessionList; +} + +function createEventObject(event: any): IClickhouseEvent { + const url = parsePath(event.properties.$current_url); + return { + profile_id: event.properties.distinct_id + ? String(event.properties.distinct_id).replace(/^\$device:/, '') + : event.properties.$device_id ?? '', + name: event.event, + created_at: new Date(event.properties.time * 1000).toISOString(), + properties: { + ...stripMixpanelProperties(event.properties), + ...(event.properties.$current_url + ? { + __query: url.query, + __hash: url.hash, + } + : {}), + }, + country: event.properties.country_code ?? '', + region: event.properties.$region ?? '', + city: event.properties.$city ?? '', + os: event.properties.$os ?? '', + browser: event.properties.$browser ?? '', + browser_version: event.properties.$browser_version + ? String(event.properties.$browser_version) + : '', + referrer: event.properties.$initial_referrer ?? '', + referrer_type: event.properties.$search_engine ? 'search' : '', + referrer_name: event.properties.$search_engine ?? '', + device_id: event.properties.$device_id ?? '', + session_id: '', + project_id: '', + path: url.path, + origin: url.origin, + os_version: '', + model: '', + longitude: null, + latitude: null, + id: randomUUID(), + duration: 0, + device: event.properties.$current_url ? '' : 'server', + brand: '', + }; +} + +async function processFile(file: string): Promise { + const fileStream = fs.createReadStream(file); + const events: IClickhouseEvent[] = []; + for await (const event of parseJsonStream(fileStream)) { + if (Array.isArray(event)) { + for (const item of event) { + events.push(createEventObject(item)); + } + } else { + events.push(createEventObject(event)); + } + } + return events; +} + +function processEvents(events: IClickhouseEvent[]): IClickhouseEvent[] { + const sessions = generateSessionEvents(events); + const processedEvents = sessions.flatMap((session) => + [ + session.firstEvent && { + ...session.firstEvent, + id: randomUUID(), + created_at: new Date( + new Date(session.firstEvent.created_at).getTime() - 1000 + ).toISOString(), + session_id: session.sessionId, + name: 'session_start', + }, + ...uniqBy( + prop('id'), + session.events.map((event) => + assocPath(['session_id'], session.sessionId, event) + ) + ), + session.lastEvent && { + ...session.lastEvent, + id: randomUUID(), + created_at: new Date( + new Date(session.lastEvent.created_at).getTime() + 1000 + ).toISOString(), + session_id: session.sessionId, + name: 'session_end', + }, + ].filter((item): item is IClickhouseEvent => !!item) + ); + + return [ + ...processedEvents, + ...events.filter((event) => { + return !event.profile_id && !event.device_id; + }), + ]; +} + +async function sendBatchToAPI(batch: IClickhouseEvent[]) { + await fetch('http://localhost:3333/import/events', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'openpanel-client-id': 'dd3db204-dcf6-49e2-9e82-de01cba7e585', + 'openpanel-client-secret': 'sec_293b903816e327e10c9d', + }, + body: JSON.stringify(batch), + }); + await new Promise((resolve) => setTimeout(resolve, SLEEP_TIME)); +} + +async function processFiles(files: string[]) { + const progress = new Progress( + 'Processing (:current/:total) :file [:bar] :percent | :savedEvents saved events | :status', + { + total: files.length, + width: 20, + } + ); + let savedEvents = 0; + let currentBatch: IClickhouseEvent[] = []; + + let apiBatching = []; + for (const file of files) { + progress.tick({ + file, + savedEvents, + status: 'reading file', + }); + const events = await processFile(file); + progress.render({ + file, + savedEvents, + status: 'processing events', + }); + const processedEvents = processEvents(events); + for (const event of processedEvents) { + currentBatch.push(event); + if (currentBatch.length >= BATCH_SIZE) { + apiBatching.push(currentBatch); + savedEvents += currentBatch.length; + progress.render({ file, savedEvents, status: 'saving events' }); + currentBatch = []; + } + + if (apiBatching.length >= 10) { + await Promise.all(apiBatching.map(sendBatchToAPI)); + apiBatching = []; + } + } + } + + if (currentBatch.length > 0) { + await sendBatchToAPI(currentBatch); + savedEvents += currentBatch.length; + progress.render({ file: 'Complete', savedEvents, status: 'Complete' }); + } +} + +export async function importFiles(matcher: string) { + const files = await glob([matcher], { root: '/' }); + + if (files.length === 0) { + console.log('No files found'); + return; + } + + console.log(`Found ${files.length} files to process`); + + const startTime = Date.now(); + await processFiles(files); + const endTime = Date.now(); + + console.log( + `\nProcessing completed in ${(endTime - startTime) / 1000} seconds` + ); +} diff --git a/packages/cli/src/importer/index.ts b/packages/cli/src/importer/index.ts index 3fe6238a..9081bdfd 100644 --- a/packages/cli/src/importer/index.ts +++ b/packages/cli/src/importer/index.ts @@ -5,6 +5,8 @@ import { groupBy } from 'ramda'; import type { PostEventPayload } from '@openpanel/sdk'; +import { importFiles } from './importer_v2'; + const BATCH_SIZE = 10000; // Define your batch size const SLEEP_TIME = 100; // Define your sleep time between batches @@ -63,137 +65,20 @@ function parseFileContent(fileContent: string): { export default function importer() { const args = arg( { - '--file': String, + '--glob': String, }, { permissive: true, } ); - if (!args['--file']) { - throw new Error('Missing --file argument'); + if (!args['--glob']) { + throw new Error('Missing --glob argument'); } const cwd = process.cwd(); - const filePath = path.resolve(cwd, args['--file']); - const fileContent = parseFileContent(fs.readFileSync(filePath, 'utf-8')); + const filePath = path.resolve(cwd, args['--glob']); - // const groups = groupBy((event) => event.properties.$device_id, fileContent); - // const groupEntries = Object.entries(groups); - - // const profiles = new Map(); - - // for (const [deviceId, items] of Object.entries(groups)) { - // items.forEach((item) => { - // if (item.properties.distinct_id) { - // if (!profiles.has(item.properties.distinct_id)) { - // profiles.set(item.properties.distinct_id, []); - // } - // profiles.get(item.properties.distinct_id)!.push(item); - // } else { - // item.properties.$device_id - // } - // }) - // profiles. - // } - // console.log('Total:', groupEntries.length); - // console.log('Undefined:', groups.undefined?.length ?? 0); - - // const uniqueKeys = new Set(); - // groups.undefined.forEach((event) => { - // if (event.properties.distinct_id) { - // console.log(event); - // } - // }); - - // 1: group by device id - // 2: add session start, session end and populate session_id - // 3: check if distinct_id exists on any event - // - If it does, get all events with that distinct_id and NO device_id and within session_start and session_end - // - add add the session_id to those events - // 4: send all events to the server - - const events: PostEventPayload[] = fileContent - .slice() - .reverse() - .map((event) => { - if (event.properties.mp_lib === 'web') { - console.log(event); - } - return { - profileId: event.properties.distinct_id - ? String(event.properties.distinct_id) - : undefined, - name: event.event, - timestamp: new Date(event.properties.time * 1000).toISOString(), - properties: { - __country: event.properties.country_code, - __region: event.properties.$region, - __city: event.properties.$city, - __os: event.properties.$os, - __browser: event.properties.$browser, - __browser_version: event.properties.$browser_version, - __referrer: event.properties.$referrer, - __device_id: event.properties.$device_id, - }, - }; - }); - - const totalPages = Math.ceil(events.length / BATCH_SIZE); - const estimatedTime = (totalPages / 8) * SLEEP_TIME + (totalPages / 8) * 80; - console.log(`Estimated time: ${estimatedTime / 1000} seconds`); - - async function batcher(page: number) { - const batch = events.slice(page * BATCH_SIZE, (page + 1) * BATCH_SIZE); - - if (batch.length === 0) { - return; - } - - const size = Buffer.byteLength(JSON.stringify(batch)); - console.log(batch.length, size / (1024 * 1024)); - - // await fetch('http://localhost:3333/import/events', { - // method: 'POST', - // headers: { - // 'Content-Type': 'application/json', - // 'openpanel-client-id': 'dd3db204-dcf6-49e2-9e82-de01cba7e585', - // 'openpanel-client-secret': 'sec_293b903816e327e10c9d', - // }, - // body: JSON.stringify(batch), - // }); - - await new Promise((resolve) => setTimeout(resolve, SLEEP_TIME)); - } - - async function runBatchesInParallel( - totalPages: number, - concurrentBatches: number - ) { - let currentPage = 0; - - while (currentPage < totalPages) { - const promises = []; - for ( - let i = 0; - i < concurrentBatches && currentPage < totalPages; - i++, currentPage++ - ) { - console.log(`Sending batch ${currentPage}... %)`); - promises.push(batcher(currentPage)); - } - await Promise.all(promises); - } - } - console.log(totalPages); - - // Trigger the batches - try { - runBatchesInParallel(totalPages, 8); // Run 8 batches in parallel - } catch (e) { - console.log('ERROR?!', e); - } - - return null; + return importFiles(filePath); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c99e4413..44ebdc65 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -828,6 +828,12 @@ importers: inquirer: specifier: ^9.3.5 version: 9.3.6 + p-limit: + specifier: ^6.1.0 + version: 6.1.0 + progress: + specifier: ^2.0.3 + version: 2.0.3 ramda: specifier: ^0.29.1 version: 0.29.1 @@ -853,6 +859,9 @@ importers: '@types/node': specifier: ^20.14.10 version: 20.14.11 + '@types/progress': + specifier: ^2.0.7 + version: 2.0.7 '@types/ramda': specifier: ^0.30.1 version: 0.30.1 @@ -7632,6 +7641,12 @@ packages: resolution: {integrity: sha512-k7kRA033QNtC+gLc4VPlfnue58CM1iQLgn1IMAU8VPHGOj7oIHPp9UlhedEnD/Gl8evoCjwkZjlBORtZ3JByUA==} dev: false + /@types/progress@2.0.7: + resolution: {integrity: sha512-iadjw02vte8qWx7U0YM++EybBha2CQLPGu9iJ97whVgJUT5Zq9MjAPYUnbfRI2Kpehimf1QjFJYxD0t8nqzu5w==} + dependencies: + '@types/node': 18.19.17 + dev: true + /@types/prop-types@15.7.11: resolution: {integrity: sha512-ga8y9v9uyeiLdpKddhxYQkxNDrfvuPrlFb0N1qnZZByvcElJaXthF1UhvCh9TLWJBEHeNtdnbysW7Y6Uq8CVng==} @@ -15251,6 +15266,13 @@ packages: dependencies: yocto-queue: 0.1.0 + /p-limit@6.1.0: + resolution: {integrity: sha512-H0jc0q1vOzlEk0TqAKXKZxdl7kX3OFUzCnNVUnq5Pc3DGo0kpeaMuPqxQn235HibwBEb0/pm9dgKTjXy66fBkg==} + engines: {node: '>=18'} + dependencies: + yocto-queue: 1.1.1 + dev: false + /p-locate@3.0.0: resolution: {integrity: sha512-x+12w/To+4GFfgJhBEpiDcLozRJGegY+Ei7/z0tSLkMmxGZNybVMSfWj9aJn8Z5Fc7dBUNJOOVgPv2H7IwulSQ==} engines: {node: '>=6'} @@ -18869,6 +18891,11 @@ packages: resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} engines: {node: '>=10'} + /yocto-queue@1.1.1: + resolution: {integrity: sha512-b4JR1PFR10y1mKjhHY9LaGo6tmrgjit7hxVIeAmyMw3jegXR4dhYqLaQF5zMXZxY7tLpMyJeLjr1C4rLmkVe8g==} + engines: {node: '>=12.20'} + dev: false + /yoctocolors-cjs@2.1.2: resolution: {integrity: sha512-cYVsTjKl8b+FrnidjibDWskAv7UKOfcwaVZdp/it9n1s9fU3IkgDbhdIRKCW4JDsAlECJY0ytoVPT3sK6kideA==} engines: {node: '>=18'}