diff --git a/apps/api/scripts/mock.ts b/apps/api/scripts/mock.ts index 38bfc18e..3619f9c4 100644 --- a/apps/api/scripts/mock.ts +++ b/apps/api/scripts/mock.ts @@ -140,44 +140,29 @@ function scrambleEvents(events: Event[]) { return events.sort(() => Math.random() - 0.5); } -// Distribute events over X minutes -const MINUTES = 3; -const SIX_MINUTES_MS = 1000 * 60 * MINUTES; -const startTime = Date.now(); let lastTriggeredIndex = 0; -async function triggerEvents(file: string) { - const generatedEvents = require(`./${file}`); - const currentTime = Date.now(); - const elapsedTime = currentTime - startTime; +async function triggerEvents(generatedEvents: any[]) { + const EVENTS_PER_SECOND = 100; // Adjust this value to set the desired events per second + const INTERVAL_MS = 1000 / EVENTS_PER_SECOND; - if (elapsedTime >= SIX_MINUTES_MS) { + if (lastTriggeredIndex >= generatedEvents.length) { console.log('All events triggered.'); return; } - const eventsToTrigger = Math.min( - Math.ceil(generatedEvents.length * (elapsedTime / SIX_MINUTES_MS)), - generatedEvents.length, - ); - - // Send events that haven't been triggered yet - for (let i = lastTriggeredIndex; i < eventsToTrigger; i++) { - console.log('about to send'); - - const event = generatedEvents[i]!; - try { - await trackit(event); - console.log(`Event ${i + 1} sent successfully`); - } catch (error) { - console.error(`Failed to send event ${i + 1}:`, error); - } + const event = generatedEvents[lastTriggeredIndex]!; + try { + await trackit(event); + console.log(`Event ${lastTriggeredIndex + 1} sent successfully`); console.log( `sending ${event.track.payload?.properties?.__path} from user ${event.headers['user-agent']}`, ); + } catch (error) { + console.error(`Failed to send event ${lastTriggeredIndex + 1}:`, error); } - lastTriggeredIndex = eventsToTrigger; + lastTriggeredIndex++; const remainingEvents = generatedEvents.length - lastTriggeredIndex; console.log( @@ -185,7 +170,7 @@ async function triggerEvents(file: string) { ); if (remainingEvents > 0) { - setTimeout(() => triggerEvents(file), 50); // Check every 50ms + setTimeout(() => triggerEvents(generatedEvents), INTERVAL_MS); } else { console.log('All events triggered.'); } @@ -222,8 +207,43 @@ async function createMock(file: string) { ); } +function insertFakeEvents(events: Event[]) { + const blueprint = { + headers: { + 'openpanel-client-id': '5b679c47-9ec0-470a-8944-a9ab8f42b14f', + 'x-client-ip': '229.145.77.175', + 'user-agent': + 'Opera/13.66 (Macintosh; Intel Mac OS X 10.8.3 U; GV Presto/2.9.183 Version/11.00)', + origin: 'https://classic-hovel.info', + }, + track: { + type: 'track', + payload: { + name: 'screen_view', + properties: { + __referrer: 'https://www.google.com', + __path: 'https://classic-hovel.info/beneficium-arcesso-quisquam', + __title: 'Hic thesis laboriosam copiose admoveo sufficio.', + }, + }, + }, + }; + const newEvents = []; + for (const event of events) { + newEvents.push(event); + if (Math.random() < 0.6) { + const fakeEvent = JSON.parse(JSON.stringify(blueprint)); + fakeEvent.track.payload.name = faker.allFakers.en.lorem.word(); + delete fakeEvent.track.payload.properties; + newEvents.push(fakeEvent); + } + } + + return newEvents; +} + async function simultaneousRequests() { - const events = require('./api-requests.json'); + const events = require('./mock-basic.json'); const screenView = events[0]!; const event = JSON.parse(JSON.stringify(events[0])); event.track.payload.name = 'click_button'; @@ -243,7 +263,7 @@ async function main() { switch (type) { case 'send': - await triggerEvents(file); + await triggerEvents(insertFakeEvents(require(`./${file}`))); break; case 'sim': await simultaneousRequests(); diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index c4e2c85c..0f6d6a56 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -8,7 +8,7 @@ import { fastifyTRPCPlugin } from '@trpc/server/adapters/fastify'; import type { FastifyBaseLogger, FastifyRequest } from 'fastify'; import Fastify from 'fastify'; import metricsPlugin from 'fastify-metrics'; -import { path } from 'ramda'; +import { path, pick } from 'ramda'; import { generateId, round } from '@openpanel/common'; import { TABLE_NAMES, chQuery, db } from '@openpanel/db'; @@ -104,14 +104,22 @@ const startServer = async () => { url: request.url.split('?')[0], method: request.method, input: getTrpcInput(request), - responseTime: reply.elapsedTime, + elapsed: reply.elapsedTime, }); } else { request.log.info('request done', { url: request.url, method: request.method, - responseTime: reply.elapsedTime, - headers: request.headers, + elapsed: reply.elapsedTime, + headers: pick( + [ + 'openpanel-client-id', + 'openpanel-sdk-name', + 'openpanel-sdk-version', + 'user-agent', + ], + request.headers, + ), body: request.body, }); } diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index bde4bf1d..efaee662 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -99,7 +99,7 @@ async function start() { logger.info('job completed', { worker: worker.name, data: job.data, - duration: + elapsed: job.processedOn && job.finishedOn ? job.finishedOn - job.processedOn : undefined, diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index 2b5e697e..f61e897f 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -44,7 +44,7 @@ async function getCompleteSessionWithSessionStart({ logger: ILogger; }): Promise> { const intervals = [6, 12, 24, 72]; - + let intervalIndex = 0; for (const hoursInterval of intervals) { const events = await getCompleteSession({ projectId, @@ -56,7 +56,10 @@ async function getCompleteSessionWithSessionStart({ return events; } - logger.warn(`Checking last ${hoursInterval} hours for session_start`); + const nextHoursInterval = intervals[++intervalIndex]; + if (nextHoursInterval) { + logger.warn(`Checking last ${nextHoursInterval} hours for session_start`); + } } return []; diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 45e285ea..1c6262a8 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -9,7 +9,11 @@ import { getTime, isSameDomain, parsePath } from '@openpanel/common'; import type { IServiceCreateEventPayload } from '@openpanel/db'; import { createEvent } from '@openpanel/db'; import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service'; -import { findJobByPrefix, sessionsQueue } from '@openpanel/queue'; +import { + findJobByPrefix, + sessionsQueue, + sessionsQueueEvents, +} from '@openpanel/queue'; import type { EventsQueuePayloadCreateSessionEnd, EventsQueuePayloadIncomingEvent, @@ -215,18 +219,48 @@ async function getSessionEnd({ currentDeviceId: string; previousDeviceId: string; }) { + async function handleJobStates( + job: Job, + ): Promise<{ deviceId: string; job: Job } | null> { + const state = await job.getState(); + if (state === 'delayed') { + return { deviceId: currentDeviceId, job }; + } + + if (state === 'completed' || state === 'failed') { + await job.remove(); + } + + if (state === 'active' || state === 'waiting') { + await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10); + return getSessionEnd({ + projectId, + currentDeviceId, + previousDeviceId, + }); + } + + return null; + } + const job = await sessionsQueue.getJob( getSessionEndJobId(projectId, currentDeviceId), ); - if (job && (await job.isDelayed())) { - return { deviceId: currentDeviceId, job }; + if (job) { + const res = await handleJobStates(job); + if (res) { + return res; + } } const previousJob = await sessionsQueue.getJob( getSessionEndJobId(projectId, previousDeviceId), ); - if (previousJob && (await previousJob.isDelayed())) { - return { deviceId: previousDeviceId, job: previousJob }; + if (previousJob) { + const res = await handleJobStates(previousJob); + if (res) { + return res; + } } // Fallback during migration period diff --git a/packages/queue/index.ts b/packages/queue/index.ts index 647ff89f..9db794d7 100644 --- a/packages/queue/index.ts +++ b/packages/queue/index.ts @@ -1,4 +1,9 @@ -export { eventsQueue, cronQueue, sessionsQueue } from './src/queues'; +export { + eventsQueue, + cronQueue, + sessionsQueue, + sessionsQueueEvents, +} from './src/queues'; export type * from './src/queues'; export { findJobByPrefix } from './src/utils'; export type { JobsOptions } from 'bullmq'; diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index a6f8a9ad..434c8bd7 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -1,4 +1,4 @@ -import { Queue } from 'bullmq'; +import { Queue, QueueEvents } from 'bullmq'; import type { IServiceEvent } from '@openpanel/db'; import { getRedisQueue } from '@openpanel/redis'; @@ -80,6 +80,9 @@ export const sessionsQueue = new Queue('sessions', { removeOnComplete: 10, }, }); +export const sessionsQueueEvents = new QueueEvents('sessions', { + connection: getRedisQueue(), +}); export const cronQueue = new Queue('cron', { connection: getRedisQueue(),