diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index 1b82e418..be711193 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -7,7 +7,7 @@ import { eventsQueue } from '@openpanel/queue'; import { getRedisCache } from '@openpanel/redis'; import type { PostEventPayload } from '@openpanel/sdk'; -import { getStringHeaders } from './track.controller'; +import { getStringHeaders, getTimestamp } from './track.controller'; export async function postEvent( request: FastifyRequest<{ @@ -15,6 +15,7 @@ export async function postEvent( }>, reply: FastifyReply, ) { + const timestamp = getTimestamp(request.timestamp, request.body); const ip = getClientIp(request)!; const ua = request.headers['user-agent']!; const projectId = request.client?.projectId; @@ -57,10 +58,8 @@ export async function postEvent( headers: getStringHeaders(request.headers), event: { ...request.body, - // Dont rely on the client for the timestamp - timestamp: request.timestamp - ? new Date(request.timestamp).toISOString() - : new Date().toISOString(), + timestamp: timestamp.timestamp, + isTimestampFromThePast: timestamp.isTimestampFromThePast, }, geo, currentDeviceId, diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index b98e5711..9361c86d 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -57,12 +57,42 @@ function getIdentity(body: TrackHandlerPayload): IdentifyPayload | undefined { ); } +export function getTimestamp( + timestamp: FastifyRequest['timestamp'], + payload: TrackHandlerPayload['payload'], +) { + const safeTimestamp = new Date(timestamp || Date.now()).toISOString(); + const userDefinedTimestamp = path( + ['properties', '__timestamp'], + payload, + ); + + if (!userDefinedTimestamp) { + return { timestamp: safeTimestamp, isTimestampFromThePast: false }; + } + + const clientTimestamp = new Date(userDefinedTimestamp); + + if ( + Number.isNaN(clientTimestamp.getTime()) || + clientTimestamp > new Date(safeTimestamp) + ) { + return { timestamp: safeTimestamp, isTimestampFromThePast: false }; + } + + return { + timestamp: clientTimestamp.toISOString(), + isTimestampFromThePast: true, + }; +} + export async function handler( request: FastifyRequest<{ Body: TrackHandlerPayload; }>, reply: FastifyReply, ) { + const timestamp = getTimestamp(request.timestamp, request.body.payload); const ip = path(['properties', '__ip'], request.body.payload) || getClientIp(request)!; @@ -116,9 +146,8 @@ export async function handler( projectId, geo, headers: getStringHeaders(request.headers), - timestamp: request.timestamp - ? new Date(request.timestamp).toISOString() - : new Date().toISOString(), + timestamp: timestamp.timestamp, + isTimestampFromThePast: timestamp.isTimestampFromThePast, }), ]; @@ -185,6 +214,7 @@ async function track({ geo, headers, timestamp, + isTimestampFromThePast, }: { payload: TrackPayload; currentDeviceId: string; @@ -193,6 +223,7 @@ async function track({ geo: GeoLocation; headers: Record; timestamp: string; + isTimestampFromThePast: boolean; }) { const isScreenView = payload.name === 'screen_view'; // this will ensure that we don't have multiple events creating sessions @@ -213,8 +244,8 @@ async function track({ headers, event: { ...payload, - // Dont rely on the client for the timestamp timestamp, + isTimestampFromThePast, }, geo, currentDeviceId, diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 23c456e1..32de3867 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -1,30 +1,28 @@ import { getReferrerWithQuery, parseReferrer } from '@/utils/parse-referrer'; import type { Job } from 'bullmq'; import { omit } from 'ramda'; -import { v4 as uuid } from 'uuid'; -import { logger } from '@/utils/logger'; -import { getTime, isSameDomain, parsePath } from '@openpanel/common'; +import { createSessionEnd, getSessionEnd } from '@/utils/session-handler'; +import { isSameDomain, parsePath } from '@openpanel/common'; import { parseUserAgent } from '@openpanel/common/server'; import type { IServiceCreateEventPayload } from '@openpanel/db'; import { checkNotificationRulesForEvent, createEvent } from '@openpanel/db'; -import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service'; -import type { - EventsQueuePayloadCreateSessionEnd, - EventsQueuePayloadIncomingEvent, -} from '@openpanel/queue'; -import { - findJobByPrefix, - sessionsQueue, - sessionsQueueEvents, -} from '@openpanel/queue'; -import { getRedisQueue } from '@openpanel/redis'; +import { getLastScreenViewFromProfileId } from '@openpanel/db'; +import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; +import * as R from 'ramda'; const GLOBAL_PROPERTIES = ['__path', '__referrer']; -export const SESSION_TIMEOUT = 1000 * 60 * 30; -const getSessionEndJobId = (projectId: string, deviceId: string) => - `sessionEnd:${projectId}:${deviceId}`; +// This function will merge two objects. +// First it will strip '' and undefined/null from B +// Then it will merge the two objects with a standard ramda merge function +const merge = (a: Partial, b: Partial): A & B => + R.mergeDeepRight(a, R.reject(R.anyPass([R.isEmpty, R.isNil]))(b)) as A & B; + +async function createEventAndNotify(payload: IServiceCreateEventPayload) { + await checkNotificationRulesForEvent(payload); + return createEvent(payload); +} export async function incomingEvent(job: Job) { const { @@ -51,6 +49,7 @@ export async function incomingEvent(job: Job) { // this will get the profileId from the alias table if it exists const profileId = body.profileId ? String(body.profileId) : ''; const createdAt = new Date(body.timestamp); + const isTimestampFromThePast = body.isTimestampFromThePast; const url = getProperty('__path'); const { path, hash, query, origin } = parsePath(url); const referrer = isSameDomain(getProperty('__referrer'), url) @@ -62,7 +61,41 @@ export async function incomingEvent(job: Job) { const sdkVersion = headers['openpanel-sdk-version']; const uaInfo = parseUserAgent(userAgent); - if (uaInfo.isServer) { + const baseEvent = { + name: body.name, + profileId, + projectId, + properties: omit(GLOBAL_PROPERTIES, { + ...properties, + user_agent: userAgent, + __hash: hash, + __query: query, + }), + createdAt, + duration: 0, + sdkName, + sdkVersion, + city: geo.city, + country: geo.country, + region: geo.region, + longitude: geo.longitude, + latitude: geo.latitude, + path, + origin, + referrer: utmReferrer?.url || referrer?.url || '', + referrerName: utmReferrer?.name || referrer?.name || '', + referrerType: utmReferrer?.type || referrer?.type || '', + os: uaInfo.os, + osVersion: uaInfo.osVersion, + browser: uaInfo.browser, + browserVersion: uaInfo.browserVersion, + device: uaInfo.device, + brand: uaInfo.brand, + model: uaInfo.model, + } as const; + + // if timestamp is from the past we dont want to create a new session + if (uaInfo.isServer || isTimestampFromThePast) { const event = profileId ? await getLastScreenViewFromProfileId({ profileId, @@ -70,235 +103,29 @@ export async function incomingEvent(job: Job) { }) : null; - const payload: IServiceCreateEventPayload = { - name: body.name, - deviceId: event?.deviceId || '', - sessionId: event?.sessionId || '', - profileId, - projectId, - properties: { - ...omit(GLOBAL_PROPERTIES, properties), - user_agent: userAgent, - }, - createdAt, - country: event?.country || geo.country || '', - city: event?.city || geo.city || '', - region: event?.region || geo.region || '', - longitude: event?.longitude || geo.longitude || null, - latitude: event?.latitude || geo.latitude || null, - os: event?.os ?? '', - osVersion: event?.osVersion ?? '', - browser: event?.browser ?? '', - browserVersion: event?.browserVersion ?? '', - device: event?.device ?? uaInfo.device ?? '', - brand: event?.brand ?? '', - model: event?.model ?? '', - duration: 0, - path: event?.path ?? '', - origin: event?.origin ?? '', - referrer: event?.referrer ?? '', - referrerName: event?.referrerName ?? '', - referrerType: event?.referrerType ?? '', - sdkName, - sdkVersion, - }; - - await checkNotificationRulesForEvent(payload); - - return createEvent(payload); + const payload = merge(omit(['properties'], event ?? {}), baseEvent); + return createEventAndNotify(payload); } - const sessionEnd = await getSessionEndWithPriority(priority)({ + const sessionEnd = await getSessionEnd({ + priority, projectId, currentDeviceId, previousDeviceId, + profileId, }); - const sessionEndPayload = - sessionEnd?.job.data.payload || - ({ - sessionId: uuid(), - deviceId: currentDeviceId, - profileId, - projectId, - } satisfies EventsQueuePayloadCreateSessionEnd['payload']); + const payload: IServiceCreateEventPayload = merge(baseEvent, { + deviceId: sessionEnd.payload.deviceId, + sessionId: sessionEnd.payload.sessionId, + referrer: sessionEnd.payload?.referrer, + referrerName: sessionEnd.payload?.referrerName, + referrerType: sessionEnd.payload?.referrerType, + }); - const payload: IServiceCreateEventPayload = { - name: body.name, - deviceId: sessionEndPayload.deviceId, - sessionId: sessionEndPayload.sessionId, - profileId, - projectId, - properties: Object.assign({}, omit(GLOBAL_PROPERTIES, properties), { - user_agent: userAgent, - __hash: hash, - __query: query, - }), - createdAt, - country: geo.country, - city: geo.city, - region: geo.region, - longitude: geo.longitude, - latitude: geo.latitude, - os: uaInfo?.os ?? '', - osVersion: uaInfo?.osVersion ?? '', - browser: uaInfo?.browser ?? '', - browserVersion: uaInfo?.browserVersion ?? '', - device: uaInfo?.device ?? '', - brand: uaInfo?.brand ?? '', - model: uaInfo?.model ?? '', - duration: 0, - path: path, - origin: origin, - referrer: sessionEnd ? sessionEndPayload.referrer : referrer?.url || '', - referrerName: sessionEnd - ? sessionEndPayload.referrerName - : referrer?.name || utmReferrer?.name || '', - referrerType: sessionEnd - ? sessionEndPayload.referrerType - : referrer?.type || utmReferrer?.type || '', - sdkName, - sdkVersion, - }; - - if (sessionEnd) { - // If for some reason we have a session end job that is not a createSessionEnd job - if (sessionEnd.job.data.type !== 'createSessionEnd') { - throw new Error('Invalid session end job'); - } - - await sessionEnd.job.changeDelay(SESSION_TIMEOUT); - } else { - await sessionsQueue.add( - 'session', - { - type: 'createSessionEnd', - payload, - }, - { - delay: SESSION_TIMEOUT, - jobId: getSessionEndJobId(projectId, sessionEndPayload.deviceId), - }, - ); + if (sessionEnd.notFound) { + await createSessionEnd({ payload }); } - if (!sessionEnd) { - await createEvent({ - ...payload, - name: 'session_start', - createdAt: new Date(getTime(payload.createdAt) - 100), - }); - } - - await checkNotificationRulesForEvent(payload); - - return createEvent(payload); -} - -function getSessionEndWithPriority( - priority: boolean, - count = 0, -): typeof getSessionEnd { - return async (args) => { - const res = await getSessionEnd(args); - - if (count > 10) { - throw new Error('Failed to get session end'); - } - - // if we get simultaneous requests we want to avoid race conditions with getting the session end - // one of the events will get priority and the other will wait for the first to finish - if (res === null && priority === false) { - await new Promise((resolve) => setTimeout(resolve, 50)); - return getSessionEndWithPriority(priority, count + 1)(args); - } - - return res; - }; -} - -async function getSessionEnd({ - projectId, - currentDeviceId, - previousDeviceId, -}: { - projectId: string; - currentDeviceId: string; - previousDeviceId: string; -}) { - async function handleJobStates( - job: Job, - ): Promise<{ deviceId: string; job: Job } | null> { - const state = await job.getState(); - if (state === 'delayed') { - return { deviceId: currentDeviceId, job }; - } - - if (state === 'completed' || state === 'failed') { - await job.remove(); - } - - if (state === 'active' || state === 'waiting') { - await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10); - return getSessionEnd({ - projectId, - currentDeviceId, - previousDeviceId, - }); - } - - return null; - } - - const job = await sessionsQueue.getJob( - getSessionEndJobId(projectId, currentDeviceId), - ); - if (job) { - const res = await handleJobStates(job); - if (res) { - return res; - } - } - - const previousJob = await sessionsQueue.getJob( - getSessionEndJobId(projectId, previousDeviceId), - ); - if (previousJob) { - const res = await handleJobStates(previousJob); - if (res) { - return res; - } - } - - // Fallback during migration period - const currentSessionEndKeys = await getRedisQueue().keys( - `bull:sessions:sessionEnd:${projectId}:${currentDeviceId}:*`, - ); - - const sessionEndJobCurrentDeviceId = await findJobByPrefix( - sessionsQueue, - currentSessionEndKeys, - `sessionEnd:${projectId}:${currentDeviceId}:`, - ); - if (sessionEndJobCurrentDeviceId) { - logger.info('found session end job for current device (old)'); - return { deviceId: currentDeviceId, job: sessionEndJobCurrentDeviceId }; - } - - const previousSessionEndKeys = await getRedisQueue().keys( - `bull:sessions:sessionEnd:${projectId}:${previousDeviceId}:*`, - ); - - const sessionEndJobPreviousDeviceId = await findJobByPrefix( - sessionsQueue, - previousSessionEndKeys, - `sessionEnd:${projectId}:${previousDeviceId}:`, - ); - if (sessionEndJobPreviousDeviceId) { - logger.info('found session end job for previous device (old)'); - return { deviceId: previousDeviceId, job: sessionEndJobPreviousDeviceId }; - } - - // Create session - return null; + return createEventAndNotify(payload); } diff --git a/apps/worker/src/utils/session-handler.ts b/apps/worker/src/utils/session-handler.ts new file mode 100644 index 00000000..140d0155 --- /dev/null +++ b/apps/worker/src/utils/session-handler.ts @@ -0,0 +1,154 @@ +import { getTime } from '@openpanel/common'; +import { type IServiceCreateEventPayload, createEvent } from '@openpanel/db'; +import { + type EventsQueuePayloadCreateSessionEnd, + sessionsQueue, + sessionsQueueEvents, +} from '@openpanel/queue'; +import type { Job } from 'bullmq'; +import { v4 as uuid } from 'uuid'; + +export const SESSION_TIMEOUT = 1000 * 60 * 30; + +const getSessionEndJobId = (projectId: string, deviceId: string) => + `sessionEnd:${projectId}:${deviceId}`; + +export async function createSessionEnd({ + payload, +}: { + payload: IServiceCreateEventPayload; +}) { + await sessionsQueue.add( + 'session', + { + type: 'createSessionEnd', + payload, + }, + { + delay: SESSION_TIMEOUT, + jobId: getSessionEndJobId(payload.projectId, payload.deviceId), + }, + ); + + await createEvent({ + ...payload, + name: 'session_start', + createdAt: new Date(getTime(payload.createdAt) - 100), + }); +} + +export async function getSessionEnd({ + projectId, + currentDeviceId, + previousDeviceId, + profileId, + priority, +}: { + projectId: string; + currentDeviceId: string; + previousDeviceId: string; + profileId: string; + priority: boolean; +}) { + const sessionEnd = await getSessionEndJob({ + priority, + projectId, + currentDeviceId, + previousDeviceId, + }); + + const sessionEndPayload = + sessionEnd?.job.data.payload || + ({ + sessionId: uuid(), + deviceId: currentDeviceId, + profileId, + projectId, + } satisfies EventsQueuePayloadCreateSessionEnd['payload']); + + if (sessionEnd) { + // If for some reason we have a session end job that is not a createSessionEnd job + if (sessionEnd.job.data.type !== 'createSessionEnd') { + throw new Error('Invalid session end job'); + } + + await sessionEnd.job.changeDelay(SESSION_TIMEOUT); + } + + return { + payload: sessionEndPayload, + notFound: !sessionEnd, + }; +} + +export async function getSessionEndJob(args: { + projectId: string; + currentDeviceId: string; + previousDeviceId: string; + priority: boolean; + retryCount?: number; +}): Promise<{ + deviceId: string; + job: Job; +} | null> { + const { priority, retryCount = 0 } = args; + + if (retryCount > 10) { + throw new Error('Failed to get session end'); + } + + async function handleJobStates( + job: Job, + deviceId: string, + ): Promise<{ + deviceId: string; + job: Job; + } | null> { + const state = await job.getState(); + if (state === 'delayed') { + return { deviceId, job }; + } + + if (state === 'completed' || state === 'failed') { + await job.remove(); + } + + if (state === 'active' || state === 'waiting') { + await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10); + return getSessionEndJob({ + ...args, + priority, + retryCount, + }); + } + + return null; + } + + // Check current device job + const currentJob = await sessionsQueue.getJob( + getSessionEndJobId(args.projectId, args.currentDeviceId), + ); + if (currentJob) { + const res = await handleJobStates(currentJob, args.currentDeviceId); + if (res) return res; + } + + // Check previous device job + const previousJob = await sessionsQueue.getJob( + getSessionEndJobId(args.projectId, args.previousDeviceId), + ); + if (previousJob) { + const res = await handleJobStates(previousJob, args.previousDeviceId); + if (res) return res; + } + + // If no job found and not priority, retry + if (!priority) { + await new Promise((resolve) => setTimeout(resolve, 200)); + return getSessionEndJob({ ...args, priority, retryCount: retryCount + 1 }); + } + + // Create session + return null; +} diff --git a/packages/common/server/parser-user-agent.ts b/packages/common/server/parser-user-agent.ts index fe1f6f46..013c1c44 100644 --- a/packages/common/server/parser-user-agent.ts +++ b/packages/common/server/parser-user-agent.ts @@ -3,6 +3,12 @@ import { UAParser } from 'ua-parser-js'; const parsedServerUa = { isServer: true, device: 'server', + os: '', + osVersion: '', + browser: '', + browserVersion: '', + brand: '', + model: '', } as const; const IPHONE_MODEL_REGEX = /(iPhone|iPad)\s*([0-9,]+)/i; diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 9f82d13d..682d2392 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -10,6 +10,7 @@ export interface EventsQueuePayloadIncomingEvent { projectId: string; event: TrackPayload & { timestamp: string; + isTimestampFromThePast: boolean; }; geo: { country: string | undefined; @@ -71,6 +72,11 @@ export const eventsQueue = new Queue('events', { connection: getRedisQueue(), defaultJobOptions: { removeOnComplete: 10, + attempts: 3, + backoff: { + type: 'exponential', + delay: 1000, + }, }, });