This commit is contained in:
Carl-Gerhard Lindesvärd
2024-07-11 22:48:59 +02:00
committed by Carl-Gerhard Lindesvärd
parent da856152c7
commit 3129f62c14
2 changed files with 266 additions and 49 deletions

View File

@@ -0,0 +1,52 @@
export function parseSearchParams(
params: URLSearchParams
): Record<string, string> | undefined {
const result: Record<string, string> = {};
for (const [key, value] of params.entries()) {
result[key] = value;
}
return Object.keys(result).length ? result : undefined;
}
export function parsePath(path?: string): {
query?: Record<string, string>;
path: string;
origin: string;
hash?: string;
} {
if (!path) {
return {
path: '',
origin: '',
};
}
try {
const url = new URL(path);
return {
query: parseSearchParams(url.searchParams),
path: url.pathname,
hash: url.hash || undefined,
origin: url.origin,
};
} catch (error) {
return {
path,
origin: '',
};
}
}
export function isSameDomain(
url1: string | undefined,
url2: string | undefined
) {
if (!url1 || !url2) {
return false;
}
try {
return new URL(url1).hostname === new URL(url2).hostname;
} catch (e) {
return false;
}
}

View File

@@ -1,12 +1,40 @@
import { randomUUID } from 'crypto';
import fs from 'fs';
import os from 'os';
import v8 from 'v8';
import { glob } from 'glob';
import { assocPath, clone, last, prop, uniqBy } from 'ramda';
import type { IClickhouseEvent } from '@openpanel/db';
import { parsePath } from './copy.url';
const BATCH_SIZE = 8000; // Define your batch size
const SLEEP_TIME = 100; // Define your sleep time between batches
function checkNodeMemoryLimit() {
const totalHeapSize = v8.getHeapStatistics().total_available_size;
return totalHeapSize;
}
function checkMemoryUsage() {
const _totalMemory = os.totalmem();
const totalMemory = checkNodeMemoryLimit();
console.log({
totalMemory,
_totalMemory,
});
const usedMemory = process.memoryUsage().heapUsed;
const percentUsed = (usedMemory / totalMemory) * 100;
return percentUsed;
}
function shouldLoadMoreFiles() {
const percentUsed = checkMemoryUsage();
return percentUsed < 80; // or any other threshold you deem appropriate
}
function progress(value: string) {
process.stdout.clearLine(0);
process.stdout.cursorTo(0);
@@ -59,6 +87,123 @@ function parseFileContent(fileContent: string): {
}
}
interface Session {
start: number; // Timestamp of the session start
end: number; // Timestamp of the session end
profileId?: string;
deviceId?: string;
sessionId: string;
firstEvent?: IClickhouseEvent;
lastEvent?: IClickhouseEvent;
events: IClickhouseEvent[];
}
function generateSessionEvents(events: IClickhouseEvent[]): Session[] {
let sessionList: Session[] = [];
const lastSessionByDevice: Record<string, Session> = {};
const lastSessionByProfile: Record<string, Session> = {};
const thirtyMinutes = 30 * 60 * 1000; // 30 minutes in milliseconds
events.sort(
(a, b) =>
new Date(a.created_at).getTime() - new Date(b.created_at).getTime()
);
for (const event of events) {
const eventTime = new Date(event.created_at).getTime();
let deviceSession = event.device_id
? lastSessionByDevice[event.device_id]
: undefined;
let profileSession = event.profile_id
? lastSessionByProfile[event.profile_id]
: undefined;
// Check if new session is needed for deviceId
if (
event.device_id &&
event.device_id !== event.profile_id &&
(!deviceSession || eventTime > deviceSession.end + thirtyMinutes)
) {
deviceSession = {
start: eventTime,
end: eventTime,
deviceId: event.device_id,
sessionId: generateSessionId(),
firstEvent: event,
events: [event],
};
lastSessionByDevice[event.device_id] = deviceSession;
sessionList.push(deviceSession);
} else if (deviceSession) {
deviceSession.end = eventTime;
deviceSession.lastEvent = event;
deviceSession.events.push(event);
}
// Check if new session is needed for profileId
if (
event.profile_id &&
event.device_id !== event.profile_id &&
(!profileSession || eventTime > profileSession.end + thirtyMinutes)
) {
profileSession = {
start: eventTime,
end: eventTime,
profileId: event.profile_id,
sessionId: generateSessionId(),
firstEvent: event,
events: [event],
};
lastSessionByProfile[event.profile_id] = profileSession;
sessionList.push(profileSession);
} else if (profileSession) {
profileSession.end = eventTime;
profileSession.lastEvent = event;
profileSession.events.push(event);
}
// Sync device and profile sessions if both exist
// if (
// deviceSession &&
// profileSession &&
// deviceSession.sessionId !== profileSession.sessionId
// ) {
// profileSession.sessionId = deviceSession.sessionId;
// }
if (
deviceSession &&
profileSession &&
deviceSession.sessionId !== profileSession.sessionId
) {
// Merge sessions by ensuring they reference the same object
const unifiedSession = {
...deviceSession,
...profileSession,
events: [...deviceSession.events, ...profileSession.events],
start: Math.min(deviceSession.start, profileSession.start),
end: Math.max(deviceSession.end, profileSession.end),
sessionId: deviceSession.sessionId, // Prefer the deviceSession ID for no particular reason
};
lastSessionByDevice[event.device_id] = unifiedSession;
lastSessionByProfile[event.profile_id] = unifiedSession;
// filter previous before appending new unified fileter
sessionList = sessionList.filter(
(session) =>
session.sessionId !== deviceSession?.sessionId &&
session.sessionId !== profileSession?.sessionId
);
sessionList.push(unifiedSession);
}
}
return sessionList;
}
function generateSessionId(): string {
return randomUUID();
}
export async function loadFilesBatcher() {
const files = await glob(['../../../../Downloads/mp-data/*.txt'], {
root: '/',
@@ -75,7 +220,7 @@ export async function loadFilesBatcher() {
const times = [];
const chunksArray = chunks(files, 5);
let chunkIndex = 0;
for (const chunk of chunksArray) {
for (const chunk of chunksArray.slice(0, 1)) {
if (times.length > 0) {
// Print out how much time is approximately left
const average = times.reduce((a, b) => a + b) / times.length;
@@ -118,54 +263,36 @@ async function loadFiles(files: string[] = []) {
});
});
const events: IClickhouseEvent[] = data.flat().map((event) => {
if (event.properties.mp_lib === 'web') {
return {
// sorted oldest to latest
const a = data
.flat()
.sort((a, b) => a.properties.time - b.properties.time)
.map((event) => {
const currentUrl = event.properties.$current_url;
if (currentUrl) {
// console.log('');
// console.log(event.properties);
// console.log('');
}
const url = parsePath(currentUrl);
const eventToSave = {
profile_id: event.properties.distinct_id
? String(event.properties.distinct_id)
: '',
? String(event.properties.distinct_id).replace(/^\$device:/, '')
: event.properties.$device_id ?? '',
name: event.event,
created_at: new Date(event.properties.time * 1000).toISOString(),
properties: stripMixpanelProperties(event.properties) as Record<
string,
string
>,
country: event.properties.country_code,
region: event.properties.$region,
city: event.properties.$city,
os: event.properties.$os,
browser: event.properties.$browser,
browser_version: event.properties.$browser_version
? String(event.properties.$browser_version)
: '',
referrer: event.properties.$initial_referrer,
referrer_type: event.properties.$search_engine ? 'search' : '', // FIX (IN API)
referrer_name: event.properties.$search_engine ?? '', // FIX (IN API)
device_id: event.properties.$device_id,
session_id: '',
project_id: '', // FIX (IN API)
path: event.properties.$current_url, // FIX
origin: '', // FIX (IN API)
os_version: '', // FIX
model: '',
longitude: null,
latitude: null,
id: randomUUID(),
duration: 0,
device: '', // FIX
brand: '',
};
} else {
return {
profile_id: event.properties.distinct_id
? String(event.properties.distinct_id)
: '',
name: event.event,
created_at: new Date(event.properties.time * 1000).toISOString(),
properties: stripMixpanelProperties(event.properties) as Record<
string,
string
>,
properties: {
...(stripMixpanelProperties(event.properties) as Record<
string,
string
>),
...(currentUrl
? {
__query: url.query,
__hash: url.hash,
}
: {}),
},
country: event.properties.country_code ?? '',
region: event.properties.$region ?? '',
city: event.properties.$city ?? '',
@@ -180,18 +307,56 @@ async function loadFiles(files: string[] = []) {
device_id: event.properties.$device_id ?? '',
session_id: '',
project_id: '', // FIX (IN API)
path: event.properties.$current_url ?? '', // FIX
origin: '', // FIX (IN API)
path: url.path,
origin: url.origin,
os_version: '', // FIX
model: '',
longitude: null,
latitude: null,
id: randomUUID(),
duration: 0,
device: '', // FIX
device: currentUrl ? '' : 'server',
brand: '',
};
return eventToSave;
});
const sessions = generateSessionEvents(a);
const events = sessions.flatMap((session) => {
if (
session.profileId === '594447' ||
session.deviceId ===
'19081f09f2d666-082ba152fdf7548-7f7a3660-5a900-19081f09f2d666'
) {
console.log(session);
}
return [
session.firstEvent && {
...session.firstEvent,
id: randomUUID(),
created_at: new Date(
new Date(session.firstEvent.created_at).getTime() - 1000
).toISOString(),
session_id: session.sessionId,
name: 'session_start',
},
...uniqBy(
prop('id'),
session.events.map((event) =>
assocPath(['session_id'], session.sessionId, clone(event))
)
),
session.lastEvent && {
...session.lastEvent,
id: randomUUID(),
created_at: new Date(
new Date(session.lastEvent.created_at).getTime() + 1000
).toISOString(),
session_id: session.sessionId,
name: 'session_end',
},
].filter(Boolean);
});
const totalPages = Math.ceil(events.length / BATCH_SIZE);