diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 5d19f1cc..87bc2418 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -4,20 +4,12 @@ import type { Job } from 'bullmq'; import { omit } from 'ramda'; import { v4 as uuid } from 'uuid'; -import { - getTime, - isSameDomain, - parsePath, - toISOString, -} from '@openpanel/common'; -import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db'; +import { getTime, isSameDomain, parsePath } from '@openpanel/common'; +import type { IServiceCreateEventPayload } from '@openpanel/db'; import { createEvent } from '@openpanel/db'; import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service'; -import { eventsQueue, findJobByPrefix, sessionsQueue } from '@openpanel/queue'; -import type { - EventsQueuePayloadCreateSessionEnd, - EventsQueuePayloadIncomingEvent, -} from '@openpanel/queue'; +import { findJobByPrefix, sessionsQueue } from '@openpanel/queue'; +import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; import { getRedisQueue } from '@openpanel/redis'; const GLOBAL_PROPERTIES = ['__path', '__referrer']; @@ -66,7 +58,7 @@ export async function incomingEvent(job: Job) { projectId, }); - const payload: Omit = { + const payload: IServiceCreateEventPayload = { name: body.name, deviceId: event?.deviceId || '', sessionId: event?.sessionId || '', @@ -95,9 +87,6 @@ export async function incomingEvent(job: Job) { referrer: event?.referrer ?? '', referrerName: event?.referrerName ?? '', referrerType: event?.referrerType ?? '', - profile: undefined, - meta: undefined, - importedAt: null, sdkName, sdkVersion, }; @@ -111,8 +100,7 @@ export async function incomingEvent(job: Job) { previousDeviceId, }); - const sessionEndPayload = (sessionEnd?.job?.data - ?.payload as EventsQueuePayloadCreateSessionEnd['payload']) || { + const sessionEndPayload = sessionEnd?.job.data.payload || { sessionId: uuid(), deviceId: currentDeviceId, profileId, @@ -195,10 +183,6 @@ function getSessionEndWithPriority( return async (args) => { const res = await getSessionEnd(args); - if (count > 5) { - throw new Error('Failed to get session end'); - } - // if we get simultaneous requests we want to avoid race conditions with getting the session end // one of the events will get priority and the other will wait for the first to finish if (res === null && priority === false) { @@ -206,6 +190,10 @@ function getSessionEndWithPriority( return getSessionEndWithPriority(priority, count + 1)(args); } + if (count > 10) { + throw new Error('Failed to get session end'); + } + return res; }; } @@ -232,15 +220,6 @@ async function getSessionEnd({ return { deviceId: currentDeviceId, job: sessionEndJobCurrentDeviceId }; } - const sessionEndJobCurrentDeviceId2 = await findJobByPrefix( - eventsQueue, - sessionEndKeys, - `sessionEnd:${projectId}:${currentDeviceId}:` - ); - if (sessionEndJobCurrentDeviceId2) { - return { deviceId: currentDeviceId, job: sessionEndJobCurrentDeviceId2 }; - } - const sessionEndJobPreviousDeviceId = await findJobByPrefix( sessionsQueue, sessionEndKeys, @@ -250,15 +229,6 @@ async function getSessionEnd({ return { deviceId: previousDeviceId, job: sessionEndJobPreviousDeviceId }; } - const sessionEndJobPreviousDeviceId2 = await findJobByPrefix( - eventsQueue, - sessionEndKeys, - `sessionEnd:${projectId}:${previousDeviceId}:` - ); - if (sessionEndJobPreviousDeviceId2) { - return { deviceId: previousDeviceId, job: sessionEndJobPreviousDeviceId2 }; - } - // Create session return null; }