perf(worker): try to improve perf of worker

This commit is contained in:
Carl-Gerhard Lindesvärd
2024-09-24 09:19:30 +02:00
parent a1278ca371
commit 25a7365569
6 changed files with 425 additions and 359 deletions

View File

@@ -8,6 +8,7 @@ import { getTime, isSameDomain, parsePath } from '@openpanel/common';
import type { IServiceCreateEventPayload } from '@openpanel/db';
import { createEvent } from '@openpanel/db';
import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service';
import { createLogger } from '@openpanel/logger';
import { findJobByPrefix, sessionsQueue } from '@openpanel/queue';
import type {
EventsQueuePayloadCreateSessionEnd,
@@ -18,6 +19,13 @@ import { getRedisQueue } from '@openpanel/redis';
const GLOBAL_PROPERTIES = ['__path', '__referrer'];
export const SESSION_TIMEOUT = 1000 * 60 * 30;
const logger = createLogger({
name: 'job:incoming-event',
});
const getSessionEndJobId = (projectId: string, deviceId: string) =>
`sessionEnd:${projectId}:${deviceId}`;
export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
const {
geo,
@@ -148,10 +156,6 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
sdkVersion,
};
const sessionEndJobId =
sessionEnd?.job.id ??
`sessionEnd:${projectId}:${sessionEndPayload.deviceId}:${getTime(createdAt)}`;
if (sessionEnd) {
// If for some reason we have a session end job that is not a createSessionEnd job
if (sessionEnd.job.data.type !== 'createSessionEnd') {
@@ -168,7 +172,7 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
},
{
delay: SESSION_TIMEOUT,
jobId: sessionEndJobId,
jobId: getSessionEndJobId(projectId, sessionEndPayload.deviceId),
},
);
}
@@ -215,6 +219,21 @@ async function getSessionEnd({
currentDeviceId: string;
previousDeviceId: string;
}) {
const job = await sessionsQueue.getJob(
getSessionEndJobId(projectId, currentDeviceId),
);
if (job && (await job.isDelayed())) {
return { deviceId: currentDeviceId, job };
}
const previousJob = await sessionsQueue.getJob(
getSessionEndJobId(projectId, previousDeviceId),
);
if (previousJob && (await previousJob.isDelayed())) {
return { deviceId: previousDeviceId, job: previousJob };
}
// Fallback during migration period
const currentSessionEndKeys = await getRedisQueue().keys(
`bull:sessions:sessionEnd:${projectId}:${currentDeviceId}:*`,
);
@@ -225,6 +244,7 @@ async function getSessionEnd({
`sessionEnd:${projectId}:${currentDeviceId}:`,
);
if (sessionEndJobCurrentDeviceId) {
logger.info('found session end job for current device (old)');
return { deviceId: currentDeviceId, job: sessionEndJobCurrentDeviceId };
}
@@ -238,6 +258,7 @@ async function getSessionEnd({
`sessionEnd:${projectId}:${previousDeviceId}:`,
);
if (sessionEndJobPreviousDeviceId) {
logger.info('found session end job for previous device (old)');
return { deviceId: previousDeviceId, job: sessionEndJobPreviousDeviceId };
}