Files
stats/apps/worker/src/jobs/events.incoming-event.ts
2024-09-22 22:31:28 +02:00

247 lines
7.1 KiB
TypeScript

import { getReferrerWithQuery, parseReferrer } from '@/utils/parse-referrer';
import { parseUserAgent } from '@/utils/parse-user-agent';
import type { Job } from 'bullmq';
import { omit } from 'ramda';
import { v4 as uuid } from 'uuid';
import { getTime, isSameDomain, parsePath } from '@openpanel/common';
import type { IServiceCreateEventPayload } from '@openpanel/db';
import { createEvent } from '@openpanel/db';
import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service';
import { findJobByPrefix, sessionsQueue } from '@openpanel/queue';
import type {
EventsQueuePayloadCreateSessionEnd,
EventsQueuePayloadIncomingEvent,
} from '@openpanel/queue';
import { getRedisQueue } from '@openpanel/redis';
/**
 * Property keys that are stripped from the incoming `properties` bag before
 * the remainder is stored on the event.
 */
const GLOBAL_PROPERTIES: string[] = ['__path', '__referrer'];

/** Delay, in milliseconds, applied to session-end jobs: 30 minutes. */
export const SESSION_TIMEOUT = 30 * 60 * 1000;
/**
 * Handles one incoming tracking event taken from the events queue.
 *
 * Two paths exist:
 * - Server-side events (`parseUserAgent(...).isServer`): the event is stored
 *   with session/device/geo/referrer details copied from the profile's last
 *   screen view when one is found, otherwise with empty defaults.
 * - All other events: the event is attached to the pending session-end job for
 *   the current or previous device id. When such a job exists its delay is
 *   pushed out by `SESSION_TIMEOUT`; when none exists a new session-end job is
 *   queued and a `session_start` event is stored just before the event itself.
 *
 * @param job - Queue job whose `data.payload` carries the event body, request
 *   headers, geo lookup, project id, device ids and the priority flag.
 * @returns The result of `createEvent` for the incoming event.
 * @throws Error('Invalid session end job') when the job found for the device
 *   is not of type `createSessionEnd`. Errors from the session lookup, the
 *   queue and `createEvent` propagate unchanged.
 */
export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
  const {
    geo,
    event: body,
    headers,
    projectId,
    currentDeviceId,
    previousDeviceId,
    priority,
  } = job.data.payload;
  const properties = body.properties ?? {};

  const getProperty = (name: string): string | undefined => {
    // replace thing is just for older sdks when we didn't have `__`
    // remove when kiddokitchen app (24.09.02) is not used anymore
    return (
      ((properties[name] || properties[name.replace('__', '')]) as
        | string
        | null
        | undefined) ?? undefined
    );
  };

  // Normalise the optional profile id to a string; '' means "no profile".
  const profileId = body.profileId ? String(body.profileId) : '';
  const createdAt = new Date(body.timestamp);
  const url = getProperty('__path');
  const { path, hash, query, origin } = parsePath(url);

  // A referrer on the same domain as the current page is not a real referrer.
  const rawReferrer = getProperty('__referrer');
  const referrer = isSameDomain(rawReferrer, url)
    ? null
    : parseReferrer(rawReferrer);
  const utmReferrer = getReferrerWithQuery(query);

  const userAgent = headers['user-agent'];
  const sdkName = headers['openpanel-sdk-name'];
  const sdkVersion = headers['openpanel-sdk-version'];
  const uaInfo = parseUserAgent(userAgent);

  if (uaInfo.isServer) {
    // Only look up the last screen view when the event actually carries a
    // profile id. Querying with the '' placeholder could match an unrelated
    // record and leak its session/device/geo onto this event.
    const event = profileId
      ? await getLastScreenViewFromProfileId({
          profileId,
          projectId,
        })
      : null;

    const payload: IServiceCreateEventPayload = {
      name: body.name,
      deviceId: event?.deviceId || '',
      sessionId: event?.sessionId || '',
      profileId,
      projectId,
      properties: {
        ...omit(GLOBAL_PROPERTIES, properties),
        user_agent: userAgent,
      },
      createdAt,
      country: event?.country || geo.country || '',
      city: event?.city || geo.city || '',
      region: event?.region || geo.region || '',
      longitude: geo.longitude,
      latitude: geo.latitude,
      os: event?.os ?? '',
      osVersion: event?.osVersion ?? '',
      browser: event?.browser ?? '',
      browserVersion: event?.browserVersion ?? '',
      device: event?.device ?? uaInfo.device ?? '',
      brand: event?.brand ?? '',
      model: event?.model ?? '',
      duration: 0,
      path: event?.path ?? '',
      origin: event?.origin ?? '',
      referrer: event?.referrer ?? '',
      referrerName: event?.referrerName ?? '',
      referrerType: event?.referrerType ?? '',
      sdkName,
      sdkVersion,
    };

    return createEvent(payload);
  }

  const sessionEnd = await getSessionEndWithPriority(priority)({
    projectId,
    currentDeviceId,
    previousDeviceId,
  });

  // Reuse the pending session's identity when there is one, otherwise mint a
  // new session for the current device.
  const sessionEndPayload =
    sessionEnd?.job.data.payload ||
    ({
      sessionId: uuid(),
      deviceId: currentDeviceId,
      profileId,
      projectId,
    } satisfies EventsQueuePayloadCreateSessionEnd['payload']);

  const payload: IServiceCreateEventPayload = {
    name: body.name,
    deviceId: sessionEndPayload.deviceId,
    sessionId: sessionEndPayload.sessionId,
    profileId,
    projectId,
    properties: Object.assign({}, omit(GLOBAL_PROPERTIES, properties), {
      __hash: hash,
      __query: query,
    }),
    createdAt,
    country: geo.country,
    city: geo.city,
    region: geo.region,
    longitude: geo.longitude,
    latitude: geo.latitude,
    os: uaInfo?.os ?? '',
    osVersion: uaInfo?.osVersion ?? '',
    browser: uaInfo?.browser ?? '',
    browserVersion: uaInfo?.browserVersion ?? '',
    device: uaInfo?.device ?? '',
    brand: uaInfo?.brand ?? '',
    model: uaInfo?.model ?? '',
    duration: 0,
    path: path,
    origin: origin,
    // Within an existing session the referrer stored on the session-end job
    // wins; a new session takes it from this event (falling back to UTM data).
    referrer: sessionEnd ? sessionEndPayload.referrer : referrer?.url || '',
    referrerName: sessionEnd
      ? sessionEndPayload.referrerName
      : referrer?.name || utmReferrer?.name || '',
    referrerType: sessionEnd
      ? sessionEndPayload.referrerType
      : referrer?.type || utmReferrer?.type || '',
    sdkName,
    sdkVersion,
  };

  if (sessionEnd) {
    // If for some reason we have a session end job that is not a createSessionEnd job
    if (sessionEnd.job.data.type !== 'createSessionEnd') {
      throw new Error('Invalid session end job');
    }
    // Activity in a live session: push the session end out again.
    await sessionEnd.job.changeDelay(SESSION_TIMEOUT);
  } else {
    // No pending session: schedule its end and record the start. The job id
    // uses the same `sessionEnd:<project>:<device>:` prefix that the lookup
    // in this file searches for.
    await sessionsQueue.add(
      'session',
      {
        type: 'createSessionEnd',
        payload,
      },
      {
        delay: SESSION_TIMEOUT,
        jobId: `sessionEnd:${projectId}:${sessionEndPayload.deviceId}:${getTime(createdAt)}`,
      },
    );

    // Backdate session_start by 100 ms so it is timestamped before the event
    // that opened the session.
    await createEvent({
      ...payload,
      name: 'session_start',
      createdAt: new Date(getTime(payload.createdAt) - 100),
    });
  }

  return createEvent(payload);
}
/**
 * Wraps `getSessionEnd` with a short polling loop for non-priority events.
 *
 * If we get simultaneous requests we want to avoid race conditions with
 * getting the session end: one of the events gets priority and the others wait
 * for the first to finish. A caller with `priority === false` therefore keeps
 * polling while no session-end job is found; any other caller gets the first
 * lookup result back immediately, even when it is `null`.
 *
 * @param priority - `false` makes the returned function wait and retry while
 *   the lookup yields `null`.
 * @param count - Number of retries already spent (defaults to 0).
 * @returns A function with the same signature as `getSessionEnd`.
 * @throws Error('Failed to get session end') once the retry budget is spent
 *   and the lookup still yields `null` for a non-priority caller.
 */
function getSessionEndWithPriority(
  priority: boolean,
  count = 0,
): typeof getSessionEnd {
  const maxRetries = 10;
  const retryDelayMs = 50;

  return async (args) => {
    for (let attempt = count; ; attempt++) {
      const res = await getSessionEnd(args);

      // A hit is always returned, including on the very last attempt;
      // priority callers also get `null` back so they can create the session.
      if (res !== null || priority !== false) {
        return res;
      }

      if (attempt > maxRetries) {
        throw new Error('Failed to get session end');
      }

      await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
    }
  };
}
/**
 * Looks for a queued session-end job belonging to one of the given device ids.
 *
 * The current device id is checked first, then the previous one. For each id
 * the Redis keys matching `*:sessionEnd:<projectId>:<deviceId>:*` are listed
 * and handed to `findJobByPrefix` together with the sessions queue.
 *
 * @returns `{ deviceId, job }` for the first device id that has a job, or
 *   `null` when neither has one (the caller then creates a session).
 */
async function getSessionEnd({
  projectId,
  currentDeviceId,
  previousDeviceId,
}: {
  projectId: string;
  currentDeviceId: string;
  previousDeviceId: string;
}) {
  for (const deviceId of [currentDeviceId, previousDeviceId]) {
    const prefix = `sessionEnd:${projectId}:${deviceId}:`;
    const matchingKeys = await getRedisQueue().keys(`*:${prefix}*`);
    const job = await findJobByPrefix(sessionsQueue, matchingKeys, prefix);

    if (job) {
      return { deviceId, job };
    }
  }

  // Create session
  return null;
}