wip
This commit is contained in:
committed by
Carl-Gerhard Lindesvärd
parent
3129f62c14
commit
bf0c14cc88
@@ -15,6 +15,8 @@
|
|||||||
"arg": "^5.0.2",
|
"arg": "^5.0.2",
|
||||||
"glob": "^10.4.3",
|
"glob": "^10.4.3",
|
||||||
"inquirer": "^9.3.5",
|
"inquirer": "^9.3.5",
|
||||||
|
"p-limit": "^6.1.0",
|
||||||
|
"progress": "^2.0.3",
|
||||||
"ramda": "^0.29.1",
|
"ramda": "^0.29.1",
|
||||||
"zod": "^3.22.4"
|
"zod": "^3.22.4"
|
||||||
},
|
},
|
||||||
@@ -25,6 +27,7 @@
|
|||||||
"@openpanel/sdk": "workspace:*",
|
"@openpanel/sdk": "workspace:*",
|
||||||
"@openpanel/tsconfig": "workspace:*",
|
"@openpanel/tsconfig": "workspace:*",
|
||||||
"@types/node": "^20.14.10",
|
"@types/node": "^20.14.10",
|
||||||
|
"@types/progress": "^2.0.7",
|
||||||
"@types/ramda": "^0.30.1",
|
"@types/ramda": "^0.30.1",
|
||||||
"eslint": "^8.48.0",
|
"eslint": "^8.48.0",
|
||||||
"prettier": "^3.0.3",
|
"prettier": "^3.0.3",
|
||||||
|
|||||||
@@ -12,8 +12,6 @@ function cli() {
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
console.log('cli args', args);
|
|
||||||
|
|
||||||
const [command] = args._;
|
const [command] = args._;
|
||||||
|
|
||||||
switch (command) {
|
switch (command) {
|
||||||
|
|||||||
@@ -1,9 +1,7 @@
|
|||||||
import { randomUUID } from 'crypto';
|
import { randomUUID } from 'crypto';
|
||||||
import fs from 'fs';
|
import fs from 'fs';
|
||||||
import os from 'os';
|
|
||||||
import v8 from 'v8';
|
|
||||||
import { glob } from 'glob';
|
import { glob } from 'glob';
|
||||||
import { assocPath, clone, last, prop, uniqBy } from 'ramda';
|
import { assocPath, clone, prop, uniqBy } from 'ramda';
|
||||||
|
|
||||||
import type { IClickhouseEvent } from '@openpanel/db';
|
import type { IClickhouseEvent } from '@openpanel/db';
|
||||||
|
|
||||||
@@ -12,29 +10,6 @@ import { parsePath } from './copy.url';
|
|||||||
const BATCH_SIZE = 8000; // Define your batch size
|
const BATCH_SIZE = 8000; // Define your batch size
|
||||||
const SLEEP_TIME = 100; // Define your sleep time between batches
|
const SLEEP_TIME = 100; // Define your sleep time between batches
|
||||||
|
|
||||||
function checkNodeMemoryLimit() {
|
|
||||||
const totalHeapSize = v8.getHeapStatistics().total_available_size;
|
|
||||||
return totalHeapSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
function checkMemoryUsage() {
|
|
||||||
const _totalMemory = os.totalmem();
|
|
||||||
const totalMemory = checkNodeMemoryLimit();
|
|
||||||
console.log({
|
|
||||||
totalMemory,
|
|
||||||
_totalMemory,
|
|
||||||
});
|
|
||||||
|
|
||||||
const usedMemory = process.memoryUsage().heapUsed;
|
|
||||||
const percentUsed = (usedMemory / totalMemory) * 100;
|
|
||||||
return percentUsed;
|
|
||||||
}
|
|
||||||
|
|
||||||
function shouldLoadMoreFiles() {
|
|
||||||
const percentUsed = checkMemoryUsage();
|
|
||||||
return percentUsed < 80; // or any other threshold you deem appropriate
|
|
||||||
}
|
|
||||||
|
|
||||||
function progress(value: string) {
|
function progress(value: string) {
|
||||||
process.stdout.clearLine(0);
|
process.stdout.clearLine(0);
|
||||||
process.stdout.cursorTo(0);
|
process.stdout.cursorTo(0);
|
||||||
@@ -204,37 +179,6 @@ function generateSessionId(): string {
|
|||||||
return randomUUID();
|
return randomUUID();
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function loadFilesBatcher() {
|
|
||||||
const files = await glob(['../../../../Downloads/mp-data/*.txt'], {
|
|
||||||
root: '/',
|
|
||||||
});
|
|
||||||
|
|
||||||
function chunks(array: string[], size: number) {
|
|
||||||
const results = [];
|
|
||||||
while (array.length) {
|
|
||||||
results.push(array.splice(0, size));
|
|
||||||
}
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
|
|
||||||
const times = [];
|
|
||||||
const chunksArray = chunks(files, 5);
|
|
||||||
let chunkIndex = 0;
|
|
||||||
for (const chunk of chunksArray.slice(0, 1)) {
|
|
||||||
if (times.length > 0) {
|
|
||||||
// Print out how much time is approximately left
|
|
||||||
const average = times.reduce((a, b) => a + b) / times.length;
|
|
||||||
const remaining = (chunksArray.length - chunkIndex) * average;
|
|
||||||
console.log(`Estimated time left: ${remaining / 1000 / 60} minutes`);
|
|
||||||
}
|
|
||||||
console.log('Processing chunk:', chunkIndex);
|
|
||||||
chunkIndex++;
|
|
||||||
const d = Date.now();
|
|
||||||
await loadFiles(chunk);
|
|
||||||
times.push(Date.now() - d);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function loadFiles(files: string[] = []) {
|
async function loadFiles(files: string[] = []) {
|
||||||
const data: any[] = [];
|
const data: any[] = [];
|
||||||
const filesToParse = files.slice(0, 10);
|
const filesToParse = files.slice(0, 10);
|
||||||
@@ -245,7 +189,6 @@ async function loadFiles(files: string[] = []) {
|
|||||||
const content: any[] = [];
|
const content: any[] = [];
|
||||||
|
|
||||||
readStream.on('data', (chunk) => {
|
readStream.on('data', (chunk) => {
|
||||||
// console.log(`Received ${chunk.length} bytes of data.`);
|
|
||||||
content.push(chunk.toString('utf-8'));
|
content.push(chunk.toString('utf-8'));
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -324,13 +267,6 @@ async function loadFiles(files: string[] = []) {
|
|||||||
const sessions = generateSessionEvents(a);
|
const sessions = generateSessionEvents(a);
|
||||||
|
|
||||||
const events = sessions.flatMap((session) => {
|
const events = sessions.flatMap((session) => {
|
||||||
if (
|
|
||||||
session.profileId === '594447' ||
|
|
||||||
session.deviceId ===
|
|
||||||
'19081f09f2d666-082ba152fdf7548-7f7a3660-5a900-19081f09f2d666'
|
|
||||||
) {
|
|
||||||
console.log(session);
|
|
||||||
}
|
|
||||||
return [
|
return [
|
||||||
session.firstEvent && {
|
session.firstEvent && {
|
||||||
...session.firstEvent,
|
...session.firstEvent,
|
||||||
@@ -407,7 +343,6 @@ async function loadFiles(files: string[] = []) {
|
|||||||
await Promise.all(promises);
|
await Promise.all(promises);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
console.log(totalPages);
|
|
||||||
|
|
||||||
// Trigger the batches
|
// Trigger the batches
|
||||||
try {
|
try {
|
||||||
@@ -417,4 +352,38 @@ async function loadFiles(files: string[] = []) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
loadFilesBatcher();
|
export async function importFiles(matcher: string) {
|
||||||
|
const files = await glob([matcher], {
|
||||||
|
root: '/',
|
||||||
|
});
|
||||||
|
|
||||||
|
if (files.length === 0) {
|
||||||
|
console.log('No files found');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
function chunks(array: string[], size: number) {
|
||||||
|
const results = [];
|
||||||
|
while (array.length) {
|
||||||
|
results.push(array.splice(0, size));
|
||||||
|
}
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
const times = [];
|
||||||
|
const chunksArray = chunks(files, 3);
|
||||||
|
let chunkIndex = 0;
|
||||||
|
for (const chunk of chunksArray) {
|
||||||
|
if (times.length > 0) {
|
||||||
|
// Print out how much time is approximately left
|
||||||
|
const average = times.reduce((a, b) => a + b) / times.length;
|
||||||
|
const remaining = (chunksArray.length - chunkIndex) * average;
|
||||||
|
console.log(`\n\nEstimated time left: ${remaining / 1000 / 60} minutes`);
|
||||||
|
}
|
||||||
|
console.log('Processing chunk:', chunkIndex);
|
||||||
|
chunkIndex++;
|
||||||
|
const d = Date.now();
|
||||||
|
await loadFiles(chunk);
|
||||||
|
times.push(Date.now() - d);
|
||||||
|
}
|
||||||
|
}
|
||||||
339
packages/cli/src/importer/importer_v2.ts
Normal file
339
packages/cli/src/importer/importer_v2.ts
Normal file
@@ -0,0 +1,339 @@
|
|||||||
|
import { randomUUID } from 'crypto';
|
||||||
|
import fs from 'fs';
|
||||||
|
import readline from 'readline';
|
||||||
|
import { glob } from 'glob';
|
||||||
|
import Progress from 'progress';
|
||||||
|
import { assocPath, prop, uniqBy } from 'ramda';
|
||||||
|
|
||||||
|
import type { IClickhouseEvent } from '@openpanel/db';
|
||||||
|
|
||||||
|
import { parsePath } from './copy.url';
|
||||||
|
|
||||||
|
const BATCH_SIZE = 8000;
|
||||||
|
const SLEEP_TIME = 100;
|
||||||
|
|
||||||
|
function stripMixpanelProperties(obj: Record<string, unknown>) {
|
||||||
|
return Object.fromEntries(
|
||||||
|
Object.entries(obj).filter(
|
||||||
|
([key]) =>
|
||||||
|
!key.match(/^(\$|mp_)/) && !['time', 'distinct_id'].includes(key)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function* parseJsonStream(
|
||||||
|
fileStream: fs.ReadStream
|
||||||
|
): AsyncGenerator<any, void, unknown> {
|
||||||
|
const rl = readline.createInterface({
|
||||||
|
input: fileStream,
|
||||||
|
crlfDelay: Infinity,
|
||||||
|
});
|
||||||
|
|
||||||
|
let buffer = '';
|
||||||
|
let bracketCount = 0;
|
||||||
|
|
||||||
|
for await (const line of rl) {
|
||||||
|
buffer += line;
|
||||||
|
bracketCount +=
|
||||||
|
(line.match(/{/g) || []).length - (line.match(/}/g) || []).length;
|
||||||
|
|
||||||
|
if (bracketCount === 0 && buffer.trim()) {
|
||||||
|
try {
|
||||||
|
const json = JSON.parse(buffer);
|
||||||
|
yield json;
|
||||||
|
} catch (error) {
|
||||||
|
console.log('Warning: Failed to parse JSON');
|
||||||
|
console.log('Buffer:', buffer);
|
||||||
|
}
|
||||||
|
buffer = '';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (buffer.trim()) {
|
||||||
|
try {
|
||||||
|
const json = JSON.parse(buffer);
|
||||||
|
yield json;
|
||||||
|
} catch (error) {
|
||||||
|
console.log('Warning: Failed to parse remaining JSON');
|
||||||
|
console.log('Buffer:', buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Session {
|
||||||
|
start: number;
|
||||||
|
end: number;
|
||||||
|
profileId?: string;
|
||||||
|
deviceId?: string;
|
||||||
|
sessionId: string;
|
||||||
|
firstEvent?: IClickhouseEvent;
|
||||||
|
lastEvent?: IClickhouseEvent;
|
||||||
|
events: IClickhouseEvent[];
|
||||||
|
}
|
||||||
|
|
||||||
|
function generateSessionEvents(events: IClickhouseEvent[]): Session[] {
|
||||||
|
let sessionList: Session[] = [];
|
||||||
|
const lastSessionByDevice: Record<string, Session> = {};
|
||||||
|
const lastSessionByProfile: Record<string, Session> = {};
|
||||||
|
const thirtyMinutes = 30 * 60 * 1000;
|
||||||
|
|
||||||
|
events.sort(
|
||||||
|
(a, b) =>
|
||||||
|
new Date(a.created_at).getTime() - new Date(b.created_at).getTime()
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const event of events) {
|
||||||
|
const eventTime = new Date(event.created_at).getTime();
|
||||||
|
let deviceSession = event.device_id
|
||||||
|
? lastSessionByDevice[event.device_id]
|
||||||
|
: undefined;
|
||||||
|
let profileSession = event.profile_id
|
||||||
|
? lastSessionByProfile[event.profile_id]
|
||||||
|
: undefined;
|
||||||
|
|
||||||
|
if (
|
||||||
|
event.device_id &&
|
||||||
|
event.device_id !== event.profile_id &&
|
||||||
|
(!deviceSession || eventTime > deviceSession.end + thirtyMinutes)
|
||||||
|
) {
|
||||||
|
deviceSession = {
|
||||||
|
start: eventTime,
|
||||||
|
end: eventTime,
|
||||||
|
deviceId: event.device_id,
|
||||||
|
sessionId: randomUUID(),
|
||||||
|
firstEvent: event,
|
||||||
|
events: [event],
|
||||||
|
};
|
||||||
|
lastSessionByDevice[event.device_id] = deviceSession;
|
||||||
|
sessionList.push(deviceSession);
|
||||||
|
} else if (deviceSession) {
|
||||||
|
deviceSession.end = eventTime;
|
||||||
|
deviceSession.lastEvent = event;
|
||||||
|
deviceSession.events.push(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
event.profile_id &&
|
||||||
|
event.device_id !== event.profile_id &&
|
||||||
|
(!profileSession || eventTime > profileSession.end + thirtyMinutes)
|
||||||
|
) {
|
||||||
|
profileSession = {
|
||||||
|
start: eventTime,
|
||||||
|
end: eventTime,
|
||||||
|
profileId: event.profile_id,
|
||||||
|
sessionId: randomUUID(),
|
||||||
|
firstEvent: event,
|
||||||
|
events: [event],
|
||||||
|
};
|
||||||
|
lastSessionByProfile[event.profile_id] = profileSession;
|
||||||
|
sessionList.push(profileSession);
|
||||||
|
} else if (profileSession) {
|
||||||
|
profileSession.end = eventTime;
|
||||||
|
profileSession.lastEvent = event;
|
||||||
|
profileSession.events.push(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
deviceSession &&
|
||||||
|
profileSession &&
|
||||||
|
deviceSession.sessionId !== profileSession.sessionId
|
||||||
|
) {
|
||||||
|
const unifiedSession = {
|
||||||
|
...deviceSession,
|
||||||
|
...profileSession,
|
||||||
|
events: [...deviceSession.events, ...profileSession.events],
|
||||||
|
start: Math.min(deviceSession.start, profileSession.start),
|
||||||
|
end: Math.max(deviceSession.end, profileSession.end),
|
||||||
|
sessionId: deviceSession.sessionId,
|
||||||
|
};
|
||||||
|
lastSessionByDevice[event.device_id] = unifiedSession;
|
||||||
|
lastSessionByProfile[event.profile_id] = unifiedSession;
|
||||||
|
sessionList = sessionList.filter(
|
||||||
|
(session) =>
|
||||||
|
session.sessionId !== deviceSession?.sessionId &&
|
||||||
|
session.sessionId !== profileSession?.sessionId
|
||||||
|
);
|
||||||
|
sessionList.push(unifiedSession);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sessionList;
|
||||||
|
}
|
||||||
|
|
||||||
|
function createEventObject(event: any): IClickhouseEvent {
|
||||||
|
const url = parsePath(event.properties.$current_url);
|
||||||
|
return {
|
||||||
|
profile_id: event.properties.distinct_id
|
||||||
|
? String(event.properties.distinct_id).replace(/^\$device:/, '')
|
||||||
|
: event.properties.$device_id ?? '',
|
||||||
|
name: event.event,
|
||||||
|
created_at: new Date(event.properties.time * 1000).toISOString(),
|
||||||
|
properties: {
|
||||||
|
...stripMixpanelProperties(event.properties),
|
||||||
|
...(event.properties.$current_url
|
||||||
|
? {
|
||||||
|
__query: url.query,
|
||||||
|
__hash: url.hash,
|
||||||
|
}
|
||||||
|
: {}),
|
||||||
|
},
|
||||||
|
country: event.properties.country_code ?? '',
|
||||||
|
region: event.properties.$region ?? '',
|
||||||
|
city: event.properties.$city ?? '',
|
||||||
|
os: event.properties.$os ?? '',
|
||||||
|
browser: event.properties.$browser ?? '',
|
||||||
|
browser_version: event.properties.$browser_version
|
||||||
|
? String(event.properties.$browser_version)
|
||||||
|
: '',
|
||||||
|
referrer: event.properties.$initial_referrer ?? '',
|
||||||
|
referrer_type: event.properties.$search_engine ? 'search' : '',
|
||||||
|
referrer_name: event.properties.$search_engine ?? '',
|
||||||
|
device_id: event.properties.$device_id ?? '',
|
||||||
|
session_id: '',
|
||||||
|
project_id: '',
|
||||||
|
path: url.path,
|
||||||
|
origin: url.origin,
|
||||||
|
os_version: '',
|
||||||
|
model: '',
|
||||||
|
longitude: null,
|
||||||
|
latitude: null,
|
||||||
|
id: randomUUID(),
|
||||||
|
duration: 0,
|
||||||
|
device: event.properties.$current_url ? '' : 'server',
|
||||||
|
brand: '',
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function processFile(file: string): Promise<IClickhouseEvent[]> {
|
||||||
|
const fileStream = fs.createReadStream(file);
|
||||||
|
const events: IClickhouseEvent[] = [];
|
||||||
|
for await (const event of parseJsonStream(fileStream)) {
|
||||||
|
if (Array.isArray(event)) {
|
||||||
|
for (const item of event) {
|
||||||
|
events.push(createEventObject(item));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
events.push(createEventObject(event));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
|
||||||
|
function processEvents(events: IClickhouseEvent[]): IClickhouseEvent[] {
|
||||||
|
const sessions = generateSessionEvents(events);
|
||||||
|
const processedEvents = sessions.flatMap((session) =>
|
||||||
|
[
|
||||||
|
session.firstEvent && {
|
||||||
|
...session.firstEvent,
|
||||||
|
id: randomUUID(),
|
||||||
|
created_at: new Date(
|
||||||
|
new Date(session.firstEvent.created_at).getTime() - 1000
|
||||||
|
).toISOString(),
|
||||||
|
session_id: session.sessionId,
|
||||||
|
name: 'session_start',
|
||||||
|
},
|
||||||
|
...uniqBy(
|
||||||
|
prop('id'),
|
||||||
|
session.events.map((event) =>
|
||||||
|
assocPath(['session_id'], session.sessionId, event)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
session.lastEvent && {
|
||||||
|
...session.lastEvent,
|
||||||
|
id: randomUUID(),
|
||||||
|
created_at: new Date(
|
||||||
|
new Date(session.lastEvent.created_at).getTime() + 1000
|
||||||
|
).toISOString(),
|
||||||
|
session_id: session.sessionId,
|
||||||
|
name: 'session_end',
|
||||||
|
},
|
||||||
|
].filter((item): item is IClickhouseEvent => !!item)
|
||||||
|
);
|
||||||
|
|
||||||
|
return [
|
||||||
|
...processedEvents,
|
||||||
|
...events.filter((event) => {
|
||||||
|
return !event.profile_id && !event.device_id;
|
||||||
|
}),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
async function sendBatchToAPI(batch: IClickhouseEvent[]) {
|
||||||
|
await fetch('http://localhost:3333/import/events', {
|
||||||
|
method: 'POST',
|
||||||
|
headers: {
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'openpanel-client-id': 'dd3db204-dcf6-49e2-9e82-de01cba7e585',
|
||||||
|
'openpanel-client-secret': 'sec_293b903816e327e10c9d',
|
||||||
|
},
|
||||||
|
body: JSON.stringify(batch),
|
||||||
|
});
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, SLEEP_TIME));
|
||||||
|
}
|
||||||
|
|
||||||
|
async function processFiles(files: string[]) {
|
||||||
|
const progress = new Progress(
|
||||||
|
'Processing (:current/:total) :file [:bar] :percent | :savedEvents saved events | :status',
|
||||||
|
{
|
||||||
|
total: files.length,
|
||||||
|
width: 20,
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let savedEvents = 0;
|
||||||
|
let currentBatch: IClickhouseEvent[] = [];
|
||||||
|
|
||||||
|
let apiBatching = [];
|
||||||
|
for (const file of files) {
|
||||||
|
progress.tick({
|
||||||
|
file,
|
||||||
|
savedEvents,
|
||||||
|
status: 'reading file',
|
||||||
|
});
|
||||||
|
const events = await processFile(file);
|
||||||
|
progress.render({
|
||||||
|
file,
|
||||||
|
savedEvents,
|
||||||
|
status: 'processing events',
|
||||||
|
});
|
||||||
|
const processedEvents = processEvents(events);
|
||||||
|
for (const event of processedEvents) {
|
||||||
|
currentBatch.push(event);
|
||||||
|
if (currentBatch.length >= BATCH_SIZE) {
|
||||||
|
apiBatching.push(currentBatch);
|
||||||
|
savedEvents += currentBatch.length;
|
||||||
|
progress.render({ file, savedEvents, status: 'saving events' });
|
||||||
|
currentBatch = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (apiBatching.length >= 10) {
|
||||||
|
await Promise.all(apiBatching.map(sendBatchToAPI));
|
||||||
|
apiBatching = [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (currentBatch.length > 0) {
|
||||||
|
await sendBatchToAPI(currentBatch);
|
||||||
|
savedEvents += currentBatch.length;
|
||||||
|
progress.render({ file: 'Complete', savedEvents, status: 'Complete' });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function importFiles(matcher: string) {
|
||||||
|
const files = await glob([matcher], { root: '/' });
|
||||||
|
|
||||||
|
if (files.length === 0) {
|
||||||
|
console.log('No files found');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`Found ${files.length} files to process`);
|
||||||
|
|
||||||
|
const startTime = Date.now();
|
||||||
|
await processFiles(files);
|
||||||
|
const endTime = Date.now();
|
||||||
|
|
||||||
|
console.log(
|
||||||
|
`\nProcessing completed in ${(endTime - startTime) / 1000} seconds`
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -5,6 +5,8 @@ import { groupBy } from 'ramda';
|
|||||||
|
|
||||||
import type { PostEventPayload } from '@openpanel/sdk';
|
import type { PostEventPayload } from '@openpanel/sdk';
|
||||||
|
|
||||||
|
import { importFiles } from './importer_v2';
|
||||||
|
|
||||||
const BATCH_SIZE = 10000; // Define your batch size
|
const BATCH_SIZE = 10000; // Define your batch size
|
||||||
const SLEEP_TIME = 100; // Define your sleep time between batches
|
const SLEEP_TIME = 100; // Define your sleep time between batches
|
||||||
|
|
||||||
@@ -63,137 +65,20 @@ function parseFileContent(fileContent: string): {
|
|||||||
export default function importer() {
|
export default function importer() {
|
||||||
const args = arg(
|
const args = arg(
|
||||||
{
|
{
|
||||||
'--file': String,
|
'--glob': String,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
permissive: true,
|
permissive: true,
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
if (!args['--file']) {
|
if (!args['--glob']) {
|
||||||
throw new Error('Missing --file argument');
|
throw new Error('Missing --glob argument');
|
||||||
}
|
}
|
||||||
|
|
||||||
const cwd = process.cwd();
|
const cwd = process.cwd();
|
||||||
|
|
||||||
const filePath = path.resolve(cwd, args['--file']);
|
const filePath = path.resolve(cwd, args['--glob']);
|
||||||
const fileContent = parseFileContent(fs.readFileSync(filePath, 'utf-8'));
|
|
||||||
|
|
||||||
// const groups = groupBy((event) => event.properties.$device_id, fileContent);
|
return importFiles(filePath);
|
||||||
// const groupEntries = Object.entries(groups);
|
|
||||||
|
|
||||||
// const profiles = new Map<string, any[]>();
|
|
||||||
|
|
||||||
// for (const [deviceId, items] of Object.entries(groups)) {
|
|
||||||
// items.forEach((item) => {
|
|
||||||
// if (item.properties.distinct_id) {
|
|
||||||
// if (!profiles.has(item.properties.distinct_id)) {
|
|
||||||
// profiles.set(item.properties.distinct_id, []);
|
|
||||||
// }
|
|
||||||
// profiles.get(item.properties.distinct_id)!.push(item);
|
|
||||||
// } else {
|
|
||||||
// item.properties.$device_id
|
|
||||||
// }
|
|
||||||
// })
|
|
||||||
// profiles.
|
|
||||||
// }
|
|
||||||
// console.log('Total:', groupEntries.length);
|
|
||||||
// console.log('Undefined:', groups.undefined?.length ?? 0);
|
|
||||||
|
|
||||||
// const uniqueKeys = new Set<string>();
|
|
||||||
// groups.undefined.forEach((event) => {
|
|
||||||
// if (event.properties.distinct_id) {
|
|
||||||
// console.log(event);
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
|
|
||||||
// 1: group by device id
|
|
||||||
// 2: add session start, session end and populate session_id
|
|
||||||
// 3: check if distinct_id exists on any event
|
|
||||||
// - If it does, get all events with that distinct_id and NO device_id and within session_start and session_end
|
|
||||||
// - add add the session_id to those events
|
|
||||||
// 4: send all events to the server
|
|
||||||
|
|
||||||
const events: PostEventPayload[] = fileContent
|
|
||||||
.slice()
|
|
||||||
.reverse()
|
|
||||||
.map((event) => {
|
|
||||||
if (event.properties.mp_lib === 'web') {
|
|
||||||
console.log(event);
|
|
||||||
}
|
|
||||||
return {
|
|
||||||
profileId: event.properties.distinct_id
|
|
||||||
? String(event.properties.distinct_id)
|
|
||||||
: undefined,
|
|
||||||
name: event.event,
|
|
||||||
timestamp: new Date(event.properties.time * 1000).toISOString(),
|
|
||||||
properties: {
|
|
||||||
__country: event.properties.country_code,
|
|
||||||
__region: event.properties.$region,
|
|
||||||
__city: event.properties.$city,
|
|
||||||
__os: event.properties.$os,
|
|
||||||
__browser: event.properties.$browser,
|
|
||||||
__browser_version: event.properties.$browser_version,
|
|
||||||
__referrer: event.properties.$referrer,
|
|
||||||
__device_id: event.properties.$device_id,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
});
|
|
||||||
|
|
||||||
const totalPages = Math.ceil(events.length / BATCH_SIZE);
|
|
||||||
const estimatedTime = (totalPages / 8) * SLEEP_TIME + (totalPages / 8) * 80;
|
|
||||||
console.log(`Estimated time: ${estimatedTime / 1000} seconds`);
|
|
||||||
|
|
||||||
async function batcher(page: number) {
|
|
||||||
const batch = events.slice(page * BATCH_SIZE, (page + 1) * BATCH_SIZE);
|
|
||||||
|
|
||||||
if (batch.length === 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const size = Buffer.byteLength(JSON.stringify(batch));
|
|
||||||
console.log(batch.length, size / (1024 * 1024));
|
|
||||||
|
|
||||||
// await fetch('http://localhost:3333/import/events', {
|
|
||||||
// method: 'POST',
|
|
||||||
// headers: {
|
|
||||||
// 'Content-Type': 'application/json',
|
|
||||||
// 'openpanel-client-id': 'dd3db204-dcf6-49e2-9e82-de01cba7e585',
|
|
||||||
// 'openpanel-client-secret': 'sec_293b903816e327e10c9d',
|
|
||||||
// },
|
|
||||||
// body: JSON.stringify(batch),
|
|
||||||
// });
|
|
||||||
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, SLEEP_TIME));
|
|
||||||
}
|
|
||||||
|
|
||||||
async function runBatchesInParallel(
|
|
||||||
totalPages: number,
|
|
||||||
concurrentBatches: number
|
|
||||||
) {
|
|
||||||
let currentPage = 0;
|
|
||||||
|
|
||||||
while (currentPage < totalPages) {
|
|
||||||
const promises = [];
|
|
||||||
for (
|
|
||||||
let i = 0;
|
|
||||||
i < concurrentBatches && currentPage < totalPages;
|
|
||||||
i++, currentPage++
|
|
||||||
) {
|
|
||||||
console.log(`Sending batch ${currentPage}... %)`);
|
|
||||||
promises.push(batcher(currentPage));
|
|
||||||
}
|
|
||||||
await Promise.all(promises);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
console.log(totalPages);
|
|
||||||
|
|
||||||
// Trigger the batches
|
|
||||||
try {
|
|
||||||
runBatchesInParallel(totalPages, 8); // Run 8 batches in parallel
|
|
||||||
} catch (e) {
|
|
||||||
console.log('ERROR?!', e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|||||||
27
pnpm-lock.yaml
generated
27
pnpm-lock.yaml
generated
@@ -828,6 +828,12 @@ importers:
|
|||||||
inquirer:
|
inquirer:
|
||||||
specifier: ^9.3.5
|
specifier: ^9.3.5
|
||||||
version: 9.3.6
|
version: 9.3.6
|
||||||
|
p-limit:
|
||||||
|
specifier: ^6.1.0
|
||||||
|
version: 6.1.0
|
||||||
|
progress:
|
||||||
|
specifier: ^2.0.3
|
||||||
|
version: 2.0.3
|
||||||
ramda:
|
ramda:
|
||||||
specifier: ^0.29.1
|
specifier: ^0.29.1
|
||||||
version: 0.29.1
|
version: 0.29.1
|
||||||
@@ -853,6 +859,9 @@ importers:
|
|||||||
'@types/node':
|
'@types/node':
|
||||||
specifier: ^20.14.10
|
specifier: ^20.14.10
|
||||||
version: 20.14.11
|
version: 20.14.11
|
||||||
|
'@types/progress':
|
||||||
|
specifier: ^2.0.7
|
||||||
|
version: 2.0.7
|
||||||
'@types/ramda':
|
'@types/ramda':
|
||||||
specifier: ^0.30.1
|
specifier: ^0.30.1
|
||||||
version: 0.30.1
|
version: 0.30.1
|
||||||
@@ -7632,6 +7641,12 @@ packages:
|
|||||||
resolution: {integrity: sha512-k7kRA033QNtC+gLc4VPlfnue58CM1iQLgn1IMAU8VPHGOj7oIHPp9UlhedEnD/Gl8evoCjwkZjlBORtZ3JByUA==}
|
resolution: {integrity: sha512-k7kRA033QNtC+gLc4VPlfnue58CM1iQLgn1IMAU8VPHGOj7oIHPp9UlhedEnD/Gl8evoCjwkZjlBORtZ3JByUA==}
|
||||||
dev: false
|
dev: false
|
||||||
|
|
||||||
|
/@types/progress@2.0.7:
|
||||||
|
resolution: {integrity: sha512-iadjw02vte8qWx7U0YM++EybBha2CQLPGu9iJ97whVgJUT5Zq9MjAPYUnbfRI2Kpehimf1QjFJYxD0t8nqzu5w==}
|
||||||
|
dependencies:
|
||||||
|
'@types/node': 18.19.17
|
||||||
|
dev: true
|
||||||
|
|
||||||
/@types/prop-types@15.7.11:
|
/@types/prop-types@15.7.11:
|
||||||
resolution: {integrity: sha512-ga8y9v9uyeiLdpKddhxYQkxNDrfvuPrlFb0N1qnZZByvcElJaXthF1UhvCh9TLWJBEHeNtdnbysW7Y6Uq8CVng==}
|
resolution: {integrity: sha512-ga8y9v9uyeiLdpKddhxYQkxNDrfvuPrlFb0N1qnZZByvcElJaXthF1UhvCh9TLWJBEHeNtdnbysW7Y6Uq8CVng==}
|
||||||
|
|
||||||
@@ -15251,6 +15266,13 @@ packages:
|
|||||||
dependencies:
|
dependencies:
|
||||||
yocto-queue: 0.1.0
|
yocto-queue: 0.1.0
|
||||||
|
|
||||||
|
/p-limit@6.1.0:
|
||||||
|
resolution: {integrity: sha512-H0jc0q1vOzlEk0TqAKXKZxdl7kX3OFUzCnNVUnq5Pc3DGo0kpeaMuPqxQn235HibwBEb0/pm9dgKTjXy66fBkg==}
|
||||||
|
engines: {node: '>=18'}
|
||||||
|
dependencies:
|
||||||
|
yocto-queue: 1.1.1
|
||||||
|
dev: false
|
||||||
|
|
||||||
/p-locate@3.0.0:
|
/p-locate@3.0.0:
|
||||||
resolution: {integrity: sha512-x+12w/To+4GFfgJhBEpiDcLozRJGegY+Ei7/z0tSLkMmxGZNybVMSfWj9aJn8Z5Fc7dBUNJOOVgPv2H7IwulSQ==}
|
resolution: {integrity: sha512-x+12w/To+4GFfgJhBEpiDcLozRJGegY+Ei7/z0tSLkMmxGZNybVMSfWj9aJn8Z5Fc7dBUNJOOVgPv2H7IwulSQ==}
|
||||||
engines: {node: '>=6'}
|
engines: {node: '>=6'}
|
||||||
@@ -18869,6 +18891,11 @@ packages:
|
|||||||
resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==}
|
resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==}
|
||||||
engines: {node: '>=10'}
|
engines: {node: '>=10'}
|
||||||
|
|
||||||
|
/yocto-queue@1.1.1:
|
||||||
|
resolution: {integrity: sha512-b4JR1PFR10y1mKjhHY9LaGo6tmrgjit7hxVIeAmyMw3jegXR4dhYqLaQF5zMXZxY7tLpMyJeLjr1C4rLmkVe8g==}
|
||||||
|
engines: {node: '>=12.20'}
|
||||||
|
dev: false
|
||||||
|
|
||||||
/yoctocolors-cjs@2.1.2:
|
/yoctocolors-cjs@2.1.2:
|
||||||
resolution: {integrity: sha512-cYVsTjKl8b+FrnidjibDWskAv7UKOfcwaVZdp/it9n1s9fU3IkgDbhdIRKCW4JDsAlECJY0ytoVPT3sK6kideA==}
|
resolution: {integrity: sha512-cYVsTjKl8b+FrnidjibDWskAv7UKOfcwaVZdp/it9n1s9fU3IkgDbhdIRKCW4JDsAlECJY0ytoVPT3sK6kideA==}
|
||||||
engines: {node: '>=18'}
|
engines: {node: '>=18'}
|
||||||
|
|||||||
Reference in New Issue
Block a user