diff --git a/apps/api/src/controllers/import.controller.ts b/apps/api/src/controllers/import.controller.ts index 17d393ac..47f2f900 100644 --- a/apps/api/src/controllers/import.controller.ts +++ b/apps/api/src/controllers/import.controller.ts @@ -1,14 +1,8 @@ import type { FastifyReply, FastifyRequest } from 'fastify'; -import { pathOr } from 'ramda'; -import { v4 as uuid } from 'uuid'; import { toDots } from '@openpanel/common'; -import type { - IClickhouseEvent, - IServiceCreateEventPayload, -} from '@openpanel/db'; -import { ch, formatClickhouseDate } from '@openpanel/db'; -import type { PostEventPayload } from '@openpanel/sdk'; +import type { IClickhouseEvent } from '@openpanel/db'; +import { ch, formatClickhouseDate, TABLE_NAMES } from '@openpanel/db'; export async function importEvents( request: FastifyRequest<{ @@ -16,24 +10,31 @@ export async function importEvents( }>, reply: FastifyReply ) { - console.log('HERE?!', request.body.length); - + const importedAt = formatClickhouseDate(new Date()); const values: IClickhouseEvent[] = request.body.map((event) => { return { ...event, + properties: toDots(event.properties), project_id: request.client?.projectId ?? '', created_at: formatClickhouseDate(event.created_at), + imported_at: importedAt, }; }); - const res = await ch.insert({ - table: 'events', - values, - format: 'JSONEachRow', - clickhouse_settings: { - date_time_input_format: 'best_effort', - }, - }); + try { + const res = await ch.insert({ + table: TABLE_NAMES.events, + values, + format: 'JSONEachRow', + clickhouse_settings: { + date_time_input_format: 'best_effort', + }, + }); - reply.send('OK'); + console.log(res.summary?.written_rows, 'events imported'); + reply.send('OK'); + } catch (e) { + console.error(e); + reply.status(500).send('Error'); + } } diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index f7a98426..1fb869d4 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -16,6 +16,7 @@ import { appRouter, createContext } from '@openpanel/trpc'; import eventRouter from './routes/event.router'; import exportRouter from './routes/export.router'; +import importRouter from './routes/import.router'; import liveRouter from './routes/live.router'; import miscRouter from './routes/misc.router'; import profileRouter from './routes/profile.router'; @@ -91,6 +92,7 @@ const startServer = async () => { fastify.register(miscRouter, { prefix: '/misc' }); fastify.register(exportRouter, { prefix: '/export' }); fastify.register(webhookRouter, { prefix: '/webhook' }); + fastify.register(importRouter, { prefix: '/import' }); fastify.setErrorHandler((error) => { logger.error(error, 'Error in request'); }); diff --git a/apps/dashboard/src/components/overview/useOverviewOptions.ts b/apps/dashboard/src/components/overview/useOverviewOptions.ts index 9d19e590..89e226ea 100644 --- a/apps/dashboard/src/components/overview/useOverviewOptions.ts +++ b/apps/dashboard/src/components/overview/useOverviewOptions.ts @@ -1,6 +1,5 @@ -import { useEffect } from 'react'; +import { differenceInCalendarMonths } from 'date-fns'; import { - parseAsBoolean, parseAsInteger, parseAsString, parseAsStringEnum, @@ -18,10 +17,6 @@ import { mapKeys } from '@openpanel/validation'; const nuqsOptions = { history: 'push' } as const; export function useOverviewOptions() { - const [previous, setPrevious] = useQueryState( - 'compare', - parseAsBoolean.withDefault(true).withOptions(nuqsOptions) - ); const [startDate, setStartDate] = useQueryState( 'start', parseAsString.withOptions(nuqsOptions) @@ -47,8 +42,15 @@ export function useOverviewOptions() { ); return { - previous, - setPrevious, + // Skip previous for ranges over 6 months (for performance reasons) + previous: !( + range === 'yearToDate' || + range === 'lastYear' || + (range === 'custom' && + startDate && + endDate && + differenceInCalendarMonths(startDate, endDate) > 6) + ), range, setRange: (value: IChartRange | null) => { if (value !== 'custom') { diff --git a/apps/dashboard/src/components/report/sidebar/ReportEvents.tsx b/apps/dashboard/src/components/report/sidebar/ReportEvents.tsx index ba8289a7..d0efe0e9 100644 --- a/apps/dashboard/src/components/report/sidebar/ReportEvents.tsx +++ b/apps/dashboard/src/components/report/sidebar/ReportEvents.tsx @@ -28,9 +28,14 @@ import type { ReportEventMoreProps } from './ReportEventMore'; export function ReportEvents() { const previous = useSelector((state) => state.report.previous); const selectedEvents = useSelector((state) => state.report.events); + const input = useSelector((state) => state.report); const dispatch = useDispatch(); const { projectId } = useAppParams(); - const eventNames = useEventNames(projectId); + const eventNames = useEventNames(projectId, { + startDate: input.startDate, + endDate: input.endDate, + range: input.range, + }); const dispatchChangeEvent = useDebounceFn((event: IChartEvent) => { dispatch(changeEvent(event)); @@ -54,7 +59,7 @@ export function ReportEvents() {
{selectedEvents.map((event) => { return ( -
+
{event.id} state.report); const getLabel = useMappings(); const dispatch = useDispatch(); const potentialValues = api.chart.values.useQuery({ event: event.name, property: filter.name, projectId, + range, + startDate, + endDate, }); const valuesCombobox = @@ -90,7 +94,7 @@ export function FilterItem({ filter, event }: FilterProps) { return (
diff --git a/apps/dashboard/src/components/report/sidebar/filters/FiltersCombobox.tsx b/apps/dashboard/src/components/report/sidebar/filters/FiltersCombobox.tsx index c8cd34ae..3479bb9d 100644 --- a/apps/dashboard/src/components/report/sidebar/filters/FiltersCombobox.tsx +++ b/apps/dashboard/src/components/report/sidebar/filters/FiltersCombobox.tsx @@ -1,6 +1,6 @@ import { Combobox } from '@/components/ui/combobox'; import { useAppParams } from '@/hooks/useAppParams'; -import { useDispatch } from '@/redux'; +import { useDispatch, useSelector } from '@/redux'; import { api } from '@/trpc/client'; import { FilterIcon } from 'lucide-react'; @@ -14,12 +14,16 @@ interface FiltersComboboxProps { export function FiltersCombobox({ event }: FiltersComboboxProps) { const dispatch = useDispatch(); + const { range, startDate, endDate } = useSelector((state) => state.report); const { projectId } = useAppParams(); const query = api.chart.properties.useQuery( { event: event.name, projectId, + range, + startDate, + endDate, }, { enabled: !!event.name, diff --git a/apps/dashboard/src/hooks/useEventNames.ts b/apps/dashboard/src/hooks/useEventNames.ts index e5c1ce12..17d908d6 100644 --- a/apps/dashboard/src/hooks/useEventNames.ts +++ b/apps/dashboard/src/hooks/useEventNames.ts @@ -1,8 +1,9 @@ import { api } from '@/trpc/client'; -export function useEventNames(projectId: string) { +export function useEventNames(projectId: string, options?: any) { const query = api.chart.events.useQuery({ projectId: projectId, + ...(options ? options : {}), }); return query.data ?? []; diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 54ee7eed..b3d6561c 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -1,11 +1,15 @@ import { getReferrerWithQuery, parseReferrer } from '@/utils/parse-referrer'; import { parseUserAgent } from '@/utils/parse-user-agent'; -import { isSameDomain, parsePath } from '@/utils/url'; import type { Job } from 'bullmq'; import { omit } from 'ramda'; import { v4 as uuid } from 'uuid'; -import { getTime, toISOString } from '@openpanel/common'; +import { + getTime, + isSameDomain, + parsePath, + toISOString, +} from '@openpanel/common'; import type { IServiceCreateEventPayload } from '@openpanel/db'; import { createEvent } from '@openpanel/db'; import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service'; @@ -97,6 +101,7 @@ export async function incomingEvent(job: Job) { referrerType: event?.referrerType ?? '', profile: undefined, meta: undefined, + importedAt: null, }; return createEvent(payload); @@ -170,8 +175,6 @@ export async function incomingEvent(job: Job) { referrer: referrer?.url, referrerName: referrer?.name || utmReferrer?.name || '', referrerType: referrer?.type || utmReferrer?.type || '', - profile: undefined, - meta: undefined, }; if (!sessionEnd) { diff --git a/packages/cli/package.json b/packages/cli/package.json index bebdb364..80e12f94 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -12,6 +12,7 @@ "typecheck": "tsc --noEmit" }, "dependencies": { + "@openpanel/common": "workspace:*", "arg": "^5.0.2", "glob": "^10.4.3", "inquirer": "^9.3.5", diff --git a/packages/cli/src/importer/copy.url.ts b/packages/cli/src/importer/copy.url.ts deleted file mode 100644 index 1de5133b..00000000 --- a/packages/cli/src/importer/copy.url.ts +++ /dev/null @@ -1,52 +0,0 @@ -export function parseSearchParams( - params: URLSearchParams -): Record | undefined { - const result: Record = {}; - for (const [key, value] of params.entries()) { - result[key] = value; - } - return Object.keys(result).length ? result : undefined; -} - -export function parsePath(path?: string): { - query?: Record; - path: string; - origin: string; - hash?: string; -} { - if (!path) { - return { - path: '', - origin: '', - }; - } - - try { - const url = new URL(path); - return { - query: parseSearchParams(url.searchParams), - path: url.pathname, - hash: url.hash || undefined, - origin: url.origin, - }; - } catch (error) { - return { - path, - origin: '', - }; - } -} - -export function isSameDomain( - url1: string | undefined, - url2: string | undefined -) { - if (!url1 || !url2) { - return false; - } - try { - return new URL(url1).hostname === new URL(url2).hostname; - } catch (e) { - return false; - } -} diff --git a/packages/cli/src/importer/importer.ts b/packages/cli/src/importer/importer.ts index 3ce52736..0fe61ae2 100644 --- a/packages/cli/src/importer/importer.ts +++ b/packages/cli/src/importer/importer.ts @@ -1,83 +1,99 @@ import { randomUUID } from 'crypto'; import fs from 'fs'; +import readline from 'readline'; import { glob } from 'glob'; -import { assocPath, clone, prop, uniqBy } from 'ramda'; +import Progress from 'progress'; +import { assocPath, prop, uniqBy } from 'ramda'; -import type { IClickhouseEvent } from '@openpanel/db'; +import { parsePath } from '@openpanel/common'; +import type { IImportedEvent } from '@openpanel/db'; -import { parsePath } from './copy.url'; +const BATCH_SIZE = 1000; +const SLEEP_TIME = 20; -const BATCH_SIZE = 8000; // Define your batch size -const SLEEP_TIME = 100; // Define your sleep time between batches - -function progress(value: string) { - process.stdout.clearLine(0); - process.stdout.cursorTo(0); - process.stdout.write(value); -} - -function stripMixpanelProperties(obj: Record) { - const properties = ['time', 'distinct_id']; - const result: Record = {}; - for (const key in obj) { - if (key.match(/^(\$|mp_)/) || properties.includes(key)) { - continue; - } - result[key] = obj[key]; - } - return result; -} - -function safeParse(json: string) { - try { - return JSON.parse(json); - } catch (error) { - return null; - } -} - -function parseFileContent(fileContent: string): { +type IMixpanelEvent = { event: string; properties: { - time: number; - distinct_id?: string | number; [key: string]: unknown; + time: number; + $current_url?: string; + distinct_id?: string; + $device_id?: string; + country_code?: string; + $region?: string; + $city?: string; + $os?: string; + $browser?: string; + $browser_version?: string; + $initial_referrer?: string; + $search_engine?: string; }; -}[] { - try { - return JSON.parse(fileContent); - } catch (error) { - const lines = fileContent.trim().split('\n'); - return lines - .map((line, index) => { - const json = safeParse(line); - if (!json) { - console.log('Warning: Failed to parse JSON'); - console.log('Index:', index); - console.log('Line:', line); - } - return json; - }) - .filter(Boolean); +}; + +function stripMixpanelProperties(obj: Record) { + return Object.fromEntries( + Object.entries(obj).filter( + ([key]) => + !key.match(/^(\$|mp_)/) && !['time', 'distinct_id'].includes(key) + ) + ); +} + +async function* parseJsonStream( + fileStream: fs.ReadStream +): AsyncGenerator { + const rl = readline.createInterface({ + input: fileStream, + crlfDelay: Infinity, + }); + + let buffer = ''; + let bracketCount = 0; + + for await (const line of rl) { + buffer += line; + bracketCount += + (line.match(/{/g) || []).length - (line.match(/}/g) || []).length; + + if (bracketCount === 0 && buffer.trim()) { + try { + const json = JSON.parse(buffer); + yield json; + } catch (error) { + console.log('Warning: Failed to parse JSON'); + console.log('Buffer:', buffer); + } + buffer = ''; + } + } + + if (buffer.trim()) { + try { + const json = JSON.parse(buffer); + yield json; + } catch (error) { + console.log('Warning: Failed to parse remaining JSON'); + console.log('Buffer:', buffer); + } } } interface Session { - start: number; // Timestamp of the session start - end: number; // Timestamp of the session end + start: number; + end: number; profileId?: string; deviceId?: string; sessionId: string; - firstEvent?: IClickhouseEvent; - lastEvent?: IClickhouseEvent; - events: IClickhouseEvent[]; + firstEvent?: IImportedEvent; + lastEvent?: IImportedEvent; + events: IImportedEvent[]; } -function generateSessionEvents(events: IClickhouseEvent[]): Session[] { +function generateSessionEvents(events: IImportedEvent[]): Session[] { let sessionList: Session[] = []; const lastSessionByDevice: Record = {}; const lastSessionByProfile: Record = {}; - const thirtyMinutes = 30 * 60 * 1000; // 30 minutes in milliseconds + const thirtyMinutes = 30 * 60 * 1000; events.sort( (a, b) => @@ -93,7 +109,6 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] { ? lastSessionByProfile[event.profile_id] : undefined; - // Check if new session is needed for deviceId if ( event.device_id && event.device_id !== event.profile_id && @@ -103,7 +118,7 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] { start: eventTime, end: eventTime, deviceId: event.device_id, - sessionId: generateSessionId(), + sessionId: randomUUID(), firstEvent: event, events: [event], }; @@ -115,7 +130,6 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] { deviceSession.events.push(event); } - // Check if new session is needed for profileId if ( event.profile_id && event.device_id !== event.profile_id && @@ -125,7 +139,7 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] { start: eventTime, end: eventTime, profileId: event.profile_id, - sessionId: generateSessionId(), + sessionId: randomUUID(), firstEvent: event, events: [event], }; @@ -137,32 +151,21 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] { profileSession.events.push(event); } - // Sync device and profile sessions if both exist - // if ( - // deviceSession && - // profileSession && - // deviceSession.sessionId !== profileSession.sessionId - // ) { - // profileSession.sessionId = deviceSession.sessionId; - // } - if ( deviceSession && profileSession && deviceSession.sessionId !== profileSession.sessionId ) { - // Merge sessions by ensuring they reference the same object const unifiedSession = { ...deviceSession, ...profileSession, events: [...deviceSession.events, ...profileSession.events], start: Math.min(deviceSession.start, profileSession.start), end: Math.max(deviceSession.end, profileSession.end), - sessionId: deviceSession.sessionId, // Prefer the deviceSession ID for no particular reason + sessionId: deviceSession.sessionId, }; lastSessionByDevice[event.device_id] = unifiedSession; lastSessionByProfile[event.profile_id] = unifiedSession; - // filter previous before appending new unified fileter sessionList = sessionList.filter( (session) => session.sessionId !== deviceSession?.sessionId && @@ -175,99 +178,88 @@ function generateSessionEvents(events: IClickhouseEvent[]): Session[] { return sessionList; } -function generateSessionId(): string { - return randomUUID(); +function createEventObject(event: IMixpanelEvent): IImportedEvent { + const url = parsePath(event.properties.$current_url); + return { + profile_id: event.properties.distinct_id + ? String(event.properties.distinct_id).replace(/^\$device:/, '') + : event.properties.$device_id ?? '', + name: event.event, + created_at: new Date(event.properties.time * 1000).toISOString(), + properties: { + ...stripMixpanelProperties(event.properties), + ...(event.properties.$current_url + ? { + __query: url.query, + __hash: url.hash, + } + : {}), + }, + country: event.properties.country_code ?? '', + region: event.properties.$region ?? '', + city: event.properties.$city ?? '', + os: event.properties.$os ?? '', + browser: event.properties.$browser ?? '', + browser_version: event.properties.$browser_version + ? String(event.properties.$browser_version) + : '', + referrer: event.properties.$initial_referrer ?? '', + referrer_type: event.properties.$search_engine ? 'search' : '', + referrer_name: event.properties.$search_engine ?? '', + device_id: event.properties.$device_id ?? '', + session_id: '', + project_id: '', + path: url.path, + origin: url.origin, + os_version: '', + model: '', + longitude: null, + latitude: null, + id: randomUUID(), + duration: 0, + device: event.properties.$current_url ? '' : 'server', + brand: '', + }; } -async function loadFiles(files: string[] = []) { - const data: any[] = []; - const filesToParse = files.slice(0, 10); +function isMixpanelEvent(event: any): event is IMixpanelEvent { + return ( + typeof event === 'object' && + event !== null && + typeof event?.event === 'string' && + typeof event?.properties === 'object' && + event?.properties !== null && + typeof event?.properties.time === 'number' + ); +} - await new Promise((resolve) => { - filesToParse.forEach((file) => { - const readStream = fs.createReadStream(file); - const content: any[] = []; - - readStream.on('data', (chunk) => { - content.push(chunk.toString('utf-8')); - }); - - readStream.on('end', () => { - console.log('Finished reading file:', file); - data.push(parseFileContent(content.join(''))); - if (data.length === filesToParse.length) { - resolve(1); +async function processFile(file: string): Promise { + const fileStream = fs.createReadStream(file); + const events: IImportedEvent[] = []; + for await (const event of parseJsonStream(fileStream)) { + if (Array.isArray(event)) { + for (const item of event) { + if (isMixpanelEvent(item)) { + events.push(createEventObject(item)); + } else { + console.log('Not a Mixpanel event', item); } - }); - - readStream.on('error', (error) => { - console.error('Error reading file:', error); - }); - }); - }); - - // sorted oldest to latest - const a = data - .flat() - .sort((a, b) => a.properties.time - b.properties.time) - .map((event) => { - const currentUrl = event.properties.$current_url; - if (currentUrl) { - // console.log(''); - // console.log(event.properties); - // console.log(''); } - const url = parsePath(currentUrl); - const eventToSave = { - profile_id: event.properties.distinct_id - ? String(event.properties.distinct_id).replace(/^\$device:/, '') - : event.properties.$device_id ?? '', - name: event.event, - created_at: new Date(event.properties.time * 1000).toISOString(), - properties: { - ...(stripMixpanelProperties(event.properties) as Record< - string, - string - >), - ...(currentUrl - ? { - __query: url.query, - __hash: url.hash, - } - : {}), - }, - country: event.properties.country_code ?? '', - region: event.properties.$region ?? '', - city: event.properties.$city ?? '', - os: event.properties.$os ?? '', - browser: event.properties.$browser ?? '', - browser_version: event.properties.$browser_version - ? String(event.properties.$browser_version) - : '', - referrer: event.properties.$initial_referrer ?? '', - referrer_type: event.properties.$search_engine ? 'search' : '', // FIX (IN API) - referrer_name: event.properties.$search_engine ?? '', // FIX (IN API) - device_id: event.properties.$device_id ?? '', - session_id: '', - project_id: '', // FIX (IN API) - path: url.path, - origin: url.origin, - os_version: '', // FIX - model: '', - longitude: null, - latitude: null, - id: randomUUID(), - duration: 0, - device: currentUrl ? '' : 'server', - brand: '', - }; - return eventToSave; - }); + } else { + if (isMixpanelEvent(event)) { + events.push(createEventObject(event)); + } else { + console.log('Not a Mixpanel event', event); + } + } + } + return events; +} - const sessions = generateSessionEvents(a); - - const events = sessions.flatMap((session) => { - return [ +function processEvents(events: IImportedEvent[]): IImportedEvent[] { + const sessions = generateSessionEvents(events); + const processedEvents = sessions.flatMap((session) => + [ session.firstEvent && { ...session.firstEvent, id: randomUUID(), @@ -280,7 +272,7 @@ async function loadFiles(files: string[] = []) { ...uniqBy( prop('id'), session.events.map((event) => - assocPath(['session_id'], session.sessionId, clone(event)) + assocPath(['session_id'], session.sessionId, event) ) ), session.lastEvent && { @@ -292,24 +284,20 @@ async function loadFiles(files: string[] = []) { session_id: session.sessionId, name: 'session_end', }, - ].filter(Boolean); - }); + ].filter((item): item is IImportedEvent => !!item) + ); - const totalPages = Math.ceil(events.length / BATCH_SIZE); - const estimatedTime = (totalPages / 8) * SLEEP_TIME + (totalPages / 8) * 80; - console.log(`Estimated time: ${estimatedTime / 1000} seconds`); + return [ + ...processedEvents, + ...events.filter((event) => { + return !event.profile_id && !event.device_id; + }), + ]; +} - async function batcher(page: number) { - const batch = events.slice(page * BATCH_SIZE, (page + 1) * BATCH_SIZE); - - if (batch.length === 0) { - return; - } - - // const size = Buffer.byteLength(JSON.stringify(batch)); - // console.log(batch.length, size / (1024 * 1024)); - - await fetch('http://localhost:3333/import/events', { +async function sendBatchToAPI(batch: IImportedEvent[]) { + try { + const res = await fetch('http://localhost:3333/import/events', { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -318,72 +306,78 @@ async function loadFiles(files: string[] = []) { }, body: JSON.stringify(batch), }); - await new Promise((resolve) => setTimeout(resolve, SLEEP_TIME)); + } catch (e) { + console.log('sendBatchToAPI failed'); + throw e; } +} - async function runBatchesInParallel( - totalPages: number, - concurrentBatches: number - ) { - let currentPage = 0; +async function processFiles(files: string[]) { + const progress = new Progress( + 'Processing (:current/:total) :file [:bar] :percent | :savedEvents saved events | :status', + { + total: files.length, + width: 20, + } + ); + let savedEvents = 0; + let currentBatch: IImportedEvent[] = []; + let apiBatching = []; - while (currentPage < totalPages) { - const promises = []; - for ( - let i = 0; - i < concurrentBatches && currentPage < totalPages; - i++, currentPage++ - ) { - progress( - `Sending batch ${currentPage} (${Math.round((currentPage / totalPages) * 100)}... %)` - ); - promises.push(batcher(currentPage)); + for (const file of files) { + progress.tick({ + file, + savedEvents, + status: 'reading file', + }); + const events = await processFile(file); + progress.render({ + file, + savedEvents, + status: 'processing events', + }); + const processedEvents = processEvents(events); + for (const event of processedEvents) { + currentBatch.push(event); + if (currentBatch.length >= BATCH_SIZE) { + apiBatching.push(currentBatch); + savedEvents += currentBatch.length; + progress.render({ file, savedEvents, status: 'saving events' }); + currentBatch = []; + } + + if (apiBatching.length >= 10) { + await Promise.all(apiBatching.map(sendBatchToAPI)); + apiBatching = []; } - await Promise.all(promises); } } - // Trigger the batches - try { - await runBatchesInParallel(totalPages, 8); // Run 8 batches in parallel - } catch (e) { - console.log('ERROR?!', e); + if (currentBatch.length > 0) { + await sendBatchToAPI(currentBatch); + savedEvents += currentBatch.length; + progress.render({ file: 'Complete', savedEvents, status: 'Complete' }); } } export async function importFiles(matcher: string) { - const files = await glob([matcher], { - root: '/', - }); + const files = await glob([matcher], { root: '/' }); if (files.length === 0) { console.log('No files found'); return; } - function chunks(array: string[], size: number) { - const results = []; - while (array.length) { - results.push(array.splice(0, size)); - } - return results; - } + files.sort((a, b) => a.localeCompare(b)); - const times = []; - const chunksArray = chunks(files, 3); - let chunkIndex = 0; - for (const chunk of chunksArray) { - if (times.length > 0) { - // Print out how much time is approximately left - const average = times.reduce((a, b) => a + b) / times.length; - const remaining = (chunksArray.length - chunkIndex) * average; - console.log(`\n\nEstimated time left: ${remaining / 1000 / 60} minutes`); - } - console.log('Processing chunk:', chunkIndex); - chunkIndex++; - const d = Date.now(); - await loadFiles(chunk); - times.push(Date.now() - d); - } + console.log(`Found ${files.length} files to process`); + + const startTime = Date.now(); + await processFiles(files); + const endTime = Date.now(); + + console.log( + `\nProcessing completed in ${(endTime - startTime) / 1000} seconds` + ); } diff --git a/packages/cli/src/importer/importer_v2.ts b/packages/cli/src/importer/importer_v2.ts deleted file mode 100644 index e9dc05cb..00000000 --- a/packages/cli/src/importer/importer_v2.ts +++ /dev/null @@ -1,339 +0,0 @@ -import { randomUUID } from 'crypto'; -import fs from 'fs'; -import readline from 'readline'; -import { glob } from 'glob'; -import Progress from 'progress'; -import { assocPath, prop, uniqBy } from 'ramda'; - -import type { IClickhouseEvent } from '@openpanel/db'; - -import { parsePath } from './copy.url'; - -const BATCH_SIZE = 8000; -const SLEEP_TIME = 100; - -function stripMixpanelProperties(obj: Record) { - return Object.fromEntries( - Object.entries(obj).filter( - ([key]) => - !key.match(/^(\$|mp_)/) && !['time', 'distinct_id'].includes(key) - ) - ); -} - -async function* parseJsonStream( - fileStream: fs.ReadStream -): AsyncGenerator { - const rl = readline.createInterface({ - input: fileStream, - crlfDelay: Infinity, - }); - - let buffer = ''; - let bracketCount = 0; - - for await (const line of rl) { - buffer += line; - bracketCount += - (line.match(/{/g) || []).length - (line.match(/}/g) || []).length; - - if (bracketCount === 0 && buffer.trim()) { - try { - const json = JSON.parse(buffer); - yield json; - } catch (error) { - console.log('Warning: Failed to parse JSON'); - console.log('Buffer:', buffer); - } - buffer = ''; - } - } - - if (buffer.trim()) { - try { - const json = JSON.parse(buffer); - yield json; - } catch (error) { - console.log('Warning: Failed to parse remaining JSON'); - console.log('Buffer:', buffer); - } - } -} - -interface Session { - start: number; - end: number; - profileId?: string; - deviceId?: string; - sessionId: string; - firstEvent?: IClickhouseEvent; - lastEvent?: IClickhouseEvent; - events: IClickhouseEvent[]; -} - -function generateSessionEvents(events: IClickhouseEvent[]): Session[] { - let sessionList: Session[] = []; - const lastSessionByDevice: Record = {}; - const lastSessionByProfile: Record = {}; - const thirtyMinutes = 30 * 60 * 1000; - - events.sort( - (a, b) => - new Date(a.created_at).getTime() - new Date(b.created_at).getTime() - ); - - for (const event of events) { - const eventTime = new Date(event.created_at).getTime(); - let deviceSession = event.device_id - ? lastSessionByDevice[event.device_id] - : undefined; - let profileSession = event.profile_id - ? lastSessionByProfile[event.profile_id] - : undefined; - - if ( - event.device_id && - event.device_id !== event.profile_id && - (!deviceSession || eventTime > deviceSession.end + thirtyMinutes) - ) { - deviceSession = { - start: eventTime, - end: eventTime, - deviceId: event.device_id, - sessionId: randomUUID(), - firstEvent: event, - events: [event], - }; - lastSessionByDevice[event.device_id] = deviceSession; - sessionList.push(deviceSession); - } else if (deviceSession) { - deviceSession.end = eventTime; - deviceSession.lastEvent = event; - deviceSession.events.push(event); - } - - if ( - event.profile_id && - event.device_id !== event.profile_id && - (!profileSession || eventTime > profileSession.end + thirtyMinutes) - ) { - profileSession = { - start: eventTime, - end: eventTime, - profileId: event.profile_id, - sessionId: randomUUID(), - firstEvent: event, - events: [event], - }; - lastSessionByProfile[event.profile_id] = profileSession; - sessionList.push(profileSession); - } else if (profileSession) { - profileSession.end = eventTime; - profileSession.lastEvent = event; - profileSession.events.push(event); - } - - if ( - deviceSession && - profileSession && - deviceSession.sessionId !== profileSession.sessionId - ) { - const unifiedSession = { - ...deviceSession, - ...profileSession, - events: [...deviceSession.events, ...profileSession.events], - start: Math.min(deviceSession.start, profileSession.start), - end: Math.max(deviceSession.end, profileSession.end), - sessionId: deviceSession.sessionId, - }; - lastSessionByDevice[event.device_id] = unifiedSession; - lastSessionByProfile[event.profile_id] = unifiedSession; - sessionList = sessionList.filter( - (session) => - session.sessionId !== deviceSession?.sessionId && - session.sessionId !== profileSession?.sessionId - ); - sessionList.push(unifiedSession); - } - } - - return sessionList; -} - -function createEventObject(event: any): IClickhouseEvent { - const url = parsePath(event.properties.$current_url); - return { - profile_id: event.properties.distinct_id - ? String(event.properties.distinct_id).replace(/^\$device:/, '') - : event.properties.$device_id ?? '', - name: event.event, - created_at: new Date(event.properties.time * 1000).toISOString(), - properties: { - ...stripMixpanelProperties(event.properties), - ...(event.properties.$current_url - ? { - __query: url.query, - __hash: url.hash, - } - : {}), - }, - country: event.properties.country_code ?? '', - region: event.properties.$region ?? '', - city: event.properties.$city ?? '', - os: event.properties.$os ?? '', - browser: event.properties.$browser ?? '', - browser_version: event.properties.$browser_version - ? String(event.properties.$browser_version) - : '', - referrer: event.properties.$initial_referrer ?? '', - referrer_type: event.properties.$search_engine ? 'search' : '', - referrer_name: event.properties.$search_engine ?? '', - device_id: event.properties.$device_id ?? '', - session_id: '', - project_id: '', - path: url.path, - origin: url.origin, - os_version: '', - model: '', - longitude: null, - latitude: null, - id: randomUUID(), - duration: 0, - device: event.properties.$current_url ? '' : 'server', - brand: '', - }; -} - -async function processFile(file: string): Promise { - const fileStream = fs.createReadStream(file); - const events: IClickhouseEvent[] = []; - for await (const event of parseJsonStream(fileStream)) { - if (Array.isArray(event)) { - for (const item of event) { - events.push(createEventObject(item)); - } - } else { - events.push(createEventObject(event)); - } - } - return events; -} - -function processEvents(events: IClickhouseEvent[]): IClickhouseEvent[] { - const sessions = generateSessionEvents(events); - const processedEvents = sessions.flatMap((session) => - [ - session.firstEvent && { - ...session.firstEvent, - id: randomUUID(), - created_at: new Date( - new Date(session.firstEvent.created_at).getTime() - 1000 - ).toISOString(), - session_id: session.sessionId, - name: 'session_start', - }, - ...uniqBy( - prop('id'), - session.events.map((event) => - assocPath(['session_id'], session.sessionId, event) - ) - ), - session.lastEvent && { - ...session.lastEvent, - id: randomUUID(), - created_at: new Date( - new Date(session.lastEvent.created_at).getTime() + 1000 - ).toISOString(), - session_id: session.sessionId, - name: 'session_end', - }, - ].filter((item): item is IClickhouseEvent => !!item) - ); - - return [ - ...processedEvents, - ...events.filter((event) => { - return !event.profile_id && !event.device_id; - }), - ]; -} - -async function sendBatchToAPI(batch: IClickhouseEvent[]) { - await fetch('http://localhost:3333/import/events', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'openpanel-client-id': 'dd3db204-dcf6-49e2-9e82-de01cba7e585', - 'openpanel-client-secret': 'sec_293b903816e327e10c9d', - }, - body: JSON.stringify(batch), - }); - await new Promise((resolve) => setTimeout(resolve, SLEEP_TIME)); -} - -async function processFiles(files: string[]) { - const progress = new Progress( - 'Processing (:current/:total) :file [:bar] :percent | :savedEvents saved events | :status', - { - total: files.length, - width: 20, - } - ); - let savedEvents = 0; - let currentBatch: IClickhouseEvent[] = []; - - let apiBatching = []; - for (const file of files) { - progress.tick({ - file, - savedEvents, - status: 'reading file', - }); - const events = await processFile(file); - progress.render({ - file, - savedEvents, - status: 'processing events', - }); - const processedEvents = processEvents(events); - for (const event of processedEvents) { - currentBatch.push(event); - if (currentBatch.length >= BATCH_SIZE) { - apiBatching.push(currentBatch); - savedEvents += currentBatch.length; - progress.render({ file, savedEvents, status: 'saving events' }); - currentBatch = []; - } - - if (apiBatching.length >= 10) { - await Promise.all(apiBatching.map(sendBatchToAPI)); - apiBatching = []; - } - } - } - - if (currentBatch.length > 0) { - await sendBatchToAPI(currentBatch); - savedEvents += currentBatch.length; - progress.render({ file: 'Complete', savedEvents, status: 'Complete' }); - } -} - -export async function importFiles(matcher: string) { - const files = await glob([matcher], { root: '/' }); - - if (files.length === 0) { - console.log('No files found'); - return; - } - - console.log(`Found ${files.length} files to process`); - - const startTime = Date.now(); - await processFiles(files); - const endTime = Date.now(); - - console.log( - `\nProcessing completed in ${(endTime - startTime) / 1000} seconds` - ); -} diff --git a/packages/cli/src/importer/index.ts b/packages/cli/src/importer/index.ts index 9081bdfd..ea1ef9ac 100644 --- a/packages/cli/src/importer/index.ts +++ b/packages/cli/src/importer/index.ts @@ -1,66 +1,7 @@ -import fs from 'fs'; import path from 'path'; import arg from 'arg'; -import { groupBy } from 'ramda'; -import type { PostEventPayload } from '@openpanel/sdk'; - -import { importFiles } from './importer_v2'; - -const BATCH_SIZE = 10000; // Define your batch size -const SLEEP_TIME = 100; // Define your sleep time between batches - -function progress(value: string) { - process.stdout.clearLine(0); - process.stdout.cursorTo(0); - process.stdout.write(value); -} - -function stripMixpanelProperties(obj: Record) { - const properties = ['time', 'distinct_id']; - const result: Record = {}; - for (const key in obj) { - if (key.match(/^(\$|mp_)/) || properties.includes(key)) { - continue; - } - result[key] = obj[key]; - } - return result; -} - -function safeParse(json: string) { - try { - return JSON.parse(json); - } catch (error) { - return null; - } -} - -function parseFileContent(fileContent: string): { - event: string; - properties: { - time: number; - distinct_id?: string | number; - [key: string]: unknown; - }; -}[] { - try { - return JSON.parse(fileContent); - } catch (error) { - const lines = fileContent.trim().split('\n'); - return lines - .map((line, index) => { - const json = safeParse(line); - if (!json) { - console.log('Warning: Failed to parse JSON'); - console.log('Index:', index); - console.log('Line:', line); - } - return json; - }) - .filter(Boolean); - } -} +import { importFiles } from './importer'; export default function importer() { const args = arg( diff --git a/packages/common/index.ts b/packages/common/index.ts index b30cb315..7eda2c40 100644 --- a/packages/common/index.ts +++ b/packages/common/index.ts @@ -7,3 +7,4 @@ export * from './src/string'; export * from './src/math'; export * from './src/slug'; export * from './src/fill-series'; +export * from './src/url'; diff --git a/apps/worker/src/utils/url.ts b/packages/common/src/url.ts similarity index 100% rename from apps/worker/src/utils/url.ts rename to packages/common/src/url.ts diff --git a/packages/db/clickhouse_init.sql b/packages/db/clickhouse_init.sql index 4339d4a2..80075ee8 100644 --- a/packages/db/clickhouse_init.sql +++ b/packages/db/clickhouse_init.sql @@ -1,39 +1,5 @@ CREATE DATABASE IF NOT EXISTS openpanel; -CREATE TABLE IF NOT EXISTS openpanel.events ( - `id` UUID DEFAULT generateUUIDv4(), - `name` String, - `device_id` String, - `profile_id` String, - `project_id` String, - `session_id` String, - `path` String, - `origin` String, - `referrer` String, - `referrer_name` String, - `referrer_type` String, - `duration` UInt64, - `properties` Map(String, String), - `created_at` DateTime64(3), - `country` String, - `city` String, - `region` String, - `longitude` Nullable(Float32), - `latitude` Nullable(Float32), - `os` String, - `os_version` String, - `browser` String, - `browser_version` String, - -- device: mobile/desktop/tablet - `device` String, - -- brand: (Samsung, OnePlus) - `brand` String, - -- model: (Samsung Galaxy, iPhone X) - `model` String -) ENGINE MergeTree -ORDER BY - (project_id, created_at, profile_id) SETTINGS index_granularity = 8192; - CREATE TABLE IF NOT EXISTS openpanel.events_v2 ( `id` UUID DEFAULT generateUUIDv4(), `name` String, @@ -58,26 +24,17 @@ CREATE TABLE IF NOT EXISTS openpanel.events_v2 ( `os_version` String, `browser` String, `browser_version` String, - -- device: mobile/desktop/tablet `device` String, - -- brand: (Samsung, OnePlus) `brand` String, - -- model: (Samsung Galaxy, iPhone X) - `model` String -) ENGINE = MergeTree() PARTITION BY toYYYYMM(created_at) + `model` String, + `imported_at` Nullable(DateTime), + INDEX idx_name name TYPE bloom_filter GRANULARITY 1, + INDEX idx_properties_bounce properties ['__bounce'] TYPE + set + (3) GRANULARITY 1 +) ENGINE = MergeTree PARTITION BY toYYYYMM(created_at) ORDER BY - (project_id, created_at, profile_id) SETTINGS index_granularity = 8192; - -ALTER TABLE - events DROP COLUMN utm_source, - DROP COLUMN utm_medium, - DROP COLUMN utm_campaign, - DROP COLUMN utm_term, - DROP COLUMN utm_content, - DROP COLUMN sdk, - DROP COLUMN sdk_version, - DROP COLUMN client_type, - DROP COLUMN continent; + (project_id, toDate(created_at), profile_id, name) SETTINGS index_granularity = 8192; CREATE TABLE IF NOT EXISTS openpanel.events_bots ( `id` UUID DEFAULT generateUUIDv4(), diff --git a/packages/db/clickhouse_tables.sql b/packages/db/clickhouse_tables.sql deleted file mode 100644 index 1066bf7d..00000000 --- a/packages/db/clickhouse_tables.sql +++ /dev/null @@ -1,123 +0,0 @@ -CREATE TABLE openpanel.events ( - `id` UUID DEFAULT generateUUIDv4(), - `name` String, - `device_id` String, - `profile_id` String, - `project_id` String, - `session_id` String, - `path` String, - `origin` String, - `referrer` String, - `referrer_name` String, - `referrer_type` String, - `duration` UInt64, - `properties` Map(String, String), - `created_at` DateTime64(3), - `country` String, - `city` String, - `region` String, - `longitude` Int16, - `latitude` Int16, - `os` String, - `os_version` String, - `browser` String, - `browser_version` String, - -- device: mobile/desktop/tablet - `device` String, - -- brand: (Samsung, OnePlus) - `brand` String, - -- model: (Samsung Galaxy, iPhone X) - `model` String -) ENGINE MergeTree -ORDER BY - (project_id, created_at, profile_id) SETTINGS index_granularity = 8192; - -CREATE TABLE openpanel.events_bots ( - `id` UUID DEFAULT generateUUIDv4(), - `project_id` String, - `name` String, - `type` String, - `path` String, - `created_at` DateTime64(3), -) ENGINE MergeTree -ORDER BY - (project_id, created_at) SETTINGS index_granularity = 8192; - -CREATE TABLE openpanel.profiles ( - `id` String, - `first_name` String, - `last_name` String, - `email` String, - `avatar` String, - `properties` Map(String, String), - `project_id` String, - `created_at` DateTime -) ENGINE = ReplacingMergeTree(created_at) -ORDER BY - (id) SETTINGS index_granularity = 8192; - -ALTER TABLE - events -ADD - COLUMN origin String -AFTER - path; - -ALTER TABLE - events DROP COLUMN id; - -CREATE TABLE ba ( - `id` UUID DEFAULT generateUUIDv4(), - `a` String, - `b` String -) ENGINE MergeTree -ORDER BY - (a, b) SETTINGS index_granularity = 8192; - -ALTER TABLE - events_bots -ADD - COLUMN id UUID DEFAULT generateUUIDv4() FIRST; - -ALTER TABLE - events -ADD - COLUMN longitude Nullable(Float32); - -ALTER TABLE - events -ADD - COLUMN latitude Nullable(Float32); - ---- Materialized views (DAU) -CREATE MATERIALIZED VIEW dau_mv ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMMDD(date) -ORDER BY - (project_id, date) POPULATE AS -SELECT - toDate(created_at) as date, - uniqState(profile_id) as profile_id, - project_id -FROM - events -GROUP BY - date, - project_id; - --- DROP external_id and add is_external column -ALTER TABLE - profiles DROP COLUMN external_id; - -ALTER TABLE - profiles -ADD - COLUMN is_external Boolean -AFTER - id; - -ALTER TABLE - profiles -UPDATE - is_external = length(id) != 32 -WHERE - true - and length(id) != 32; \ No newline at end of file diff --git a/packages/db/src/clickhouse-client.ts b/packages/db/src/clickhouse-client.ts index 4faa526a..b01656dc 100644 --- a/packages/db/src/clickhouse-client.ts +++ b/packages/db/src/clickhouse-client.ts @@ -11,8 +11,8 @@ export const originalCh = createClient({ username: process.env.CLICKHOUSE_USER, password: process.env.CLICKHOUSE_PASSWORD, database: process.env.CLICKHOUSE_DB, - max_open_connections: 10, - request_timeout: 10000, + max_open_connections: 30, + request_timeout: 30000, keep_alive: { enabled: true, idle_socket_ttl: 8000, @@ -92,7 +92,7 @@ export async function chQueryWithMeta>( }; console.log( - `Query: (${Date.now() - start}ms, ${response.statistics?.elapsed}ms)`, + `Query: (${Date.now() - start}ms, ${response.statistics?.elapsed}ms), Rows: ${json.rows}`, query ); diff --git a/packages/db/src/services/chart.service.ts b/packages/db/src/services/chart.service.ts index e84a2e85..a2a9ebf4 100644 --- a/packages/db/src/services/chart.service.ts +++ b/packages/db/src/services/chart.service.ts @@ -125,33 +125,67 @@ export function getEventFiltersWhereClause(filters: IChartEventFilter[]) { } if (name.startsWith('properties.')) { + const propertyKey = name + .replace(/^properties\./, '') + .replace('.*.', '.%.'); + const isWildcard = propertyKey.includes('%'); const whereFrom = `arrayMap(x -> trim(x), mapValues(mapExtractKeyLike(properties, ${escape( name.replace(/^properties\./, '').replace('.*.', '.%.') )})))`; switch (operator) { case 'is': { - where[id] = `arrayExists(x -> ${value - .map((val) => `x = ${escape(String(val).trim())}`) - .join(' OR ')}, ${whereFrom})`; + if (isWildcard) { + where[id] = `arrayExists(x -> ${value + .map((val) => `x = ${escape(String(val).trim())}`) + .join(' OR ')}, ${whereFrom})`; + } else { + where[id] = `properties['${propertyKey}'] IN (${value + .map((val) => escape(String(val).trim())) + .join(', ')})`; + } break; } case 'isNot': { - where[id] = `arrayExists(x -> ${value - .map((val) => `x != ${escape(String(val).trim())}`) - .join(' OR ')}, ${whereFrom})`; + if (isWildcard) { + where[id] = `arrayExists(x -> ${value + .map((val) => `x != ${escape(String(val).trim())}`) + .join(' OR ')}, ${whereFrom})`; + } else { + where[id] = `properties['${propertyKey}'] NOT IN (${value + .map((val) => escape(String(val).trim())) + .join(', ')})`; + } break; } case 'contains': { - where[id] = `arrayExists(x -> ${value - .map((val) => `x LIKE ${escape(`%${String(val).trim()}%`)}`) - .join(' OR ')}, ${whereFrom})`; + if (isWildcard) { + where[id] = `arrayExists(x -> ${value + .map((val) => `x LIKE ${escape(`%${String(val).trim()}%`)}`) + .join(' OR ')}, ${whereFrom})`; + } else { + where[id] = value + .map( + (val) => + `properties['${propertyKey}'] LIKE ${escape(`%${String(val).trim()}%`)}` + ) + .join(' OR '); + } break; } case 'doesNotContain': { - where[id] = `arrayExists(x -> ${value - .map((val) => `x NOT LIKE ${escape(`%${String(val).trim()}%`)}`) - .join(' OR ')}, ${whereFrom})`; + if (isWildcard) { + where[id] = `arrayExists(x -> ${value + .map((val) => `x NOT LIKE ${escape(`%${String(val).trim()}%`)}`) + .join(' OR ')}, ${whereFrom})`; + } else { + where[id] = value + .map( + (val) => + `properties['${propertyKey}'] NOT LIKE ${escape(`%${String(val).trim()}%`)}` + ) + .join(' OR '); + } break; } } diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index 19972c4c..eff8c437 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -21,6 +21,13 @@ import { getEventFiltersWhereClause } from './chart.service'; import { getProfiles, upsertProfile } from './profile.service'; import type { IServiceProfile } from './profile.service'; +export type IImportedEvent = Omit< + IClickhouseEvent, + 'properties' | 'profile' | 'meta' | 'imported_at' +> & { + properties: Record; +}; + export interface IClickhouseEvent { id: string; name: string; @@ -34,7 +41,7 @@ export interface IClickhouseEvent { referrer_name: string; referrer_type: string; duration: number; - properties: Record; + properties: Record; created_at: string; country: string; city: string; @@ -48,6 +55,7 @@ export interface IClickhouseEvent { device: string; brand: string; model: string; + imported_at: string | null; // They do not exist here. Just make ts happy for now profile?: IServiceProfile; @@ -86,6 +94,7 @@ export function transformEvent( referrerType: event.referrer_type, profile: event.profile, meta: event.meta, + importedAt: event.imported_at ? new Date(event.imported_at) : null, }; } @@ -119,6 +128,7 @@ export interface IServiceCreateEventPayload { referrer: string | undefined; referrerName: string | undefined; referrerType: string | undefined; + importedAt: Date | null; profile: IServiceProfile | undefined; meta: EventMeta | undefined; } @@ -221,7 +231,10 @@ export async function getEvents( } export async function createEvent( - payload: Omit + payload: Omit< + IServiceCreateEventPayload, + 'id' | 'importedAt' | 'profile' | 'meta' + > ) { if (!payload.profileId) { payload.profileId = payload.deviceId; @@ -283,6 +296,7 @@ export async function createEvent( referrer: payload.referrer ?? '', referrer_name: payload.referrerName ?? '', referrer_type: payload.referrerType ?? '', + imported_at: null, }; await eventBuffer.insert(event); diff --git a/packages/db/src/services/profile.service.ts b/packages/db/src/services/profile.service.ts index 425375a3..74f2d6e9 100644 --- a/packages/db/src/services/profile.service.ts +++ b/packages/db/src/services/profile.service.ts @@ -64,17 +64,16 @@ interface GetProfileListOptions { } export async function getProfiles(ids: string[]) { - if (ids.length === 0) { + const filteredIds = ids.filter((id) => id !== ''); + + if (filteredIds.length === 0) { return []; } const data = await chQuery( `SELECT * FROM profiles FINAL - WHERE id IN (${ids - .map((id) => escape(id)) - .filter(Boolean) - .join(',')}) + WHERE id IN (${filteredIds.map((id) => escape(id)).join(',')}) ` ); diff --git a/packages/trpc/src/routers/chart.ts b/packages/trpc/src/routers/chart.ts index 2b26ddbb..4b3a3a9b 100644 --- a/packages/trpc/src/routers/chart.ts +++ b/packages/trpc/src/routers/chart.ts @@ -1,10 +1,17 @@ +import { subMonths } from 'date-fns'; import { flatten, map, pipe, prop, sort, uniq } from 'ramda'; import { escape } from 'sqlstring'; import { z } from 'zod'; import { average, max, min, round, slug, sum } from '@openpanel/common'; -import { chQuery, createSqlBuilder, db, TABLE_NAMES } from '@openpanel/db'; -import { zChartInput } from '@openpanel/validation'; +import { + chQuery, + createSqlBuilder, + db, + formatClickhouseDate, + TABLE_NAMES, +} from '@openpanel/db'; +import { zChartInput, zRange } from '@openpanel/validation'; import type { FinalChart, IChartInput, @@ -24,10 +31,18 @@ import { export const chartRouter = createTRPCRouter({ events: protectedProcedure - .input(z.object({ projectId: z.string() })) - .query(async ({ input: { projectId } }) => { + .input( + z.object({ + projectId: z.string(), + range: zRange.default('30d'), + startDate: z.string().nullish(), + endDate: z.string().nullish(), + }) + ) + .query(async ({ input: { projectId, ...input } }) => { + const { startDate, endDate } = getChartStartEndDate(input); const events = await chQuery<{ name: string }>( - `SELECT DISTINCT name FROM ${TABLE_NAMES.events} WHERE project_id = ${escape(projectId)}` + `SELECT DISTINCT name FROM ${TABLE_NAMES.events} WHERE project_id = ${escape(projectId)} AND created_at BETWEEN toDate('${formatClickhouseDate(startDate)}') AND toDate('${formatClickhouseDate(endDate)}');` ); return [ @@ -39,12 +54,22 @@ export const chartRouter = createTRPCRouter({ }), properties: protectedProcedure - .input(z.object({ event: z.string().optional(), projectId: z.string() })) - .query(async ({ input: { projectId, event } }) => { + .input( + z.object({ + event: z.string().optional(), + projectId: z.string(), + range: zRange.default('30d'), + startDate: z.string().nullish(), + endDate: z.string().nullish(), + }) + ) + .query(async ({ input: { projectId, event, ...input } }) => { + const { startDate, endDate } = getChartStartEndDate(input); const events = await chQuery<{ keys: string[] }>( `SELECT distinct mapKeys(properties) as keys from ${TABLE_NAMES.events} where ${ event && event !== '*' ? `name = ${escape(event)} AND ` : '' - } project_id = ${escape(projectId)};` + } project_id = ${escape(projectId)} AND + created_at BETWEEN toDate('${formatClickhouseDate(startDate)}') AND toDate('${formatClickhouseDate(endDate)}');` ); const properties = events @@ -86,9 +111,13 @@ export const chartRouter = createTRPCRouter({ event: z.string(), property: z.string(), projectId: z.string(), + range: zRange.default('30d'), + startDate: z.string().nullish(), + endDate: z.string().nullish(), }) ) - .query(async ({ input: { event, property, projectId } }) => { + .query(async ({ input: { event, property, projectId, ...input } }) => { + const { startDate, endDate } = getChartStartEndDate(input); if (property === 'has_profile') { return { values: ['true', 'false'], @@ -108,6 +137,8 @@ export const chartRouter = createTRPCRouter({ sb.select.values = `distinct ${property} as values`; } + sb.where.date = `created_at BETWEEN toDate('${formatClickhouseDate(startDate)}') AND toDate('${formatClickhouseDate(endDate)}')`; + const events = await chQuery<{ values: string[] }>(getSql()); const values = pipe( diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 44ebdc65..b0705f94 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -819,6 +819,9 @@ importers: packages/cli: dependencies: + '@openpanel/common': + specifier: workspace:* + version: link:../common arg: specifier: ^5.0.2 version: 5.0.2