move all logic in event.controller to worker (speed up request)
This commit is contained in:
@@ -1,95 +1,24 @@
|
||||
import { logger, logInfo, noop } from '@/utils/logger';
|
||||
import { getClientIp, parseIp } from '@/utils/parseIp';
|
||||
import { getReferrerWithQuery, parseReferrer } from '@/utils/parseReferrer';
|
||||
import { isUserAgentSet, parseUserAgent } from '@/utils/parseUserAgent';
|
||||
import { isSameDomain, parsePath } from '@/utils/url';
|
||||
import type { FastifyReply, FastifyRequest } from 'fastify';
|
||||
import { omit } from 'ramda';
|
||||
import { escape } from 'sqlstring';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import { generateDeviceId, getTime, toISOString } from '@openpanel/common';
|
||||
import type { IServiceCreateEventPayload } from '@openpanel/db';
|
||||
import { createEvent, getEvents, getSalts } from '@openpanel/db';
|
||||
import type { JobsOptions } from '@openpanel/queue';
|
||||
import { generateDeviceId } from '@openpanel/common';
|
||||
import { getSalts } from '@openpanel/db';
|
||||
import { eventsQueue } from '@openpanel/queue';
|
||||
import { findJobByPrefix } from '@openpanel/queue/src/utils';
|
||||
import type { PostEventPayload } from '@openpanel/sdk';
|
||||
|
||||
const SESSION_TIMEOUT = 1000 * 60 * 30;
|
||||
const SESSION_END_TIMEOUT = SESSION_TIMEOUT + 1000;
|
||||
|
||||
async function withTiming<T>(name: string, promise: Promise<T>) {
|
||||
try {
|
||||
const start = Date.now();
|
||||
const res = await promise;
|
||||
const end = Date.now();
|
||||
if (end - start > 1000) {
|
||||
logInfo(`${name} took too long: ${end - start}ms`);
|
||||
}
|
||||
return res;
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to execute ${name}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
function createContextLogger(request: FastifyRequest) {
|
||||
const _log = request.log.child({
|
||||
requestId: request.id,
|
||||
requestUrl: request.url,
|
||||
headers: request.headers,
|
||||
projectId: request.projectId,
|
||||
});
|
||||
let obj: Record<string, unknown> = {};
|
||||
return {
|
||||
add: (key: string, value: unknown) => (obj[key] = value),
|
||||
addObject: (key: string, value: Record<string, unknown>) => {
|
||||
obj = { ...obj, ...value };
|
||||
},
|
||||
send: (message: string, value: Record<string, unknown>) =>
|
||||
_log.info({ ...obj, ...value }, message),
|
||||
};
|
||||
}
|
||||
|
||||
const GLOBAL_PROPERTIES = ['__path', '__referrer'];
|
||||
|
||||
export async function postEvent(
|
||||
request: FastifyRequest<{
|
||||
Body: PostEventPayload;
|
||||
}>,
|
||||
reply: FastifyReply
|
||||
) {
|
||||
const contextLogger = createContextLogger(request);
|
||||
let deviceId: string | null = null;
|
||||
const { projectId, body } = request;
|
||||
const properties = body.properties ?? {};
|
||||
const getProperty = (name: string): string | undefined => {
|
||||
// replace thing is just for older sdks when we didn't have `__`
|
||||
// remove when kiddokitchen app (24.09.02) is not used anymore
|
||||
return (
|
||||
((properties[name] || properties[name.replace('__', '')]) as
|
||||
| string
|
||||
| null
|
||||
| undefined) ?? undefined
|
||||
);
|
||||
};
|
||||
const profileId = body.profileId ?? '';
|
||||
const createdAt = new Date(body.timestamp);
|
||||
const url = getProperty('__path');
|
||||
const { path, hash, query } = parsePath(url);
|
||||
const referrer = isSameDomain(getProperty('__referrer'), url)
|
||||
? null
|
||||
: parseReferrer(getProperty('__referrer'));
|
||||
const utmReferrer = getReferrerWithQuery(query);
|
||||
const ip = getClientIp(request)!;
|
||||
const origin = request.headers.origin!;
|
||||
const ua = request.headers['user-agent']!;
|
||||
const uaInfo = parseUserAgent(ua);
|
||||
const [geo, salts] = await Promise.all([parseIp(ip), getSalts()]);
|
||||
const origin = request.headers.origin!;
|
||||
const salts = await getSalts();
|
||||
const currentDeviceId = generateDeviceId({
|
||||
salt: salts.current,
|
||||
origin,
|
||||
origin: origin,
|
||||
ip,
|
||||
ua,
|
||||
});
|
||||
@@ -100,237 +29,20 @@ export async function postEvent(
|
||||
ua,
|
||||
});
|
||||
|
||||
const isServerEvent = !isUserAgentSet(ua);
|
||||
|
||||
if (isServerEvent) {
|
||||
const [event] = await withTiming(
|
||||
'Get last event (server-event)',
|
||||
getEvents(
|
||||
`SELECT * FROM events WHERE name = 'screen_view' AND profile_id = ${escape(profileId)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1`
|
||||
)
|
||||
);
|
||||
|
||||
const payload: Omit<IServiceCreateEventPayload, 'id'> = {
|
||||
name: body.name,
|
||||
deviceId: event?.deviceId || '',
|
||||
sessionId: event?.sessionId || '',
|
||||
profileId,
|
||||
projectId,
|
||||
properties: Object.assign({}, omit(GLOBAL_PROPERTIES, properties)),
|
||||
createdAt,
|
||||
country: event?.country || geo.country || '',
|
||||
city: event?.city || geo.city || '',
|
||||
region: event?.region || geo.region || '',
|
||||
continent: event?.continent || geo.continent || '',
|
||||
os: event?.os ?? '',
|
||||
osVersion: event?.osVersion ?? '',
|
||||
browser: event?.browser ?? '',
|
||||
browserVersion: event?.browserVersion ?? '',
|
||||
device: event?.device ?? '',
|
||||
brand: event?.brand ?? '',
|
||||
model: event?.model ?? '',
|
||||
duration: 0,
|
||||
path: event?.path ?? '',
|
||||
referrer: event?.referrer ?? '',
|
||||
referrerName: event?.referrerName ?? '',
|
||||
referrerType: event?.referrerType ?? '',
|
||||
profile: undefined,
|
||||
meta: undefined,
|
||||
};
|
||||
|
||||
contextLogger.send('server event is queued', {
|
||||
ip,
|
||||
origin,
|
||||
ua,
|
||||
uaInfo,
|
||||
referrer,
|
||||
profileId,
|
||||
projectId,
|
||||
deviceId,
|
||||
path,
|
||||
payload,
|
||||
prevEvent: event,
|
||||
});
|
||||
|
||||
eventsQueue.add('event', {
|
||||
type: 'createEvent',
|
||||
payload,
|
||||
});
|
||||
return reply.status(200).send('');
|
||||
}
|
||||
|
||||
const [sessionEndJobCurrentDeviceId, sessionEndJobPreviousDeviceId] =
|
||||
await withTiming(
|
||||
'Get geo and jobs from queue',
|
||||
Promise.all([
|
||||
findJobByPrefix(
|
||||
eventsQueue,
|
||||
`sessionEnd:${projectId}:${currentDeviceId}:`
|
||||
),
|
||||
findJobByPrefix(
|
||||
eventsQueue,
|
||||
`sessionEnd:${projectId}:${previousDeviceId}:`
|
||||
),
|
||||
])
|
||||
);
|
||||
|
||||
const createSessionStart =
|
||||
!sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId;
|
||||
|
||||
if (sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId) {
|
||||
deviceId = currentDeviceId;
|
||||
const diff = Date.now() - sessionEndJobCurrentDeviceId.timestamp;
|
||||
sessionEndJobCurrentDeviceId.changeDelay(diff + SESSION_END_TIMEOUT);
|
||||
} else if (!sessionEndJobCurrentDeviceId && sessionEndJobPreviousDeviceId) {
|
||||
deviceId = previousDeviceId;
|
||||
const diff = Date.now() - sessionEndJobPreviousDeviceId.timestamp;
|
||||
sessionEndJobPreviousDeviceId.changeDelay(diff + SESSION_END_TIMEOUT);
|
||||
} else {
|
||||
deviceId = currentDeviceId;
|
||||
// Queue session end
|
||||
eventsQueue.add(
|
||||
'event',
|
||||
{
|
||||
type: 'createSessionEnd',
|
||||
payload: {
|
||||
deviceId,
|
||||
},
|
||||
eventsQueue.add('event', {
|
||||
type: 'incomingEvent',
|
||||
payload: {
|
||||
projectId: request.projectId,
|
||||
headers: {
|
||||
origin,
|
||||
ua,
|
||||
},
|
||||
{
|
||||
delay: SESSION_END_TIMEOUT,
|
||||
jobId: `sessionEnd:${projectId}:${deviceId}:${Date.now()}`,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
const [[sessionStartEvent], prevEventJob] = await withTiming(
|
||||
'Get session start event',
|
||||
Promise.all([
|
||||
getEvents(
|
||||
`SELECT * FROM events WHERE name = 'session_start' AND device_id = ${escape(deviceId)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1`
|
||||
),
|
||||
findJobByPrefix(eventsQueue, `event:${projectId}:${deviceId}:`),
|
||||
])
|
||||
);
|
||||
|
||||
const payload: Omit<IServiceCreateEventPayload, 'id'> = {
|
||||
name: body.name,
|
||||
deviceId,
|
||||
profileId,
|
||||
projectId,
|
||||
sessionId: createSessionStart ? uuid() : sessionStartEvent?.sessionId ?? '',
|
||||
properties: Object.assign({}, omit(GLOBAL_PROPERTIES, properties), {
|
||||
__hash: hash,
|
||||
__query: query,
|
||||
}),
|
||||
createdAt,
|
||||
country: geo.country,
|
||||
city: geo.city,
|
||||
region: geo.region,
|
||||
continent: geo.continent,
|
||||
os: uaInfo.os,
|
||||
osVersion: uaInfo.osVersion,
|
||||
browser: uaInfo.browser,
|
||||
browserVersion: uaInfo.browserVersion,
|
||||
device: uaInfo.device,
|
||||
brand: uaInfo.brand,
|
||||
model: uaInfo.model,
|
||||
duration: 0,
|
||||
path: path,
|
||||
referrer: referrer?.url,
|
||||
referrerName: referrer?.name || utmReferrer?.name || '',
|
||||
referrerType: referrer?.type || utmReferrer?.type || '',
|
||||
profile: undefined,
|
||||
meta: undefined,
|
||||
};
|
||||
|
||||
const isDelayed = prevEventJob ? await prevEventJob?.isDelayed() : false;
|
||||
|
||||
if (isDelayed && prevEventJob && prevEventJob.data.type === 'createEvent') {
|
||||
const prevEvent = prevEventJob.data.payload;
|
||||
const duration = getTime(payload.createdAt) - getTime(prevEvent.createdAt);
|
||||
contextLogger.add('prevEvent', prevEvent);
|
||||
|
||||
// Set path from prev screen_view event if current event is not a screen_view
|
||||
if (payload.name != 'screen_view') {
|
||||
payload.path = prevEvent.path;
|
||||
}
|
||||
|
||||
if (payload.name === 'screen_view') {
|
||||
if (duration < 0) {
|
||||
contextLogger.send('duration is wrong', {
|
||||
payload,
|
||||
duration,
|
||||
});
|
||||
} else {
|
||||
// Skip update duration if it's wrong
|
||||
// Seems like request is not in right order
|
||||
await withTiming(
|
||||
'Update previous job with duration',
|
||||
prevEventJob.updateData({
|
||||
type: 'createEvent',
|
||||
payload: {
|
||||
...prevEvent,
|
||||
duration,
|
||||
},
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
await withTiming('Promote previous job', prevEventJob.promote());
|
||||
}
|
||||
} else if (payload.name !== 'screen_view') {
|
||||
contextLogger.send('no previous job', {
|
||||
prevEventJob,
|
||||
payload,
|
||||
});
|
||||
}
|
||||
|
||||
if (createSessionStart) {
|
||||
// We do not need to queue session_start
|
||||
await withTiming(
|
||||
'Create session start event',
|
||||
createEvent({
|
||||
...payload,
|
||||
name: 'session_start',
|
||||
// @ts-expect-error
|
||||
createdAt: toISOString(getTime(payload.createdAt) - 100),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
const options: JobsOptions = {};
|
||||
if (payload.name === 'screen_view') {
|
||||
options.delay = SESSION_TIMEOUT;
|
||||
options.jobId = `event:${projectId}:${deviceId}:${Date.now()}`;
|
||||
}
|
||||
|
||||
contextLogger.send('event is queued', {
|
||||
ip,
|
||||
origin,
|
||||
ua,
|
||||
uaInfo,
|
||||
referrer,
|
||||
profileId,
|
||||
projectId,
|
||||
deviceId,
|
||||
geo,
|
||||
sessionStartEvent,
|
||||
path,
|
||||
payload,
|
||||
event: request.body,
|
||||
geo: await parseIp(ip),
|
||||
currentDeviceId,
|
||||
previousDeviceId,
|
||||
},
|
||||
});
|
||||
|
||||
// Queue current event
|
||||
eventsQueue
|
||||
.add(
|
||||
'event',
|
||||
{
|
||||
type: 'createEvent',
|
||||
payload,
|
||||
},
|
||||
options
|
||||
)
|
||||
.catch(noop('Failed to queue event'));
|
||||
|
||||
reply.status(202).send(deviceId);
|
||||
reply.status(202).send('ok');
|
||||
}
|
||||
|
||||
@@ -13,13 +13,16 @@
|
||||
"dependencies": {
|
||||
"@bull-board/api": "^5.13.0",
|
||||
"@bull-board/express": "^5.13.0",
|
||||
"@openpanel/common": "workspace:*",
|
||||
"@openpanel/db": "workspace:*",
|
||||
"@openpanel/queue": "workspace:*",
|
||||
"@openpanel/common": "workspace:*",
|
||||
"@openpanel/redis": "workspace:*",
|
||||
"bullmq": "^5.1.1",
|
||||
"express": "^4.18.2",
|
||||
"ramda": "^0.29.1"
|
||||
"ramda": "^0.29.1",
|
||||
"sqlstring": "^2.3.3",
|
||||
"ua-parser-js": "^1.0.37",
|
||||
"uuid": "^9.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@openpanel/eslint-config": "workspace:*",
|
||||
@@ -27,6 +30,9 @@
|
||||
"@openpanel/tsconfig": "workspace:*",
|
||||
"@types/express": "^4.17.21",
|
||||
"@types/ramda": "^0.29.6",
|
||||
"@types/sqlstring": "^2.3.2",
|
||||
"@types/ua-parser-js": "^0.7.39",
|
||||
"@types/uuid": "^9.0.8",
|
||||
"eslint": "^8.48.0",
|
||||
"prettier": "^3.0.3",
|
||||
"tsup": "^7.2.0",
|
||||
|
||||
258
apps/worker/src/jobs/events.incoming-event.ts
Normal file
258
apps/worker/src/jobs/events.incoming-event.ts
Normal file
@@ -0,0 +1,258 @@
|
||||
import { getReferrerWithQuery, parseReferrer } from '@/utils/parse-referrer';
|
||||
import { isUserAgentSet, parseUserAgent } from '@/utils/parse-user-agent';
|
||||
import { isSameDomain, parsePath } from '@/utils/url';
|
||||
import type { Job, JobsOptions } from 'bullmq';
|
||||
import { omit } from 'ramda';
|
||||
import { escape } from 'sqlstring';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import { getTime, toISOString } from '@openpanel/common';
|
||||
import type { IServiceCreateEventPayload } from '@openpanel/db';
|
||||
import { createEvent, getEvents } from '@openpanel/db';
|
||||
import { findJobByPrefix } from '@openpanel/queue';
|
||||
import { eventsQueue } from '@openpanel/queue/src/queues';
|
||||
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue/src/queues';
|
||||
|
||||
const GLOBAL_PROPERTIES = ['__path', '__referrer'];
|
||||
const SESSION_TIMEOUT = 1000 * 60 * 30;
|
||||
const SESSION_END_TIMEOUT = SESSION_TIMEOUT + 1000;
|
||||
|
||||
export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
|
||||
const {
|
||||
geo,
|
||||
event: body,
|
||||
headers,
|
||||
projectId,
|
||||
currentDeviceId,
|
||||
previousDeviceId,
|
||||
} = job.data.payload;
|
||||
let deviceId: string | null = null;
|
||||
|
||||
const properties = body.properties ?? {};
|
||||
const getProperty = (name: string): string | undefined => {
|
||||
// replace thing is just for older sdks when we didn't have `__`
|
||||
// remove when kiddokitchen app (24.09.02) is not used anymore
|
||||
return (
|
||||
((properties[name] || properties[name.replace('__', '')]) as
|
||||
| string
|
||||
| null
|
||||
| undefined) ?? undefined
|
||||
);
|
||||
};
|
||||
const { origin, ua } = headers;
|
||||
const profileId = body.profileId ?? '';
|
||||
const createdAt = new Date(body.timestamp);
|
||||
const url = getProperty('__path');
|
||||
const { path, hash, query } = parsePath(url);
|
||||
const referrer = isSameDomain(getProperty('__referrer'), url)
|
||||
? null
|
||||
: parseReferrer(getProperty('__referrer'));
|
||||
const utmReferrer = getReferrerWithQuery(query);
|
||||
const uaInfo = ua ? parseUserAgent(ua) : null;
|
||||
const isServerEvent = ua ? !isUserAgentSet(ua) : true;
|
||||
|
||||
if (isServerEvent) {
|
||||
const [event] = await getEvents(
|
||||
`SELECT * FROM events WHERE name = 'screen_view' AND profile_id = ${escape(profileId)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1`
|
||||
);
|
||||
|
||||
const payload: Omit<IServiceCreateEventPayload, 'id'> = {
|
||||
name: body.name,
|
||||
deviceId: event?.deviceId || '',
|
||||
sessionId: event?.sessionId || '',
|
||||
profileId,
|
||||
projectId,
|
||||
properties: Object.assign({}, omit(GLOBAL_PROPERTIES, properties)),
|
||||
createdAt,
|
||||
country: event?.country || geo.country || '',
|
||||
city: event?.city || geo.city || '',
|
||||
region: event?.region || geo.region || '',
|
||||
continent: event?.continent || geo.continent || '',
|
||||
os: event?.os ?? '',
|
||||
osVersion: event?.osVersion ?? '',
|
||||
browser: event?.browser ?? '',
|
||||
browserVersion: event?.browserVersion ?? '',
|
||||
device: event?.device ?? '',
|
||||
brand: event?.brand ?? '',
|
||||
model: event?.model ?? '',
|
||||
duration: 0,
|
||||
path: event?.path ?? '',
|
||||
referrer: event?.referrer ?? '',
|
||||
referrerName: event?.referrerName ?? '',
|
||||
referrerType: event?.referrerType ?? '',
|
||||
profile: undefined,
|
||||
meta: undefined,
|
||||
};
|
||||
|
||||
return createEvent(payload);
|
||||
}
|
||||
|
||||
const [sessionEndJobCurrentDeviceId, sessionEndJobPreviousDeviceId] =
|
||||
await Promise.all([
|
||||
findJobByPrefix(
|
||||
eventsQueue,
|
||||
`sessionEnd:${projectId}:${currentDeviceId}:`
|
||||
),
|
||||
findJobByPrefix(
|
||||
eventsQueue,
|
||||
`sessionEnd:${projectId}:${previousDeviceId}:`
|
||||
),
|
||||
]);
|
||||
|
||||
const createSessionStart =
|
||||
!sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId;
|
||||
|
||||
if (sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId) {
|
||||
deviceId = currentDeviceId;
|
||||
const diff = Date.now() - sessionEndJobCurrentDeviceId.timestamp;
|
||||
sessionEndJobCurrentDeviceId.changeDelay(diff + SESSION_END_TIMEOUT);
|
||||
} else if (!sessionEndJobCurrentDeviceId && sessionEndJobPreviousDeviceId) {
|
||||
deviceId = previousDeviceId;
|
||||
const diff = Date.now() - sessionEndJobPreviousDeviceId.timestamp;
|
||||
sessionEndJobPreviousDeviceId.changeDelay(diff + SESSION_END_TIMEOUT);
|
||||
} else {
|
||||
deviceId = currentDeviceId;
|
||||
// Queue session end
|
||||
eventsQueue.add(
|
||||
'event',
|
||||
{
|
||||
type: 'createSessionEnd',
|
||||
payload: {
|
||||
deviceId,
|
||||
},
|
||||
},
|
||||
{
|
||||
delay: SESSION_END_TIMEOUT,
|
||||
jobId: `sessionEnd:${projectId}:${deviceId}:${Date.now()}`,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
const [[sessionStartEvent], prevEventJob] = await Promise.all([
|
||||
getEvents(
|
||||
`SELECT * FROM events WHERE name = 'session_start' AND device_id = ${escape(deviceId)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1`
|
||||
),
|
||||
findJobByPrefix(eventsQueue, `event:${projectId}:${deviceId}:`),
|
||||
]);
|
||||
|
||||
const payload: Omit<IServiceCreateEventPayload, 'id'> = {
|
||||
name: body.name,
|
||||
deviceId,
|
||||
profileId,
|
||||
projectId,
|
||||
sessionId: createSessionStart ? uuid() : sessionStartEvent?.sessionId ?? '',
|
||||
properties: Object.assign({}, omit(GLOBAL_PROPERTIES, properties), {
|
||||
__hash: hash,
|
||||
__query: query,
|
||||
}),
|
||||
createdAt,
|
||||
country: geo.country,
|
||||
city: geo.city,
|
||||
region: geo.region,
|
||||
continent: geo.continent,
|
||||
os: uaInfo?.os ?? '',
|
||||
osVersion: uaInfo?.osVersion ?? '',
|
||||
browser: uaInfo?.browser ?? '',
|
||||
browserVersion: uaInfo?.browserVersion ?? '',
|
||||
device: uaInfo?.device ?? '',
|
||||
brand: uaInfo?.brand ?? '',
|
||||
model: uaInfo?.model ?? '',
|
||||
duration: 0,
|
||||
path: path,
|
||||
referrer: referrer?.url,
|
||||
referrerName: referrer?.name || utmReferrer?.name || '',
|
||||
referrerType: referrer?.type || utmReferrer?.type || '',
|
||||
profile: undefined,
|
||||
meta: undefined,
|
||||
};
|
||||
|
||||
const isDelayed = prevEventJob ? await prevEventJob?.isDelayed() : false;
|
||||
|
||||
if (isDelayed && prevEventJob && prevEventJob.data.type === 'createEvent') {
|
||||
const prevEvent = prevEventJob.data.payload;
|
||||
const duration = getTime(payload.createdAt) - getTime(prevEvent.createdAt);
|
||||
job.log(`prevEvent ${JSON.stringify(prevEvent, null, 2)}`);
|
||||
|
||||
// Set path from prev screen_view event if current event is not a screen_view
|
||||
if (payload.name != 'screen_view') {
|
||||
payload.path = prevEvent.path;
|
||||
}
|
||||
|
||||
if (payload.name === 'screen_view') {
|
||||
if (duration < 0) {
|
||||
job.log(`prevEvent ${JSON.stringify(prevEvent, null, 2)}`);
|
||||
} else {
|
||||
// Skip update duration if it's wrong
|
||||
// Seems like request is not in right order
|
||||
await prevEventJob.updateData({
|
||||
type: 'createEvent',
|
||||
payload: {
|
||||
...prevEvent,
|
||||
duration,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
await prevEventJob.promote();
|
||||
}
|
||||
} else if (payload.name !== 'screen_view') {
|
||||
job.log(
|
||||
`no previous job ${JSON.stringify(
|
||||
{
|
||||
prevEventJob,
|
||||
payload,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)}`
|
||||
);
|
||||
}
|
||||
|
||||
if (createSessionStart) {
|
||||
// We do not need to queue session_start
|
||||
await createEvent({
|
||||
...payload,
|
||||
name: 'session_start',
|
||||
// @ts-expect-error
|
||||
createdAt: toISOString(getTime(payload.createdAt) - 100),
|
||||
});
|
||||
}
|
||||
|
||||
const options: JobsOptions = {};
|
||||
if (payload.name === 'screen_view') {
|
||||
options.delay = SESSION_TIMEOUT;
|
||||
options.jobId = `event:${projectId}:${deviceId}:${Date.now()}`;
|
||||
}
|
||||
|
||||
job.log(
|
||||
`event is queued ${JSON.stringify(
|
||||
{
|
||||
origin,
|
||||
ua,
|
||||
uaInfo,
|
||||
referrer,
|
||||
profileId,
|
||||
projectId,
|
||||
deviceId,
|
||||
geo,
|
||||
sessionStartEvent,
|
||||
path,
|
||||
payload,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)}`
|
||||
);
|
||||
|
||||
// Queue event instead of creating it,
|
||||
// since we want to update duration if we get more events in the same session
|
||||
// The event will only be delayed if it's a screen_view event
|
||||
return eventsQueue.add(
|
||||
'event',
|
||||
{
|
||||
type: 'createEvent',
|
||||
payload,
|
||||
},
|
||||
options
|
||||
);
|
||||
}
|
||||
@@ -4,20 +4,23 @@ import { createEvent } from '@openpanel/db';
|
||||
import type {
|
||||
EventsQueuePayload,
|
||||
EventsQueuePayloadCreateSessionEnd,
|
||||
EventsQueuePayloadIncomingEvent,
|
||||
} from '@openpanel/queue/src/queues';
|
||||
|
||||
import { createSessionEnd } from './events.create-session-end';
|
||||
import { incomingEvent } from './events.incoming-event';
|
||||
|
||||
export async function eventsJob(job: Job<EventsQueuePayload>) {
|
||||
switch (job.data.type) {
|
||||
case 'incomingEvent': {
|
||||
return await incomingEvent(job as Job<EventsQueuePayloadIncomingEvent>);
|
||||
}
|
||||
case 'createEvent': {
|
||||
if (job.attemptsStarted > 1 && job.data.payload.duration < 0) {
|
||||
job.data.payload.duration = 0;
|
||||
}
|
||||
return await createEvent(job.data.payload);
|
||||
}
|
||||
}
|
||||
switch (job.data.type) {
|
||||
case 'createSessionEnd': {
|
||||
return await createSessionEnd(
|
||||
job as Job<EventsQueuePayloadCreateSessionEnd>
|
||||
|
||||
2684
apps/worker/src/referrers/index.ts
Normal file
2684
apps/worker/src/referrers/index.ts
Normal file
File diff suppressed because it is too large
Load Diff
5
apps/worker/src/referrers/referrers.readme.md
Normal file
5
apps/worker/src/referrers/referrers.readme.md
Normal file
@@ -0,0 +1,5 @@
|
||||
# Snowplow Referer Parser
|
||||
|
||||
The file index.ts in this dir is generated from snowplows referer database [Snowplow Referer Parser](https://github.com/snowplow-referer-parser/referer-parser).
|
||||
|
||||
The orginal [referers.yml](https://github.com/snowplow-referer-parser/referer-parser/blob/master/resources/referers.yml) is based on Piwik's SearchEngines.php and Socials.php, copyright 2012 Matthieu Aubry and available under the GNU General Public License v3.
|
||||
58
apps/worker/src/utils/parse-referrer.ts
Normal file
58
apps/worker/src/utils/parse-referrer.ts
Normal file
@@ -0,0 +1,58 @@
|
||||
import { stripTrailingSlash } from '@openpanel/common';
|
||||
|
||||
import referrers from '../referrers';
|
||||
|
||||
function getHostname(url: string | undefined) {
|
||||
if (!url) {
|
||||
return '';
|
||||
}
|
||||
|
||||
try {
|
||||
return new URL(url).hostname;
|
||||
} catch (e) {
|
||||
return '';
|
||||
}
|
||||
}
|
||||
|
||||
export function parseReferrer(url: string | undefined) {
|
||||
const hostname = getHostname(url);
|
||||
const match = referrers[hostname] ?? referrers[hostname.replace('www.', '')];
|
||||
|
||||
return {
|
||||
name: match?.name ?? '',
|
||||
type: match?.type ?? 'unknown',
|
||||
url: stripTrailingSlash(url ?? ''),
|
||||
};
|
||||
}
|
||||
|
||||
export function getReferrerWithQuery(
|
||||
query: Record<string, string> | undefined
|
||||
) {
|
||||
if (!query) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const source = query.utm_source ?? query.ref ?? query.utm_referrer ?? '';
|
||||
|
||||
if (source === '') {
|
||||
return null;
|
||||
}
|
||||
|
||||
const match = Object.values(referrers).find(
|
||||
(referrer) => referrer.name.toLowerCase() === source.toLowerCase()
|
||||
);
|
||||
|
||||
if (match) {
|
||||
return {
|
||||
name: match.name,
|
||||
type: match.type,
|
||||
url: '',
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
name: source,
|
||||
type: 'unknown',
|
||||
url: '',
|
||||
};
|
||||
}
|
||||
@@ -1,54 +1,23 @@
|
||||
export function getOS(ua?: string) {
|
||||
if (!ua) {
|
||||
return null;
|
||||
}
|
||||
if (/iPad/i.test(ua)) {
|
||||
return 'iPad';
|
||||
}
|
||||
if (/iPhone/i.test(ua)) {
|
||||
return 'iPhone';
|
||||
}
|
||||
if (/iPod/i.test(ua)) {
|
||||
return 'iPod';
|
||||
}
|
||||
if (/Macintosh/i.test(ua)) {
|
||||
return 'macOS';
|
||||
}
|
||||
if (/IEMobile|Windows/i.test(ua)) {
|
||||
return 'Windows';
|
||||
}
|
||||
if (/Android/i.test(ua)) {
|
||||
return 'Android';
|
||||
}
|
||||
if (/BlackBerry/i.test(ua)) {
|
||||
return 'BlackBerry';
|
||||
}
|
||||
if (/EF500/i.test(ua)) {
|
||||
return 'Bluebird';
|
||||
}
|
||||
if (/CrOS/i.test(ua)) {
|
||||
return 'Chrome OS';
|
||||
}
|
||||
if (/DL-AXIS/i.test(ua)) {
|
||||
return 'Datalogic';
|
||||
}
|
||||
if (/CT50/i.test(ua)) {
|
||||
return 'Honeywell';
|
||||
}
|
||||
if (/TC70|TC55/i.test(ua)) {
|
||||
return 'Zebra';
|
||||
}
|
||||
if (/Linux/i.test(ua)) {
|
||||
return 'Generic Linux';
|
||||
}
|
||||
return 'Unknown';
|
||||
import { UAParser } from 'ua-parser-js';
|
||||
|
||||
export function isUserAgentSet(ua: string) {
|
||||
return ua !== 'node' && ua !== 'undici' && !!ua;
|
||||
}
|
||||
|
||||
export function getDevice(ua?: string) {
|
||||
if (!ua) {
|
||||
return null;
|
||||
}
|
||||
export function parseUserAgent(ua: string) {
|
||||
const res = new UAParser(ua).getResult();
|
||||
return {
|
||||
os: res.os.name,
|
||||
osVersion: res.os.version,
|
||||
browser: res.browser.name,
|
||||
browserVersion: res.browser.version,
|
||||
device: res.device.type ?? getDevice(ua),
|
||||
brand: res.device.vendor,
|
||||
model: res.device.model,
|
||||
};
|
||||
}
|
||||
|
||||
export function getDevice(ua: string) {
|
||||
const t1 =
|
||||
/(android|bb\d+|meego).+mobile|avantgo|bada\/|blackberry|blazer|compal|elaine|fennec|hiptop|iemobile|ip(hone|od)|iris|kindle|lge |maemo|midp|mmp|mobile.+firefox|netfront|opera m(ob|in)i|palm( os)?|phone|p(ixi|re)\/|plucker|pocket|psp|series(4|6)0|symbian|treo|up\.(browser|link)|vodafone|wap|windows ce|xda|xiino/i.test(
|
||||
ua
|
||||
48
apps/worker/src/utils/url.ts
Normal file
48
apps/worker/src/utils/url.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
export function parseSearchParams(
|
||||
params: URLSearchParams
|
||||
): Record<string, string> | undefined {
|
||||
const result: Record<string, string> = {};
|
||||
for (const [key, value] of params.entries()) {
|
||||
result[key] = value;
|
||||
}
|
||||
return Object.keys(result).length ? result : undefined;
|
||||
}
|
||||
|
||||
export function parsePath(path?: string): {
|
||||
query?: Record<string, string>;
|
||||
path: string;
|
||||
hash?: string;
|
||||
} {
|
||||
if (!path) {
|
||||
return {
|
||||
path: '',
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
const url = new URL(path);
|
||||
return {
|
||||
query: parseSearchParams(url.searchParams),
|
||||
path: url.pathname,
|
||||
hash: url.hash || undefined,
|
||||
};
|
||||
} catch (error) {
|
||||
return {
|
||||
path,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export function isSameDomain(
|
||||
url1: string | undefined,
|
||||
url2: string | undefined
|
||||
) {
|
||||
if (!url1 || !url2) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
return new URL(url1).hostname === new URL(url2).hostname;
|
||||
} catch (e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,7 @@
|
||||
"bullmq": "^5.1.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@openpanel/sdk": "workspace:*",
|
||||
"@openpanel/eslint-config": "workspace:*",
|
||||
"@openpanel/prettier-config": "workspace:*",
|
||||
"@openpanel/tsconfig": "workspace:*",
|
||||
|
||||
@@ -1,9 +1,29 @@
|
||||
import { Queue } from 'bullmq';
|
||||
|
||||
import type { IServiceCreateEventPayload } from '@openpanel/db';
|
||||
import type { PostEventPayload } from '@openpanel/sdk';
|
||||
|
||||
import { connection } from './connection';
|
||||
|
||||
export interface EventsQueuePayloadIncomingEvent {
|
||||
type: 'incomingEvent';
|
||||
payload: {
|
||||
projectId: string;
|
||||
event: PostEventPayload;
|
||||
geo: {
|
||||
country: string | undefined;
|
||||
city: string | undefined;
|
||||
region: string | undefined;
|
||||
continent: string | undefined;
|
||||
};
|
||||
headers: {
|
||||
origin: string | undefined;
|
||||
ua: string | undefined;
|
||||
};
|
||||
currentDeviceId: string;
|
||||
previousDeviceId: string;
|
||||
};
|
||||
}
|
||||
export interface EventsQueuePayloadCreateEvent {
|
||||
type: 'createEvent';
|
||||
payload: Omit<IServiceCreateEventPayload, 'id'>;
|
||||
@@ -14,7 +34,8 @@ export interface EventsQueuePayloadCreateSessionEnd {
|
||||
}
|
||||
export type EventsQueuePayload =
|
||||
| EventsQueuePayloadCreateEvent
|
||||
| EventsQueuePayloadCreateSessionEnd;
|
||||
| EventsQueuePayloadCreateSessionEnd
|
||||
| EventsQueuePayloadIncomingEvent;
|
||||
|
||||
export interface CronQueuePayload {
|
||||
type: 'salt';
|
||||
|
||||
21
pnpm-lock.yaml
generated
21
pnpm-lock.yaml
generated
@@ -672,6 +672,15 @@ importers:
|
||||
ramda:
|
||||
specifier: ^0.29.1
|
||||
version: 0.29.1
|
||||
sqlstring:
|
||||
specifier: ^2.3.3
|
||||
version: 2.3.3
|
||||
ua-parser-js:
|
||||
specifier: ^1.0.37
|
||||
version: 1.0.37
|
||||
uuid:
|
||||
specifier: ^9.0.1
|
||||
version: 9.0.1
|
||||
devDependencies:
|
||||
'@openpanel/eslint-config':
|
||||
specifier: workspace:*
|
||||
@@ -688,6 +697,15 @@ importers:
|
||||
'@types/ramda':
|
||||
specifier: ^0.29.6
|
||||
version: 0.29.10
|
||||
'@types/sqlstring':
|
||||
specifier: ^2.3.2
|
||||
version: 2.3.2
|
||||
'@types/ua-parser-js':
|
||||
specifier: ^0.7.39
|
||||
version: 0.7.39
|
||||
'@types/uuid':
|
||||
specifier: ^9.0.8
|
||||
version: 9.0.8
|
||||
eslint:
|
||||
specifier: ^8.48.0
|
||||
version: 8.56.0
|
||||
@@ -847,6 +865,9 @@ importers:
|
||||
'@openpanel/prettier-config':
|
||||
specifier: workspace:*
|
||||
version: link:../../tooling/prettier
|
||||
'@openpanel/sdk':
|
||||
specifier: workspace:*
|
||||
version: link:../sdks/sdk
|
||||
'@openpanel/tsconfig':
|
||||
specifier: workspace:*
|
||||
version: link:../../tooling/typescript
|
||||
|
||||
Reference in New Issue
Block a user