chore(worker): minor adjustment to worker
This commit is contained in:
@@ -4,20 +4,12 @@ import type { Job } from 'bullmq';
|
|||||||
import { omit } from 'ramda';
|
import { omit } from 'ramda';
|
||||||
import { v4 as uuid } from 'uuid';
|
import { v4 as uuid } from 'uuid';
|
||||||
|
|
||||||
import {
|
import { getTime, isSameDomain, parsePath } from '@openpanel/common';
|
||||||
getTime,
|
import type { IServiceCreateEventPayload } from '@openpanel/db';
|
||||||
isSameDomain,
|
|
||||||
parsePath,
|
|
||||||
toISOString,
|
|
||||||
} from '@openpanel/common';
|
|
||||||
import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db';
|
|
||||||
import { createEvent } from '@openpanel/db';
|
import { createEvent } from '@openpanel/db';
|
||||||
import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service';
|
import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service';
|
||||||
import { eventsQueue, findJobByPrefix, sessionsQueue } from '@openpanel/queue';
|
import { findJobByPrefix, sessionsQueue } from '@openpanel/queue';
|
||||||
import type {
|
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue';
|
||||||
EventsQueuePayloadCreateSessionEnd,
|
|
||||||
EventsQueuePayloadIncomingEvent,
|
|
||||||
} from '@openpanel/queue';
|
|
||||||
import { getRedisQueue } from '@openpanel/redis';
|
import { getRedisQueue } from '@openpanel/redis';
|
||||||
|
|
||||||
const GLOBAL_PROPERTIES = ['__path', '__referrer'];
|
const GLOBAL_PROPERTIES = ['__path', '__referrer'];
|
||||||
@@ -66,7 +58,7 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
|
|||||||
projectId,
|
projectId,
|
||||||
});
|
});
|
||||||
|
|
||||||
const payload: Omit<IServiceEvent, 'id'> = {
|
const payload: IServiceCreateEventPayload = {
|
||||||
name: body.name,
|
name: body.name,
|
||||||
deviceId: event?.deviceId || '',
|
deviceId: event?.deviceId || '',
|
||||||
sessionId: event?.sessionId || '',
|
sessionId: event?.sessionId || '',
|
||||||
@@ -95,9 +87,6 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
|
|||||||
referrer: event?.referrer ?? '',
|
referrer: event?.referrer ?? '',
|
||||||
referrerName: event?.referrerName ?? '',
|
referrerName: event?.referrerName ?? '',
|
||||||
referrerType: event?.referrerType ?? '',
|
referrerType: event?.referrerType ?? '',
|
||||||
profile: undefined,
|
|
||||||
meta: undefined,
|
|
||||||
importedAt: null,
|
|
||||||
sdkName,
|
sdkName,
|
||||||
sdkVersion,
|
sdkVersion,
|
||||||
};
|
};
|
||||||
@@ -111,8 +100,7 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
|
|||||||
previousDeviceId,
|
previousDeviceId,
|
||||||
});
|
});
|
||||||
|
|
||||||
const sessionEndPayload = (sessionEnd?.job?.data
|
const sessionEndPayload = sessionEnd?.job.data.payload || {
|
||||||
?.payload as EventsQueuePayloadCreateSessionEnd['payload']) || {
|
|
||||||
sessionId: uuid(),
|
sessionId: uuid(),
|
||||||
deviceId: currentDeviceId,
|
deviceId: currentDeviceId,
|
||||||
profileId,
|
profileId,
|
||||||
@@ -195,10 +183,6 @@ function getSessionEndWithPriority(
|
|||||||
return async (args) => {
|
return async (args) => {
|
||||||
const res = await getSessionEnd(args);
|
const res = await getSessionEnd(args);
|
||||||
|
|
||||||
if (count > 5) {
|
|
||||||
throw new Error('Failed to get session end');
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we get simultaneous requests we want to avoid race conditions with getting the session end
|
// if we get simultaneous requests we want to avoid race conditions with getting the session end
|
||||||
// one of the events will get priority and the other will wait for the first to finish
|
// one of the events will get priority and the other will wait for the first to finish
|
||||||
if (res === null && priority === false) {
|
if (res === null && priority === false) {
|
||||||
@@ -206,6 +190,10 @@ function getSessionEndWithPriority(
|
|||||||
return getSessionEndWithPriority(priority, count + 1)(args);
|
return getSessionEndWithPriority(priority, count + 1)(args);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (count > 10) {
|
||||||
|
throw new Error('Failed to get session end');
|
||||||
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -232,15 +220,6 @@ async function getSessionEnd({
|
|||||||
return { deviceId: currentDeviceId, job: sessionEndJobCurrentDeviceId };
|
return { deviceId: currentDeviceId, job: sessionEndJobCurrentDeviceId };
|
||||||
}
|
}
|
||||||
|
|
||||||
const sessionEndJobCurrentDeviceId2 = await findJobByPrefix(
|
|
||||||
eventsQueue,
|
|
||||||
sessionEndKeys,
|
|
||||||
`sessionEnd:${projectId}:${currentDeviceId}:`
|
|
||||||
);
|
|
||||||
if (sessionEndJobCurrentDeviceId2) {
|
|
||||||
return { deviceId: currentDeviceId, job: sessionEndJobCurrentDeviceId2 };
|
|
||||||
}
|
|
||||||
|
|
||||||
const sessionEndJobPreviousDeviceId = await findJobByPrefix(
|
const sessionEndJobPreviousDeviceId = await findJobByPrefix(
|
||||||
sessionsQueue,
|
sessionsQueue,
|
||||||
sessionEndKeys,
|
sessionEndKeys,
|
||||||
@@ -250,15 +229,6 @@ async function getSessionEnd({
|
|||||||
return { deviceId: previousDeviceId, job: sessionEndJobPreviousDeviceId };
|
return { deviceId: previousDeviceId, job: sessionEndJobPreviousDeviceId };
|
||||||
}
|
}
|
||||||
|
|
||||||
const sessionEndJobPreviousDeviceId2 = await findJobByPrefix(
|
|
||||||
eventsQueue,
|
|
||||||
sessionEndKeys,
|
|
||||||
`sessionEnd:${projectId}:${previousDeviceId}:`
|
|
||||||
);
|
|
||||||
if (sessionEndJobPreviousDeviceId2) {
|
|
||||||
return { deviceId: previousDeviceId, job: sessionEndJobPreviousDeviceId2 };
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create session
|
// Create session
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user