diff --git a/packages/cli/src/importer/copy.url.ts b/packages/cli/src/importer/copy.url.ts new file mode 100644 index 00000000..1de5133b --- /dev/null +++ b/packages/cli/src/importer/copy.url.ts @@ -0,0 +1,52 @@ +export function parseSearchParams( + params: URLSearchParams +): Record | undefined { + const result: Record = {}; + for (const [key, value] of params.entries()) { + result[key] = value; + } + return Object.keys(result).length ? result : undefined; +} + +export function parsePath(path?: string): { + query?: Record; + path: string; + origin: string; + hash?: string; +} { + if (!path) { + return { + path: '', + origin: '', + }; + } + + try { + const url = new URL(path); + return { + query: parseSearchParams(url.searchParams), + path: url.pathname, + hash: url.hash || undefined, + origin: url.origin, + }; + } catch (error) { + return { + path, + origin: '', + }; + } +} + +export function isSameDomain( + url1: string | undefined, + url2: string | undefined +) { + if (!url1 || !url2) { + return false; + } + try { + return new URL(url1).hostname === new URL(url2).hostname; + } catch (e) { + return false; + } +} diff --git a/packages/cli/src/importer/load.ts b/packages/cli/src/importer/load.ts index 0437b968..064c71a6 100644 --- a/packages/cli/src/importer/load.ts +++ b/packages/cli/src/importer/load.ts @@ -1,12 +1,40 @@ import { randomUUID } from 'crypto'; import fs from 'fs'; +import os from 'os'; +import v8 from 'v8'; import { glob } from 'glob'; +import { assocPath, clone, last, prop, uniqBy } from 'ramda'; import type { IClickhouseEvent } from '@openpanel/db'; +import { parsePath } from './copy.url'; + const BATCH_SIZE = 8000; // Define your batch size const SLEEP_TIME = 100; // Define your sleep time between batches +function checkNodeMemoryLimit() { + const totalHeapSize = v8.getHeapStatistics().total_available_size; + return totalHeapSize; +} + +function checkMemoryUsage() { + const _totalMemory = os.totalmem(); + const totalMemory = checkNodeMemoryLimit(); + console.log({ + totalMemory, + _totalMemory, + }); + + const usedMemory = process.memoryUsage().heapUsed; + const percentUsed = (usedMemory / totalMemory) * 100; + return percentUsed; +} + +function shouldLoadMoreFiles() { + const percentUsed = checkMemoryUsage(); + return percentUsed < 80; // or any other threshold you deem appropriate +} + function progress(value: string) { process.stdout.clearLine(0); process.stdout.cursorTo(0); @@ -59,6 +87,123 @@ function parseFileContent(fileContent: string): { } } +interface Session { + start: number; // Timestamp of the session start + end: number; // Timestamp of the session end + profileId?: string; + deviceId?: string; + sessionId: string; + firstEvent?: IClickhouseEvent; + lastEvent?: IClickhouseEvent; + events: IClickhouseEvent[]; +} + +function generateSessionEvents(events: IClickhouseEvent[]): Session[] { + let sessionList: Session[] = []; + const lastSessionByDevice: Record = {}; + const lastSessionByProfile: Record = {}; + const thirtyMinutes = 30 * 60 * 1000; // 30 minutes in milliseconds + + events.sort( + (a, b) => + new Date(a.created_at).getTime() - new Date(b.created_at).getTime() + ); + + for (const event of events) { + const eventTime = new Date(event.created_at).getTime(); + let deviceSession = event.device_id + ? lastSessionByDevice[event.device_id] + : undefined; + let profileSession = event.profile_id + ? lastSessionByProfile[event.profile_id] + : undefined; + + // Check if new session is needed for deviceId + if ( + event.device_id && + event.device_id !== event.profile_id && + (!deviceSession || eventTime > deviceSession.end + thirtyMinutes) + ) { + deviceSession = { + start: eventTime, + end: eventTime, + deviceId: event.device_id, + sessionId: generateSessionId(), + firstEvent: event, + events: [event], + }; + lastSessionByDevice[event.device_id] = deviceSession; + sessionList.push(deviceSession); + } else if (deviceSession) { + deviceSession.end = eventTime; + deviceSession.lastEvent = event; + deviceSession.events.push(event); + } + + // Check if new session is needed for profileId + if ( + event.profile_id && + event.device_id !== event.profile_id && + (!profileSession || eventTime > profileSession.end + thirtyMinutes) + ) { + profileSession = { + start: eventTime, + end: eventTime, + profileId: event.profile_id, + sessionId: generateSessionId(), + firstEvent: event, + events: [event], + }; + lastSessionByProfile[event.profile_id] = profileSession; + sessionList.push(profileSession); + } else if (profileSession) { + profileSession.end = eventTime; + profileSession.lastEvent = event; + profileSession.events.push(event); + } + + // Sync device and profile sessions if both exist + // if ( + // deviceSession && + // profileSession && + // deviceSession.sessionId !== profileSession.sessionId + // ) { + // profileSession.sessionId = deviceSession.sessionId; + // } + + if ( + deviceSession && + profileSession && + deviceSession.sessionId !== profileSession.sessionId + ) { + // Merge sessions by ensuring they reference the same object + const unifiedSession = { + ...deviceSession, + ...profileSession, + events: [...deviceSession.events, ...profileSession.events], + start: Math.min(deviceSession.start, profileSession.start), + end: Math.max(deviceSession.end, profileSession.end), + sessionId: deviceSession.sessionId, // Prefer the deviceSession ID for no particular reason + }; + lastSessionByDevice[event.device_id] = unifiedSession; + lastSessionByProfile[event.profile_id] = unifiedSession; + // filter previous before appending new unified fileter + sessionList = sessionList.filter( + (session) => + session.sessionId !== deviceSession?.sessionId && + session.sessionId !== profileSession?.sessionId + ); + sessionList.push(unifiedSession); + } + } + + return sessionList; +} + +function generateSessionId(): string { + return randomUUID(); +} + export async function loadFilesBatcher() { const files = await glob(['../../../../Downloads/mp-data/*.txt'], { root: '/', @@ -75,7 +220,7 @@ export async function loadFilesBatcher() { const times = []; const chunksArray = chunks(files, 5); let chunkIndex = 0; - for (const chunk of chunksArray) { + for (const chunk of chunksArray.slice(0, 1)) { if (times.length > 0) { // Print out how much time is approximately left const average = times.reduce((a, b) => a + b) / times.length; @@ -118,54 +263,36 @@ async function loadFiles(files: string[] = []) { }); }); - const events: IClickhouseEvent[] = data.flat().map((event) => { - if (event.properties.mp_lib === 'web') { - return { + // sorted oldest to latest + const a = data + .flat() + .sort((a, b) => a.properties.time - b.properties.time) + .map((event) => { + const currentUrl = event.properties.$current_url; + if (currentUrl) { + // console.log(''); + // console.log(event.properties); + // console.log(''); + } + const url = parsePath(currentUrl); + const eventToSave = { profile_id: event.properties.distinct_id - ? String(event.properties.distinct_id) - : '', + ? String(event.properties.distinct_id).replace(/^\$device:/, '') + : event.properties.$device_id ?? '', name: event.event, created_at: new Date(event.properties.time * 1000).toISOString(), - properties: stripMixpanelProperties(event.properties) as Record< - string, - string - >, - country: event.properties.country_code, - region: event.properties.$region, - city: event.properties.$city, - os: event.properties.$os, - browser: event.properties.$browser, - browser_version: event.properties.$browser_version - ? String(event.properties.$browser_version) - : '', - referrer: event.properties.$initial_referrer, - referrer_type: event.properties.$search_engine ? 'search' : '', // FIX (IN API) - referrer_name: event.properties.$search_engine ?? '', // FIX (IN API) - device_id: event.properties.$device_id, - session_id: '', - project_id: '', // FIX (IN API) - path: event.properties.$current_url, // FIX - origin: '', // FIX (IN API) - os_version: '', // FIX - model: '', - longitude: null, - latitude: null, - id: randomUUID(), - duration: 0, - device: '', // FIX - brand: '', - }; - } else { - return { - profile_id: event.properties.distinct_id - ? String(event.properties.distinct_id) - : '', - name: event.event, - created_at: new Date(event.properties.time * 1000).toISOString(), - properties: stripMixpanelProperties(event.properties) as Record< - string, - string - >, + properties: { + ...(stripMixpanelProperties(event.properties) as Record< + string, + string + >), + ...(currentUrl + ? { + __query: url.query, + __hash: url.hash, + } + : {}), + }, country: event.properties.country_code ?? '', region: event.properties.$region ?? '', city: event.properties.$city ?? '', @@ -180,18 +307,56 @@ async function loadFiles(files: string[] = []) { device_id: event.properties.$device_id ?? '', session_id: '', project_id: '', // FIX (IN API) - path: event.properties.$current_url ?? '', // FIX - origin: '', // FIX (IN API) + path: url.path, + origin: url.origin, os_version: '', // FIX model: '', longitude: null, latitude: null, id: randomUUID(), duration: 0, - device: '', // FIX + device: currentUrl ? '' : 'server', brand: '', }; + return eventToSave; + }); + + const sessions = generateSessionEvents(a); + + const events = sessions.flatMap((session) => { + if ( + session.profileId === '594447' || + session.deviceId === + '19081f09f2d666-082ba152fdf7548-7f7a3660-5a900-19081f09f2d666' + ) { + console.log(session); } + return [ + session.firstEvent && { + ...session.firstEvent, + id: randomUUID(), + created_at: new Date( + new Date(session.firstEvent.created_at).getTime() - 1000 + ).toISOString(), + session_id: session.sessionId, + name: 'session_start', + }, + ...uniqBy( + prop('id'), + session.events.map((event) => + assocPath(['session_id'], session.sessionId, clone(event)) + ) + ), + session.lastEvent && { + ...session.lastEvent, + id: randomUUID(), + created_at: new Date( + new Date(session.lastEvent.created_at).getTime() + 1000 + ).toISOString(), + session_id: session.sessionId, + name: 'session_end', + }, + ].filter(Boolean); }); const totalPages = Math.ceil(events.length / BATCH_SIZE);