first working cli importer
This commit is contained in:
committed by
Carl-Gerhard Lindesvärd
parent
bf0c14cc88
commit
1b613538cc
@@ -1,52 +0,0 @@
|
||||
/**
 * Flattens a `URLSearchParams` instance into a plain string-to-string map.
 *
 * When a key occurs more than once, the value seen last wins.
 *
 * @param params - Query parameters to convert.
 * @returns The key/value map, or `undefined` when there are no parameters.
 */
export function parseSearchParams(
  params: URLSearchParams
): Record<string, string> | undefined {
  const collected: Record<string, string> = {};
  params.forEach((value, key) => {
    collected[key] = value;
  });

  if (Object.keys(collected).length === 0) {
    return undefined;
  }
  return collected;
}
|
||||
|
||||
/**
 * Splits an absolute URL string into its origin, pathname, query map and hash.
 *
 * @param path - URL to split. May be missing or empty.
 * @returns For a missing/empty input, `{ path: '', origin: '' }`. For a string
 *   that `URL` cannot parse (e.g. a relative path), the input is returned
 *   unchanged as `path` with an empty `origin`. Otherwise the parsed parts;
 *   `query` and `hash` are `undefined` when the URL has none.
 */
export function parsePath(path?: string): {
  query?: Record<string, string>;
  path: string;
  origin: string;
  hash?: string;
} {
  if (!path) {
    return { path: '', origin: '' };
  }

  try {
    const { searchParams, pathname, hash, origin } = new URL(path);
    return {
      query: parseSearchParams(searchParams),
      path: pathname,
      // An empty hash string is reported as "no hash".
      hash: hash || undefined,
      origin,
    };
  } catch {
    // Not parseable as an absolute URL: hand the raw input back as the path.
    return { path, origin: '' };
  }
}
|
||||
|
||||
/**
 * Tells whether two absolute URLs share the same hostname.
 *
 * @param url1 - First URL; may be missing.
 * @param url2 - Second URL; may be missing.
 * @returns `true` only when both values are present, both parse as URLs and
 *   their hostnames are identical; `false` in every other case.
 */
export function isSameDomain(
  url1: string | undefined,
  url2: string | undefined
) {
  if (url1 && url2) {
    try {
      const [firstHost, secondHost] = [url1, url2].map(
        (raw) => new URL(raw).hostname
      );
      return firstHost === secondHost;
    } catch {
      // At least one of the inputs is not a valid absolute URL.
      return false;
    }
  }
  return false;
}
|
||||
@@ -1,83 +1,99 @@
|
||||
import { randomUUID } from 'crypto';
|
||||
import fs from 'fs';
|
||||
import readline from 'readline';
|
||||
import { glob } from 'glob';
|
||||
import { assocPath, clone, prop, uniqBy } from 'ramda';
|
||||
import Progress from 'progress';
|
||||
import { assocPath, prop, uniqBy } from 'ramda';
|
||||
|
||||
import type { IClickhouseEvent } from '@openpanel/db';
|
||||
import { parsePath } from '@openpanel/common';
|
||||
import type { IImportedEvent } from '@openpanel/db';
|
||||
|
||||
import { parsePath } from './copy.url';
|
||||
const BATCH_SIZE = 1000;
|
||||
const SLEEP_TIME = 20;
|
||||
|
||||
const BATCH_SIZE = 8000; // Define your batch size
|
||||
const SLEEP_TIME = 100; // Define your sleep time between batches
|
||||
|
||||
/**
 * Prints a progress message, replacing the current terminal line when possible.
 *
 * On an interactive terminal the current line is cleared, the cursor is moved
 * to column 0 and `value` is written in place. When stdout is not a TTY (piped
 * or redirected) `clearLine`/`cursorTo` are not available on the stream, so the
 * message is written as an ordinary line instead of throwing.
 *
 * @param value - Text to display.
 */
function progress(value: string) {
  const out = process.stdout;

  if (out.isTTY) {
    out.clearLine(0);
    out.cursorTo(0);
    out.write(value);
    return;
  }

  // Non-interactive output: no in-place rewrite is possible, emit a plain line.
  out.write(`${value}\n`);
}
|
||||
|
||||
/**
 * Copies an event's properties, leaving out Mixpanel's own fields.
 *
 * A key is dropped when it starts with `$` or `mp_`, or when it is exactly
 * `time` or `distinct_id`. The input object is not modified.
 *
 * @param obj - Raw property bag.
 * @returns A new object holding only the remaining keys.
 */
function stripMixpanelProperties(obj: Record<string, unknown>) {
  const reserved = new Set(['time', 'distinct_id']);
  const kept: Record<string, unknown> = {};

  for (const name in obj) {
    const isMixpanelKey = /^(\$|mp_)/.test(name);
    if (!isMixpanelKey && !reserved.has(name)) {
      kept[name] = obj[name];
    }
  }

  return kept;
}
|
||||
|
||||
/**
 * Parses a JSON string without throwing.
 *
 * @param json - Text to parse.
 * @returns The parsed value, or `null` when the text is not valid JSON.
 */
function safeParse(json: string) {
  try {
    const parsed = JSON.parse(json);
    return parsed;
  } catch {
    return null;
  }
}
|
||||
|
||||
function parseFileContent(fileContent: string): {
|
||||
type IMixpanelEvent = {
|
||||
event: string;
|
||||
properties: {
|
||||
time: number;
|
||||
distinct_id?: string | number;
|
||||
[key: string]: unknown;
|
||||
time: number;
|
||||
$current_url?: string;
|
||||
distinct_id?: string;
|
||||
$device_id?: string;
|
||||
country_code?: string;
|
||||
$region?: string;
|
||||
$city?: string;
|
||||
$os?: string;
|
||||
$browser?: string;
|
||||
$browser_version?: string;
|
||||
$initial_referrer?: string;
|
||||
$search_engine?: string;
|
||||
};
|
||||
}[] {
|
||||
try {
|
||||
return JSON.parse(fileContent);
|
||||
} catch (error) {
|
||||
const lines = fileContent.trim().split('\n');
|
||||
return lines
|
||||
.map((line, index) => {
|
||||
const json = safeParse(line);
|
||||
if (!json) {
|
||||
console.log('Warning: Failed to parse JSON');
|
||||
console.log('Index:', index);
|
||||
console.log('Line:', line);
|
||||
}
|
||||
return json;
|
||||
})
|
||||
.filter(Boolean);
|
||||
};
|
||||
|
||||
/**
 * Returns a copy of `obj` without Mixpanel's own fields.
 *
 * Keys starting with `$` or `mp_`, and the exact keys `time` and
 * `distinct_id`, are left out. Only own enumerable properties are considered.
 *
 * @param obj - Raw property bag.
 * @returns A new object with the remaining entries.
 */
function stripMixpanelProperties(obj: Record<string, unknown>) {
  const reservedKeys = ['time', 'distinct_id'];
  const isCustomProperty = (key: string) =>
    !(/^(\$|mp_)/.test(key) || reservedKeys.includes(key));

  const keptEntries = Object.entries(obj).filter(([key]) =>
    isCustomProperty(key)
  );
  return Object.fromEntries(keptEntries);
}
|
||||
|
||||
/**
 * Reads a file line by line and yields each complete JSON value it contains.
 *
 * Lines are accumulated in a buffer until the number of opening and closing
 * curly braces balances, at which point the buffer is parsed and yielded. This
 * supports one JSON object per line as well as objects spread over several
 * lines. Braces that appear inside JSON string literals are ignored when
 * balancing, so values such as `"}"` do not break record boundaries.
 *
 * Only curly braces are tracked; a buffer that balances but is not valid JSON
 * is reported with a warning on the console and discarded.
 *
 * @param fileStream - Readable stream of the file to parse.
 * @returns An async generator of parsed JSON values.
 */
async function* parseJsonStream(
  fileStream: fs.ReadStream
): AsyncGenerator<any, void, unknown> {
  // Net count of `{` minus `}` on one line, skipping anything inside a
  // double-quoted string (with backslash escapes honoured). JSON strings cannot
  // contain raw newlines, so string state never carries over to the next line.
  const braceDelta = (line: string): number => {
    let delta = 0;
    let inString = false;
    let escaped = false;
    for (const ch of line) {
      if (inString) {
        if (escaped) {
          escaped = false;
        } else if (ch === '\\') {
          escaped = true;
        } else if (ch === '"') {
          inString = false;
        }
      } else if (ch === '"') {
        inString = true;
      } else if (ch === '{') {
        delta++;
      } else if (ch === '}') {
        delta--;
      }
    }
    return delta;
  };

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });

  let buffer = '';
  let bracketCount = 0;

  for await (const line of rl) {
    buffer += line;
    bracketCount += braceDelta(line);

    // Balanced braces and a non-blank buffer: one complete value is available.
    if (bracketCount === 0 && buffer.trim()) {
      try {
        const json = JSON.parse(buffer);
        yield json;
      } catch (error) {
        console.log('Warning: Failed to parse JSON');
        console.log('Buffer:', buffer);
      }
      buffer = '';
    }
  }

  // Whatever is left when the file ends gets one last parse attempt.
  if (buffer.trim()) {
    try {
      const json = JSON.parse(buffer);
      yield json;
    } catch (error) {
      console.log('Warning: Failed to parse remaining JSON');
      console.log('Buffer:', buffer);
    }
  }
}
|
||||
|
||||
interface Session {
|
||||
start: number; // Timestamp of the session start
|
||||
end: number; // Timestamp of the session end
|
||||
start: number;
|
||||
end: number;
|
||||
profileId?: string;
|
||||
deviceId?: string;
|
||||
sessionId: string;
|
||||
firstEvent?: IClickhouseEvent;
|
||||
lastEvent?: IClickhouseEvent;
|
||||
events: IClickhouseEvent[];
|
||||
firstEvent?: IImportedEvent;
|
||||
lastEvent?: IImportedEvent;
|
||||
events: IImportedEvent[];
|
||||
}
|
||||
|
||||
function generateSessionEvents(events: IClickhouseEvent[]): Session[] {
|
||||
function generateSessionEvents(events: IImportedEvent[]): Session[] {
|
||||
let sessionList: Session[] = [];
|
||||
const lastSessionByDevice: Record<string, Session> = {};
|
||||
const lastSessionByProfile: Record<string, Session> = {};
|
||||
const thirtyMinutes = 30 * 60 * 1000; // 30 minutes in milliseconds
|
||||
const thirtyMinutes = 30 * 60 * 1000;
|
||||
|
||||
events.sort(
|
||||
(a, b) =>
|
||||
@@ -93,7 +109,6 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] {
|
||||
? lastSessionByProfile[event.profile_id]
|
||||
: undefined;
|
||||
|
||||
// Check if new session is needed for deviceId
|
||||
if (
|
||||
event.device_id &&
|
||||
event.device_id !== event.profile_id &&
|
||||
@@ -103,7 +118,7 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] {
|
||||
start: eventTime,
|
||||
end: eventTime,
|
||||
deviceId: event.device_id,
|
||||
sessionId: generateSessionId(),
|
||||
sessionId: randomUUID(),
|
||||
firstEvent: event,
|
||||
events: [event],
|
||||
};
|
||||
@@ -115,7 +130,6 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] {
|
||||
deviceSession.events.push(event);
|
||||
}
|
||||
|
||||
// Check if new session is needed for profileId
|
||||
if (
|
||||
event.profile_id &&
|
||||
event.device_id !== event.profile_id &&
|
||||
@@ -125,7 +139,7 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] {
|
||||
start: eventTime,
|
||||
end: eventTime,
|
||||
profileId: event.profile_id,
|
||||
sessionId: generateSessionId(),
|
||||
sessionId: randomUUID(),
|
||||
firstEvent: event,
|
||||
events: [event],
|
||||
};
|
||||
@@ -137,32 +151,21 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] {
|
||||
profileSession.events.push(event);
|
||||
}
|
||||
|
||||
// Sync device and profile sessions if both exist
|
||||
// if (
|
||||
// deviceSession &&
|
||||
// profileSession &&
|
||||
// deviceSession.sessionId !== profileSession.sessionId
|
||||
// ) {
|
||||
// profileSession.sessionId = deviceSession.sessionId;
|
||||
// }
|
||||
|
||||
if (
|
||||
deviceSession &&
|
||||
profileSession &&
|
||||
deviceSession.sessionId !== profileSession.sessionId
|
||||
) {
|
||||
// Merge sessions by ensuring they reference the same object
|
||||
const unifiedSession = {
|
||||
...deviceSession,
|
||||
...profileSession,
|
||||
events: [...deviceSession.events, ...profileSession.events],
|
||||
start: Math.min(deviceSession.start, profileSession.start),
|
||||
end: Math.max(deviceSession.end, profileSession.end),
|
||||
sessionId: deviceSession.sessionId, // Prefer the deviceSession ID for no particular reason
|
||||
sessionId: deviceSession.sessionId,
|
||||
};
|
||||
lastSessionByDevice[event.device_id] = unifiedSession;
|
||||
lastSessionByProfile[event.profile_id] = unifiedSession;
|
||||
// filter previous before appending new unified fileter
|
||||
sessionList = sessionList.filter(
|
||||
(session) =>
|
||||
session.sessionId !== deviceSession?.sessionId &&
|
||||
@@ -175,99 +178,88 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] {
|
||||
return sessionList;
|
||||
}
|
||||
|
||||
function generateSessionId(): string {
|
||||
return randomUUID();
|
||||
/**
 * Converts one Mixpanel export record into the importer's event shape.
 *
 * - `profile_id` is `distinct_id` with a leading `$device:` removed, falling
 *   back to `$device_id` and then to an empty string.
 * - `created_at` is built from `properties.time * 1000` (i.e. `time` is
 *   treated as seconds).
 * - Custom properties are the record's properties with Mixpanel's own keys
 *   stripped; when `$current_url` is present, `__query` and `__hash` from the
 *   parsed URL are added.
 * - `device` is `'server'` when the record has no `$current_url`.
 * - `session_id` and `project_id` are left empty here.
 *
 * @param event - Validated Mixpanel record.
 * @returns The event with a freshly generated `id`.
 */
function createEventObject(event: IMixpanelEvent): IImportedEvent {
  const props = event.properties;
  const currentUrl = props.$current_url;
  const url = parsePath(currentUrl);

  const profileId = props.distinct_id
    ? String(props.distinct_id).replace(/^\$device:/, '')
    : props.$device_id ?? '';
  const browserVersion = props.$browser_version
    ? String(props.$browser_version)
    : '';
  const searchEngine = props.$search_engine;
  // URL-derived extras exist only for records that carry a page URL.
  const urlProperties = currentUrl
    ? {
        __query: url.query,
        __hash: url.hash,
      }
    : {};

  return {
    profile_id: profileId,
    name: event.event,
    created_at: new Date(props.time * 1000).toISOString(),
    properties: {
      ...stripMixpanelProperties(props),
      ...urlProperties,
    },
    country: props.country_code ?? '',
    region: props.$region ?? '',
    city: props.$city ?? '',
    os: props.$os ?? '',
    browser: props.$browser ?? '',
    browser_version: browserVersion,
    referrer: props.$initial_referrer ?? '',
    referrer_type: searchEngine ? 'search' : '',
    referrer_name: searchEngine ?? '',
    device_id: props.$device_id ?? '',
    session_id: '',
    project_id: '',
    path: url.path,
    origin: url.origin,
    os_version: '',
    model: '',
    longitude: null,
    latitude: null,
    id: randomUUID(),
    duration: 0,
    device: currentUrl ? '' : 'server',
    brand: '',
  };
}
|
||||
|
||||
async function loadFiles(files: string[] = []) {
|
||||
const data: any[] = [];
|
||||
const filesToParse = files.slice(0, 10);
|
||||
/**
 * Type guard for the minimal shape of a Mixpanel export record.
 *
 * A value qualifies when it is a non-null object whose `event` is a string and
 * whose `properties` is a non-null object with a numeric `time`.
 *
 * @param event - Arbitrary parsed JSON value.
 * @returns `true` when the value has that shape.
 */
function isMixpanelEvent(event: any): event is IMixpanelEvent {
  if (typeof event !== 'object' || event === null) {
    return false;
  }
  if (typeof event.event !== 'string') {
    return false;
  }

  const props = event.properties;
  if (typeof props !== 'object' || props === null) {
    return false;
  }
  return typeof props.time === 'number';
}
|
||||
|
||||
await new Promise((resolve) => {
|
||||
filesToParse.forEach((file) => {
|
||||
const readStream = fs.createReadStream(file);
|
||||
const content: any[] = [];
|
||||
|
||||
readStream.on('data', (chunk) => {
|
||||
content.push(chunk.toString('utf-8'));
|
||||
});
|
||||
|
||||
readStream.on('end', () => {
|
||||
console.log('Finished reading file:', file);
|
||||
data.push(parseFileContent(content.join('')));
|
||||
if (data.length === filesToParse.length) {
|
||||
resolve(1);
|
||||
async function processFile(file: string): Promise<IImportedEvent[]> {
|
||||
const fileStream = fs.createReadStream(file);
|
||||
const events: IImportedEvent[] = [];
|
||||
for await (const event of parseJsonStream(fileStream)) {
|
||||
if (Array.isArray(event)) {
|
||||
for (const item of event) {
|
||||
if (isMixpanelEvent(item)) {
|
||||
events.push(createEventObject(item));
|
||||
} else {
|
||||
console.log('Not a Mixpanel event', item);
|
||||
}
|
||||
});
|
||||
|
||||
readStream.on('error', (error) => {
|
||||
console.error('Error reading file:', error);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// sorted oldest to latest
|
||||
const a = data
|
||||
.flat()
|
||||
.sort((a, b) => a.properties.time - b.properties.time)
|
||||
.map((event) => {
|
||||
const currentUrl = event.properties.$current_url;
|
||||
if (currentUrl) {
|
||||
// console.log('');
|
||||
// console.log(event.properties);
|
||||
// console.log('');
|
||||
}
|
||||
const url = parsePath(currentUrl);
|
||||
const eventToSave = {
|
||||
profile_id: event.properties.distinct_id
|
||||
? String(event.properties.distinct_id).replace(/^\$device:/, '')
|
||||
: event.properties.$device_id ?? '',
|
||||
name: event.event,
|
||||
created_at: new Date(event.properties.time * 1000).toISOString(),
|
||||
properties: {
|
||||
...(stripMixpanelProperties(event.properties) as Record<
|
||||
string,
|
||||
string
|
||||
>),
|
||||
...(currentUrl
|
||||
? {
|
||||
__query: url.query,
|
||||
__hash: url.hash,
|
||||
}
|
||||
: {}),
|
||||
},
|
||||
country: event.properties.country_code ?? '',
|
||||
region: event.properties.$region ?? '',
|
||||
city: event.properties.$city ?? '',
|
||||
os: event.properties.$os ?? '',
|
||||
browser: event.properties.$browser ?? '',
|
||||
browser_version: event.properties.$browser_version
|
||||
? String(event.properties.$browser_version)
|
||||
: '',
|
||||
referrer: event.properties.$initial_referrer ?? '',
|
||||
referrer_type: event.properties.$search_engine ? 'search' : '', // FIX (IN API)
|
||||
referrer_name: event.properties.$search_engine ?? '', // FIX (IN API)
|
||||
device_id: event.properties.$device_id ?? '',
|
||||
session_id: '',
|
||||
project_id: '', // FIX (IN API)
|
||||
path: url.path,
|
||||
origin: url.origin,
|
||||
os_version: '', // FIX
|
||||
model: '',
|
||||
longitude: null,
|
||||
latitude: null,
|
||||
id: randomUUID(),
|
||||
duration: 0,
|
||||
device: currentUrl ? '' : 'server',
|
||||
brand: '',
|
||||
};
|
||||
return eventToSave;
|
||||
});
|
||||
} else {
|
||||
if (isMixpanelEvent(event)) {
|
||||
events.push(createEventObject(event));
|
||||
} else {
|
||||
console.log('Not a Mixpanel event', event);
|
||||
}
|
||||
}
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
const sessions = generateSessionEvents(a);
|
||||
|
||||
const events = sessions.flatMap((session) => {
|
||||
return [
|
||||
function processEvents(events: IImportedEvent[]): IImportedEvent[] {
|
||||
const sessions = generateSessionEvents(events);
|
||||
const processedEvents = sessions.flatMap((session) =>
|
||||
[
|
||||
session.firstEvent && {
|
||||
...session.firstEvent,
|
||||
id: randomUUID(),
|
||||
@@ -280,7 +272,7 @@ async function loadFiles(files: string[] = []) {
|
||||
...uniqBy(
|
||||
prop('id'),
|
||||
session.events.map((event) =>
|
||||
assocPath(['session_id'], session.sessionId, clone(event))
|
||||
assocPath(['session_id'], session.sessionId, event)
|
||||
)
|
||||
),
|
||||
session.lastEvent && {
|
||||
@@ -292,24 +284,20 @@ async function loadFiles(files: string[] = []) {
|
||||
session_id: session.sessionId,
|
||||
name: 'session_end',
|
||||
},
|
||||
].filter(Boolean);
|
||||
});
|
||||
].filter((item): item is IImportedEvent => !!item)
|
||||
);
|
||||
|
||||
const totalPages = Math.ceil(events.length / BATCH_SIZE);
|
||||
const estimatedTime = (totalPages / 8) * SLEEP_TIME + (totalPages / 8) * 80;
|
||||
console.log(`Estimated time: ${estimatedTime / 1000} seconds`);
|
||||
return [
|
||||
...processedEvents,
|
||||
...events.filter((event) => {
|
||||
return !event.profile_id && !event.device_id;
|
||||
}),
|
||||
];
|
||||
}
|
||||
|
||||
async function batcher(page: number) {
|
||||
const batch = events.slice(page * BATCH_SIZE, (page + 1) * BATCH_SIZE);
|
||||
|
||||
if (batch.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// const size = Buffer.byteLength(JSON.stringify(batch));
|
||||
// console.log(batch.length, size / (1024 * 1024));
|
||||
|
||||
await fetch('http://localhost:3333/import/events', {
|
||||
async function sendBatchToAPI(batch: IImportedEvent[]) {
|
||||
try {
|
||||
const res = await fetch('http://localhost:3333/import/events', {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
@@ -318,72 +306,78 @@ async function loadFiles(files: string[] = []) {
|
||||
},
|
||||
body: JSON.stringify(batch),
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, SLEEP_TIME));
|
||||
} catch (e) {
|
||||
console.log('sendBatchToAPI failed');
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
async function runBatchesInParallel(
|
||||
totalPages: number,
|
||||
concurrentBatches: number
|
||||
) {
|
||||
let currentPage = 0;
|
||||
async function processFiles(files: string[]) {
|
||||
const progress = new Progress(
|
||||
'Processing (:current/:total) :file [:bar] :percent | :savedEvents saved events | :status',
|
||||
{
|
||||
total: files.length,
|
||||
width: 20,
|
||||
}
|
||||
);
|
||||
let savedEvents = 0;
|
||||
let currentBatch: IImportedEvent[] = [];
|
||||
let apiBatching = [];
|
||||
|
||||
while (currentPage < totalPages) {
|
||||
const promises = [];
|
||||
for (
|
||||
let i = 0;
|
||||
i < concurrentBatches && currentPage < totalPages;
|
||||
i++, currentPage++
|
||||
) {
|
||||
progress(
|
||||
`Sending batch ${currentPage} (${Math.round((currentPage / totalPages) * 100)}... %)`
|
||||
);
|
||||
promises.push(batcher(currentPage));
|
||||
for (const file of files) {
|
||||
progress.tick({
|
||||
file,
|
||||
savedEvents,
|
||||
status: 'reading file',
|
||||
});
|
||||
const events = await processFile(file);
|
||||
progress.render({
|
||||
file,
|
||||
savedEvents,
|
||||
status: 'processing events',
|
||||
});
|
||||
const processedEvents = processEvents(events);
|
||||
for (const event of processedEvents) {
|
||||
currentBatch.push(event);
|
||||
if (currentBatch.length >= BATCH_SIZE) {
|
||||
apiBatching.push(currentBatch);
|
||||
savedEvents += currentBatch.length;
|
||||
progress.render({ file, savedEvents, status: 'saving events' });
|
||||
currentBatch = [];
|
||||
}
|
||||
|
||||
if (apiBatching.length >= 10) {
|
||||
await Promise.all(apiBatching.map(sendBatchToAPI));
|
||||
apiBatching = [];
|
||||
}
|
||||
await Promise.all(promises);
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger the batches
|
||||
try {
|
||||
await runBatchesInParallel(totalPages, 8); // Run 8 batches in parallel
|
||||
} catch (e) {
|
||||
console.log('ERROR?!', e);
|
||||
if (currentBatch.length > 0) {
|
||||
await sendBatchToAPI(currentBatch);
|
||||
savedEvents += currentBatch.length;
|
||||
progress.render({ file: 'Complete', savedEvents, status: 'Complete' });
|
||||
}
|
||||
}
|
||||
|
||||
export async function importFiles(matcher: string) {
|
||||
const files = await glob([matcher], {
|
||||
root: '/',
|
||||
});
|
||||
const files = await glob([matcher], { root: '/' });
|
||||
|
||||
if (files.length === 0) {
|
||||
console.log('No files found');
|
||||
return;
|
||||
}
|
||||
|
||||
function chunks(array: string[], size: number) {
|
||||
const results = [];
|
||||
while (array.length) {
|
||||
results.push(array.splice(0, size));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
files.sort((a, b) => a.localeCompare(b));
|
||||
|
||||
const times = [];
|
||||
const chunksArray = chunks(files, 3);
|
||||
let chunkIndex = 0;
|
||||
for (const chunk of chunksArray) {
|
||||
if (times.length > 0) {
|
||||
// Print out how much time is approximately left
|
||||
const average = times.reduce((a, b) => a + b) / times.length;
|
||||
const remaining = (chunksArray.length - chunkIndex) * average;
|
||||
console.log(`\n\nEstimated time left: ${remaining / 1000 / 60} minutes`);
|
||||
}
|
||||
console.log('Processing chunk:', chunkIndex);
|
||||
chunkIndex++;
|
||||
const d = Date.now();
|
||||
await loadFiles(chunk);
|
||||
times.push(Date.now() - d);
|
||||
}
|
||||
console.log(`Found ${files.length} files to process`);
|
||||
|
||||
const startTime = Date.now();
|
||||
await processFiles(files);
|
||||
const endTime = Date.now();
|
||||
|
||||
console.log(
|
||||
`\nProcessing completed in ${(endTime - startTime) / 1000} seconds`
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,339 +0,0 @@
|
||||
import { randomUUID } from 'crypto';
|
||||
import fs from 'fs';
|
||||
import readline from 'readline';
|
||||
import { glob } from 'glob';
|
||||
import Progress from 'progress';
|
||||
import { assocPath, prop, uniqBy } from 'ramda';
|
||||
|
||||
import type { IClickhouseEvent } from '@openpanel/db';
|
||||
|
||||
import { parsePath } from './copy.url';
|
||||
|
||||
// Number of events collected into one batch before it is queued for the import API.
const BATCH_SIZE = 8000;
|
||||
// Pause in milliseconds after each request to the import API.
const SLEEP_TIME = 100;
|
||||
|
||||
/**
 * Builds a copy of `obj` that omits Mixpanel's own fields.
 *
 * Entries whose key starts with `$` or `mp_`, or equals `time` or
 * `distinct_id`, are skipped. Only own enumerable properties are considered.
 *
 * @param obj - Raw property bag.
 * @returns A new object with the remaining entries.
 */
function stripMixpanelProperties(obj: Record<string, unknown>) {
  const excluded = ['time', 'distinct_id'];
  const kept: [string, unknown][] = [];

  for (const entry of Object.entries(obj)) {
    const key = entry[0];
    if (/^(\$|mp_)/.test(key) || excluded.includes(key)) {
      continue;
    }
    kept.push(entry);
  }

  return Object.fromEntries(kept);
}
|
||||
|
||||
/**
 * Reads a file line by line and yields each complete JSON value it contains.
 *
 * Lines are accumulated in a buffer until the number of opening and closing
 * curly braces balances, at which point the buffer is parsed and yielded. This
 * supports one JSON object per line as well as objects spread over several
 * lines. Braces that appear inside JSON string literals are ignored when
 * balancing, so values such as `"}"` do not break record boundaries.
 *
 * Only curly braces are tracked; a buffer that balances but is not valid JSON
 * is reported with a warning on the console and discarded.
 *
 * @param fileStream - Readable stream of the file to parse.
 * @returns An async generator of parsed JSON values.
 */
async function* parseJsonStream(
  fileStream: fs.ReadStream
): AsyncGenerator<any, void, unknown> {
  // Net count of `{` minus `}` on one line, skipping anything inside a
  // double-quoted string (with backslash escapes honoured). JSON strings cannot
  // contain raw newlines, so string state never carries over to the next line.
  const braceDelta = (line: string): number => {
    let delta = 0;
    let inString = false;
    let escaped = false;
    for (const ch of line) {
      if (inString) {
        if (escaped) {
          escaped = false;
        } else if (ch === '\\') {
          escaped = true;
        } else if (ch === '"') {
          inString = false;
        }
      } else if (ch === '"') {
        inString = true;
      } else if (ch === '{') {
        delta++;
      } else if (ch === '}') {
        delta--;
      }
    }
    return delta;
  };

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });

  let buffer = '';
  let bracketCount = 0;

  for await (const line of rl) {
    buffer += line;
    bracketCount += braceDelta(line);

    // Balanced braces and a non-blank buffer: one complete value is available.
    if (bracketCount === 0 && buffer.trim()) {
      try {
        const json = JSON.parse(buffer);
        yield json;
      } catch (error) {
        console.log('Warning: Failed to parse JSON');
        console.log('Buffer:', buffer);
      }
      buffer = '';
    }
  }

  // Whatever is left when the file ends gets one last parse attempt.
  if (buffer.trim()) {
    try {
      const json = JSON.parse(buffer);
      yield json;
    } catch (error) {
      console.log('Warning: Failed to parse remaining JSON');
      console.log('Buffer:', buffer);
    }
  }
}
|
||||
|
||||
/**
 * A run of events attributed to one device and/or profile, as built by
 * `generateSessionEvents`.
 */
interface Session {
  /** Identifier assigned to the session (a random UUID). */
  sessionId: string;
  /** Device the session was opened for, when opened from a device id. */
  deviceId?: string;
  /** Profile the session was opened for, when opened from a profile id. */
  profileId?: string;

  /** Time of the first event, in epoch milliseconds. */
  start: number;
  /** Time of the most recent event, in epoch milliseconds. */
  end: number;

  /** Event that opened the session. */
  firstEvent?: IClickhouseEvent;
  /** Latest event appended after the opening one; unset until that happens. */
  lastEvent?: IClickhouseEvent;
  /** All events collected for the session (may contain duplicates). */
  events: IClickhouseEvent[];
}
|
||||
|
||||
/**
 * Groups events into sessions keyed by device id and by profile id.
 *
 * The input array is sorted in place by `created_at` (oldest first). For each
 * event whose device id and profile id differ, a new session is opened per id
 * when none exists yet or the previous one ended more than 30 minutes before
 * the event; otherwise the existing session is extended. When the device and
 * profile sessions for an event are different sessions, they are replaced by a
 * single merged session that keeps the device session's id.
 *
 * An event whose device id equals its profile id never opens a session; it is
 * only appended to a session that already exists for that id.
 *
 * @param events - Events to group; reordered in place.
 * @returns The resulting sessions.
 */
function generateSessionEvents(events: IClickhouseEvent[]): Session[] {
  const sessionTimeoutMs = 30 * 60 * 1000;

  const toMillis = (createdAt: IClickhouseEvent['created_at']) =>
    new Date(createdAt).getTime();
  const hasExpired = (session: Session, at: number) =>
    at > session.end + sessionTimeoutMs;
  const extend = (session: Session, event: IClickhouseEvent, at: number) => {
    session.end = at;
    session.lastEvent = event;
    session.events.push(event);
  };

  const latestByDevice: Record<string, Session> = {};
  const latestByProfile: Record<string, Session> = {};
  let sessions: Session[] = [];

  events.sort(
    (left, right) => toMillis(left.created_at) - toMillis(right.created_at)
  );

  for (const event of events) {
    const at = toMillis(event.created_at);
    const deviceId = event.device_id;
    const profileId = event.profile_id;
    const idsDiffer = deviceId !== profileId;

    let forDevice = deviceId ? latestByDevice[deviceId] : undefined;
    let forProfile = profileId ? latestByProfile[profileId] : undefined;

    // Device side: open a fresh session or extend the current one.
    if (deviceId && idsDiffer && (!forDevice || hasExpired(forDevice, at))) {
      forDevice = {
        start: at,
        end: at,
        deviceId,
        sessionId: randomUUID(),
        firstEvent: event,
        events: [event],
      };
      latestByDevice[deviceId] = forDevice;
      sessions.push(forDevice);
    } else if (forDevice) {
      extend(forDevice, event, at);
    }

    // Profile side: same rules, tracked independently.
    if (profileId && idsDiffer && (!forProfile || hasExpired(forProfile, at))) {
      forProfile = {
        start: at,
        end: at,
        profileId,
        sessionId: randomUUID(),
        firstEvent: event,
        events: [event],
      };
      latestByProfile[profileId] = forProfile;
      sessions.push(forProfile);
    } else if (forProfile) {
      extend(forProfile, event, at);
    }

    // Two distinct sessions for the same event: fold them into one.
    if (
      forDevice &&
      forProfile &&
      forDevice.sessionId !== forProfile.sessionId
    ) {
      const merged = {
        ...forDevice,
        ...forProfile,
        events: [...forDevice.events, ...forProfile.events],
        start: Math.min(forDevice.start, forProfile.start),
        end: Math.max(forDevice.end, forProfile.end),
        sessionId: forDevice.sessionId,
      };
      latestByDevice[deviceId] = merged;
      latestByProfile[profileId] = merged;

      // Drop both originals before adding their replacement.
      const replacedIds = [forDevice.sessionId, forProfile.sessionId];
      sessions = sessions.filter(
        (session) => !replacedIds.includes(session.sessionId)
      );
      sessions.push(merged);
    }
  }

  return sessions;
}
|
||||
|
||||
/**
 * Converts one Mixpanel export record into a ClickHouse event row.
 *
 * - `profile_id` is `distinct_id` with a leading `$device:` removed, falling
 *   back to `$device_id` and then to an empty string.
 * - `created_at` is built from `properties.time * 1000` (i.e. `time` is
 *   treated as seconds).
 * - Custom properties are the record's properties with Mixpanel's own keys
 *   stripped; when `$current_url` is present, `__query` and `__hash` from the
 *   parsed URL are added.
 * - `device` is `'server'` when the record has no `$current_url`.
 * - `session_id` and `project_id` are left empty here.
 *
 * @param event - Parsed Mixpanel record; expected to have `event` and
 *   `properties`.
 * @returns The event with a freshly generated `id`.
 */
function createEventObject(event: any): IClickhouseEvent {
  const props = event.properties;
  const currentUrl = props.$current_url;
  const url = parsePath(currentUrl);

  const profileId = props.distinct_id
    ? String(props.distinct_id).replace(/^\$device:/, '')
    : props.$device_id ?? '';
  const browserVersion = props.$browser_version
    ? String(props.$browser_version)
    : '';
  const searchEngine = props.$search_engine;
  // URL-derived extras exist only for records that carry a page URL.
  const urlProperties = currentUrl
    ? {
        __query: url.query,
        __hash: url.hash,
      }
    : {};

  return {
    profile_id: profileId,
    name: event.event,
    created_at: new Date(props.time * 1000).toISOString(),
    properties: {
      ...stripMixpanelProperties(props),
      ...urlProperties,
    },
    country: props.country_code ?? '',
    region: props.$region ?? '',
    city: props.$city ?? '',
    os: props.$os ?? '',
    browser: props.$browser ?? '',
    browser_version: browserVersion,
    referrer: props.$initial_referrer ?? '',
    referrer_type: searchEngine ? 'search' : '',
    referrer_name: searchEngine ?? '',
    device_id: props.$device_id ?? '',
    session_id: '',
    project_id: '',
    path: url.path,
    origin: url.origin,
    os_version: '',
    model: '',
    longitude: null,
    latitude: null,
    id: randomUUID(),
    duration: 0,
    device: currentUrl ? '' : 'server',
    brand: '',
  };
}
|
||||
|
||||
/**
 * Reads one export file and converts every Mixpanel record in it.
 *
 * Each value yielded by `parseJsonStream` may be a single record or an array
 * of records. Values that do not look like a Mixpanel record (not an object,
 * no string `event`, no `properties` object, or no numeric `properties.time`)
 * are logged and skipped instead of being passed to `createEventObject`, where
 * they would throw and abort the whole import.
 *
 * @param file - Path of the file to read.
 * @returns The converted events, in file order.
 */
async function processFile(file: string): Promise<IClickhouseEvent[]> {
  // Minimal shape check for a record that createEventObject can handle.
  const isImportable = (candidate: unknown): boolean => {
    if (typeof candidate !== 'object' || candidate === null) {
      return false;
    }
    const { event, properties } = candidate as {
      event?: unknown;
      properties?: unknown;
    };
    if (
      typeof event !== 'string' ||
      typeof properties !== 'object' ||
      properties === null
    ) {
      return false;
    }
    return typeof (properties as { time?: unknown }).time === 'number';
  };

  const fileStream = fs.createReadStream(file);
  const events: IClickhouseEvent[] = [];

  for await (const parsed of parseJsonStream(fileStream)) {
    const records = Array.isArray(parsed) ? parsed : [parsed];
    for (const record of records) {
      if (isImportable(record)) {
        events.push(createEventObject(record));
      } else {
        console.log('Not a Mixpanel event', record);
      }
    }
  }

  return events;
}
|
||||
|
||||
/**
 * Assigns session ids to events and adds synthetic session boundary events.
 *
 * For every session from `generateSessionEvents` the output contains, in
 * order: a `session_start` copy of the first event dated one second earlier
 * (when the session has a first event), the session's events de-duplicated by
 * `id` with `session_id` set, and a `session_end` copy of the last event dated
 * one second later (when the session has a last event). Events that have
 * neither a profile id nor a device id are appended unchanged at the end.
 *
 * @param events - Events to process; sorted in place by the session builder.
 * @returns The session-tagged events followed by the id-less events.
 */
function processEvents(events: IClickhouseEvent[]): IClickhouseEvent[] {
  const shiftBy = (createdAt: IClickhouseEvent['created_at'], deltaMs: number) =>
    new Date(new Date(createdAt).getTime() + deltaMs).toISOString();

  const withSessions: IClickhouseEvent[] = [];

  for (const session of generateSessionEvents(events)) {
    const { sessionId, firstEvent, lastEvent } = session;

    if (firstEvent) {
      withSessions.push({
        ...firstEvent,
        id: randomUUID(),
        created_at: shiftBy(firstEvent.created_at, -1000),
        session_id: sessionId,
        name: 'session_start',
      });
    }

    // Merged sessions can hold the same event twice; keep one copy per id.
    const tagged = session.events.map((event) =>
      assocPath(['session_id'], sessionId, event)
    );
    for (const event of uniqBy(prop('id'), tagged)) {
      withSessions.push(event);
    }

    if (lastEvent) {
      withSessions.push({
        ...lastEvent,
        id: randomUUID(),
        created_at: shiftBy(lastEvent.created_at, 1000),
        session_id: sessionId,
        name: 'session_end',
      });
    }
  }

  const withoutIdentity = events.filter(
    (event) => !event.profile_id && !event.device_id
  );

  return [...withSessions, ...withoutIdentity];
}
|
||||
|
||||
/**
 * Posts one batch of events to the local import endpoint, then pauses for
 * `SLEEP_TIME` milliseconds.
 *
 * @param batch - Events to send as the JSON request body.
 * @throws Error when the endpoint answers with a non-2xx status, so a rejected
 *   batch is not silently treated as imported. Network failures from `fetch`
 *   propagate unchanged.
 */
async function sendBatchToAPI(batch: IClickhouseEvent[]) {
  // NOTE(review): the endpoint URL and client credentials are hard-coded in
  // source; consider moving them to configuration / environment variables.
  const response = await fetch('http://localhost:3333/import/events', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'openpanel-client-id': 'dd3db204-dcf6-49e2-9e82-de01cba7e585',
      'openpanel-client-secret': 'sec_293b903816e327e10c9d',
    },
    body: JSON.stringify(batch),
  });

  if (!response.ok) {
    throw new Error(
      `Import API rejected a batch of ${batch.length} events: ${response.status} ${response.statusText}`
    );
  }

  // Throttle: give the API a short break before the next request.
  await new Promise((resolve) => setTimeout(resolve, SLEEP_TIME));
}
|
||||
|
||||
/**
 * Imports a list of export files: reads each file, derives sessions, and sends
 * the resulting events to the import API in batches of `BATCH_SIZE`.
 *
 * Full batches are queued and sent ten at a time in parallel. After the last
 * file, any full batches still queued are sent as well, followed by the final
 * partial batch. A progress bar on the console shows the current file, a
 * running event count and the current step.
 *
 * @param files - Paths of the files to import, processed in the given order.
 */
async function processFiles(files: string[]) {
  // Number of queued batches that triggers a parallel send.
  const parallelBatches = 10;

  const progress = new Progress(
    'Processing (:current/:total) :file [:bar] :percent | :savedEvents saved events | :status',
    {
      total: files.length,
      width: 20,
    }
  );
  // Counted when a batch is queued, not when the API has accepted it.
  let savedEvents = 0;
  let currentBatch: IClickhouseEvent[] = [];
  let apiBatching: IClickhouseEvent[][] = [];

  for (const file of files) {
    progress.tick({
      file,
      savedEvents,
      status: 'reading file',
    });
    const events = await processFile(file);
    progress.render({
      file,
      savedEvents,
      status: 'processing events',
    });
    const processedEvents = processEvents(events);
    for (const event of processedEvents) {
      currentBatch.push(event);
      if (currentBatch.length >= BATCH_SIZE) {
        apiBatching.push(currentBatch);
        savedEvents += currentBatch.length;
        progress.render({ file, savedEvents, status: 'saving events' });
        currentBatch = [];
      }

      if (apiBatching.length >= parallelBatches) {
        await Promise.all(apiBatching.map(sendBatchToAPI));
        apiBatching = [];
      }
    }
  }

  // Full batches queued since the last parallel send would otherwise be lost.
  if (apiBatching.length > 0) {
    await Promise.all(apiBatching.map(sendBatchToAPI));
    apiBatching = [];
  }

  if (currentBatch.length > 0) {
    await sendBatchToAPI(currentBatch);
    savedEvents += currentBatch.length;
    progress.render({ file: 'Complete', savedEvents, status: 'Complete' });
  }
}
|
||||
|
||||
/**
 * Entry point of the importer: resolves a glob pattern to files and imports
 * them, logging the file count and the elapsed time.
 *
 * @param matcher - Glob pattern of the files to import (resolved with root `/`).
 * @returns Resolves when processing finishes, or immediately after logging
 *   when the pattern matches no files.
 */
export async function importFiles(matcher: string) {
  const files = await glob([matcher], { root: '/' });

  if (!files.length) {
    console.log('No files found');
    return;
  }

  console.log(`Found ${files.length} files to process`);

  const startedAt = Date.now();
  await processFiles(files);
  const finishedAt = Date.now();

  console.log(
    `\nProcessing completed in ${(finishedAt - startedAt) / 1000} seconds`
  );
}
|
||||
@@ -1,66 +1,7 @@
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import arg from 'arg';
|
||||
import { groupBy } from 'ramda';
|
||||
|
||||
import type { PostEventPayload } from '@openpanel/sdk';
|
||||
|
||||
import { importFiles } from './importer_v2';
|
||||
|
||||
const BATCH_SIZE = 10000; // Define your batch size
|
||||
const SLEEP_TIME = 100; // Define your sleep time between batches
|
||||
|
||||
/**
 * Prints a progress message, replacing the current terminal line when possible.
 *
 * On an interactive terminal the current line is cleared, the cursor is moved
 * to column 0 and `value` is written in place. When stdout is not a TTY (piped
 * or redirected) `clearLine`/`cursorTo` are not available on the stream, so the
 * message is written as an ordinary line instead of throwing.
 *
 * @param value - Text to display.
 */
function progress(value: string) {
  const out = process.stdout;

  if (out.isTTY) {
    out.clearLine(0);
    out.cursorTo(0);
    out.write(value);
    return;
  }

  // Non-interactive output: no in-place rewrite is possible, emit a plain line.
  out.write(`${value}\n`);
}
|
||||
|
||||
/**
 * Copies an event's properties, leaving out Mixpanel's own fields.
 *
 * A key is dropped when it starts with `$` or `mp_`, or when it is exactly
 * `time` or `distinct_id`. The input object is not modified.
 *
 * @param obj - Raw property bag.
 * @returns A new object holding only the remaining keys.
 */
function stripMixpanelProperties(obj: Record<string, unknown>) {
  const reserved = new Set(['time', 'distinct_id']);
  const kept: Record<string, unknown> = {};

  for (const name in obj) {
    const isMixpanelKey = /^(\$|mp_)/.test(name);
    if (!isMixpanelKey && !reserved.has(name)) {
      kept[name] = obj[name];
    }
  }

  return kept;
}
|
||||
|
||||
/**
 * Parses a JSON string without throwing.
 *
 * @param json - Text to parse.
 * @returns The parsed value, or `null` when the text is not valid JSON.
 */
function safeParse(json: string) {
  try {
    const parsed = JSON.parse(json);
    return parsed;
  } catch {
    return null;
  }
}
|
||||
|
||||
/**
 * Parses the text of an export file into a list of records.
 *
 * The whole text is first tried as a single JSON document. If that fails, the
 * trimmed text is split on `\n` and each line is parsed on its own; lines that
 * do not yield a truthy value are reported on the console (with their index)
 * and left out of the result.
 *
 * @param fileContent - Complete file text.
 * @returns The parsed records. The result is not validated against the
 *   declared record shape.
 */
function parseFileContent(fileContent: string): {
  event: string;
  properties: {
    time: number;
    distinct_id?: string | number;
    [key: string]: unknown;
  };
}[] {
  try {
    return JSON.parse(fileContent);
  } catch {
    // Not one JSON document: fall through to line-by-line parsing.
  }

  const records: ReturnType<typeof parseFileContent> = [];
  fileContent
    .trim()
    .split('\n')
    .forEach((line, index) => {
      const json = safeParse(line);
      if (!json) {
        console.log('Warning: Failed to parse JSON');
        console.log('Index:', index);
        console.log('Line:', line);
        return;
      }
      records.push(json);
    });
  return records;
}
|
||||
import { importFiles } from './importer';
|
||||
|
||||
export default function importer() {
|
||||
const args = arg(
|
||||
|
||||
Reference in New Issue
Block a user