fix(worker): handle edge cases when jobs are not delayed
This commit is contained in:
@@ -140,44 +140,29 @@ function scrambleEvents(events: Event[]) {
|
||||
return events.sort(() => Math.random() - 0.5);
|
||||
}
|
||||
|
||||
// Distribute events over X minutes
|
||||
const MINUTES = 3;
|
||||
const SIX_MINUTES_MS = 1000 * 60 * MINUTES;
|
||||
const startTime = Date.now();
|
||||
let lastTriggeredIndex = 0;
|
||||
|
||||
async function triggerEvents(file: string) {
|
||||
const generatedEvents = require(`./${file}`);
|
||||
const currentTime = Date.now();
|
||||
const elapsedTime = currentTime - startTime;
|
||||
async function triggerEvents(generatedEvents: any[]) {
|
||||
const EVENTS_PER_SECOND = 100; // Adjust this value to set the desired events per second
|
||||
const INTERVAL_MS = 1000 / EVENTS_PER_SECOND;
|
||||
|
||||
if (elapsedTime >= SIX_MINUTES_MS) {
|
||||
if (lastTriggeredIndex >= generatedEvents.length) {
|
||||
console.log('All events triggered.');
|
||||
return;
|
||||
}
|
||||
|
||||
const eventsToTrigger = Math.min(
|
||||
Math.ceil(generatedEvents.length * (elapsedTime / SIX_MINUTES_MS)),
|
||||
generatedEvents.length,
|
||||
);
|
||||
|
||||
// Send events that haven't been triggered yet
|
||||
for (let i = lastTriggeredIndex; i < eventsToTrigger; i++) {
|
||||
console.log('about to send');
|
||||
|
||||
const event = generatedEvents[i]!;
|
||||
try {
|
||||
await trackit(event);
|
||||
console.log(`Event ${i + 1} sent successfully`);
|
||||
} catch (error) {
|
||||
console.error(`Failed to send event ${i + 1}:`, error);
|
||||
}
|
||||
const event = generatedEvents[lastTriggeredIndex]!;
|
||||
try {
|
||||
await trackit(event);
|
||||
console.log(`Event ${lastTriggeredIndex + 1} sent successfully`);
|
||||
console.log(
|
||||
`sending ${event.track.payload?.properties?.__path} from user ${event.headers['user-agent']}`,
|
||||
);
|
||||
} catch (error) {
|
||||
console.error(`Failed to send event ${lastTriggeredIndex + 1}:`, error);
|
||||
}
|
||||
|
||||
lastTriggeredIndex = eventsToTrigger;
|
||||
lastTriggeredIndex++;
|
||||
const remainingEvents = generatedEvents.length - lastTriggeredIndex;
|
||||
|
||||
console.log(
|
||||
@@ -185,7 +170,7 @@ async function triggerEvents(file: string) {
|
||||
);
|
||||
|
||||
if (remainingEvents > 0) {
|
||||
setTimeout(() => triggerEvents(file), 50); // Check every 50ms
|
||||
setTimeout(() => triggerEvents(generatedEvents), INTERVAL_MS);
|
||||
} else {
|
||||
console.log('All events triggered.');
|
||||
}
|
||||
@@ -222,8 +207,43 @@ async function createMock(file: string) {
|
||||
);
|
||||
}
|
||||
|
||||
function insertFakeEvents(events: Event[]) {
|
||||
const blueprint = {
|
||||
headers: {
|
||||
'openpanel-client-id': '5b679c47-9ec0-470a-8944-a9ab8f42b14f',
|
||||
'x-client-ip': '229.145.77.175',
|
||||
'user-agent':
|
||||
'Opera/13.66 (Macintosh; Intel Mac OS X 10.8.3 U; GV Presto/2.9.183 Version/11.00)',
|
||||
origin: 'https://classic-hovel.info',
|
||||
},
|
||||
track: {
|
||||
type: 'track',
|
||||
payload: {
|
||||
name: 'screen_view',
|
||||
properties: {
|
||||
__referrer: 'https://www.google.com',
|
||||
__path: 'https://classic-hovel.info/beneficium-arcesso-quisquam',
|
||||
__title: 'Hic thesis laboriosam copiose admoveo sufficio.',
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
const newEvents = [];
|
||||
for (const event of events) {
|
||||
newEvents.push(event);
|
||||
if (Math.random() < 0.6) {
|
||||
const fakeEvent = JSON.parse(JSON.stringify(blueprint));
|
||||
fakeEvent.track.payload.name = faker.allFakers.en.lorem.word();
|
||||
delete fakeEvent.track.payload.properties;
|
||||
newEvents.push(fakeEvent);
|
||||
}
|
||||
}
|
||||
|
||||
return newEvents;
|
||||
}
|
||||
|
||||
async function simultaneousRequests() {
|
||||
const events = require('./api-requests.json');
|
||||
const events = require('./mock-basic.json');
|
||||
const screenView = events[0]!;
|
||||
const event = JSON.parse(JSON.stringify(events[0]));
|
||||
event.track.payload.name = 'click_button';
|
||||
@@ -243,7 +263,7 @@ async function main() {
|
||||
|
||||
switch (type) {
|
||||
case 'send':
|
||||
await triggerEvents(file);
|
||||
await triggerEvents(insertFakeEvents(require(`./${file}`)));
|
||||
break;
|
||||
case 'sim':
|
||||
await simultaneousRequests();
|
||||
|
||||
@@ -8,7 +8,7 @@ import { fastifyTRPCPlugin } from '@trpc/server/adapters/fastify';
|
||||
import type { FastifyBaseLogger, FastifyRequest } from 'fastify';
|
||||
import Fastify from 'fastify';
|
||||
import metricsPlugin from 'fastify-metrics';
|
||||
import { path } from 'ramda';
|
||||
import { path, pick } from 'ramda';
|
||||
|
||||
import { generateId, round } from '@openpanel/common';
|
||||
import { TABLE_NAMES, chQuery, db } from '@openpanel/db';
|
||||
@@ -104,14 +104,22 @@ const startServer = async () => {
|
||||
url: request.url.split('?')[0],
|
||||
method: request.method,
|
||||
input: getTrpcInput(request),
|
||||
responseTime: reply.elapsedTime,
|
||||
elapsed: reply.elapsedTime,
|
||||
});
|
||||
} else {
|
||||
request.log.info('request done', {
|
||||
url: request.url,
|
||||
method: request.method,
|
||||
responseTime: reply.elapsedTime,
|
||||
headers: request.headers,
|
||||
elapsed: reply.elapsedTime,
|
||||
headers: pick(
|
||||
[
|
||||
'openpanel-client-id',
|
||||
'openpanel-sdk-name',
|
||||
'openpanel-sdk-version',
|
||||
'user-agent',
|
||||
],
|
||||
request.headers,
|
||||
),
|
||||
body: request.body,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -99,7 +99,7 @@ async function start() {
|
||||
logger.info('job completed', {
|
||||
worker: worker.name,
|
||||
data: job.data,
|
||||
duration:
|
||||
elapsed:
|
||||
job.processedOn && job.finishedOn
|
||||
? job.finishedOn - job.processedOn
|
||||
: undefined,
|
||||
|
||||
@@ -44,7 +44,7 @@ async function getCompleteSessionWithSessionStart({
|
||||
logger: ILogger;
|
||||
}): Promise<ReturnType<typeof getEvents>> {
|
||||
const intervals = [6, 12, 24, 72];
|
||||
|
||||
let intervalIndex = 0;
|
||||
for (const hoursInterval of intervals) {
|
||||
const events = await getCompleteSession({
|
||||
projectId,
|
||||
@@ -56,7 +56,10 @@ async function getCompleteSessionWithSessionStart({
|
||||
return events;
|
||||
}
|
||||
|
||||
logger.warn(`Checking last ${hoursInterval} hours for session_start`);
|
||||
const nextHoursInterval = intervals[++intervalIndex];
|
||||
if (nextHoursInterval) {
|
||||
logger.warn(`Checking last ${nextHoursInterval} hours for session_start`);
|
||||
}
|
||||
}
|
||||
|
||||
return [];
|
||||
|
||||
@@ -9,7 +9,11 @@ import { getTime, isSameDomain, parsePath } from '@openpanel/common';
|
||||
import type { IServiceCreateEventPayload } from '@openpanel/db';
|
||||
import { createEvent } from '@openpanel/db';
|
||||
import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service';
|
||||
import { findJobByPrefix, sessionsQueue } from '@openpanel/queue';
|
||||
import {
|
||||
findJobByPrefix,
|
||||
sessionsQueue,
|
||||
sessionsQueueEvents,
|
||||
} from '@openpanel/queue';
|
||||
import type {
|
||||
EventsQueuePayloadCreateSessionEnd,
|
||||
EventsQueuePayloadIncomingEvent,
|
||||
@@ -215,18 +219,48 @@ async function getSessionEnd({
|
||||
currentDeviceId: string;
|
||||
previousDeviceId: string;
|
||||
}) {
|
||||
async function handleJobStates(
|
||||
job: Job,
|
||||
): Promise<{ deviceId: string; job: Job } | null> {
|
||||
const state = await job.getState();
|
||||
if (state === 'delayed') {
|
||||
return { deviceId: currentDeviceId, job };
|
||||
}
|
||||
|
||||
if (state === 'completed' || state === 'failed') {
|
||||
await job.remove();
|
||||
}
|
||||
|
||||
if (state === 'active' || state === 'waiting') {
|
||||
await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10);
|
||||
return getSessionEnd({
|
||||
projectId,
|
||||
currentDeviceId,
|
||||
previousDeviceId,
|
||||
});
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
const job = await sessionsQueue.getJob(
|
||||
getSessionEndJobId(projectId, currentDeviceId),
|
||||
);
|
||||
if (job && (await job.isDelayed())) {
|
||||
return { deviceId: currentDeviceId, job };
|
||||
if (job) {
|
||||
const res = await handleJobStates(job);
|
||||
if (res) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
const previousJob = await sessionsQueue.getJob(
|
||||
getSessionEndJobId(projectId, previousDeviceId),
|
||||
);
|
||||
if (previousJob && (await previousJob.isDelayed())) {
|
||||
return { deviceId: previousDeviceId, job: previousJob };
|
||||
if (previousJob) {
|
||||
const res = await handleJobStates(previousJob);
|
||||
if (res) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback during migration period
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
export { eventsQueue, cronQueue, sessionsQueue } from './src/queues';
|
||||
export {
|
||||
eventsQueue,
|
||||
cronQueue,
|
||||
sessionsQueue,
|
||||
sessionsQueueEvents,
|
||||
} from './src/queues';
|
||||
export type * from './src/queues';
|
||||
export { findJobByPrefix } from './src/utils';
|
||||
export type { JobsOptions } from 'bullmq';
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { Queue } from 'bullmq';
|
||||
import { Queue, QueueEvents } from 'bullmq';
|
||||
|
||||
import type { IServiceEvent } from '@openpanel/db';
|
||||
import { getRedisQueue } from '@openpanel/redis';
|
||||
@@ -80,6 +80,9 @@ export const sessionsQueue = new Queue<SessionsQueuePayload>('sessions', {
|
||||
removeOnComplete: 10,
|
||||
},
|
||||
});
|
||||
export const sessionsQueueEvents = new QueueEvents('sessions', {
|
||||
connection: getRedisQueue(),
|
||||
});
|
||||
|
||||
export const cronQueue = new Queue<CronQueuePayload>('cron', {
|
||||
connection: getRedisQueue(),
|
||||
|
||||
Reference in New Issue
Block a user