feature(api,worker): Override default timestamp with a date from the past (#76)

* feature(worker,api): refactor incoming events and support custom timestamps from the past

* fix(queue): add retry logic to events queue

* fix(worker): remove properties when merging server events
This commit is contained in:
Carl-Gerhard Lindesvärd
2024-10-22 10:25:21 +02:00
committed by GitHub
parent c4a2ea4858
commit 4fe338c628
6 changed files with 271 additions and 248 deletions

View File

@@ -7,7 +7,7 @@ import { eventsQueue } from '@openpanel/queue';
import { getRedisCache } from '@openpanel/redis';
import type { PostEventPayload } from '@openpanel/sdk';
import { getStringHeaders } from './track.controller';
import { getStringHeaders, getTimestamp } from './track.controller';
export async function postEvent(
request: FastifyRequest<{
@@ -15,6 +15,7 @@ export async function postEvent(
}>,
reply: FastifyReply,
) {
const timestamp = getTimestamp(request.timestamp, request.body);
const ip = getClientIp(request)!;
const ua = request.headers['user-agent']!;
const projectId = request.client?.projectId;
@@ -57,10 +58,8 @@ export async function postEvent(
headers: getStringHeaders(request.headers),
event: {
...request.body,
// Dont rely on the client for the timestamp
timestamp: request.timestamp
? new Date(request.timestamp).toISOString()
: new Date().toISOString(),
timestamp: timestamp.timestamp,
isTimestampFromThePast: timestamp.isTimestampFromThePast,
},
geo,
currentDeviceId,

View File

@@ -57,12 +57,42 @@ function getIdentity(body: TrackHandlerPayload): IdentifyPayload | undefined {
);
}
export function getTimestamp(
timestamp: FastifyRequest['timestamp'],
payload: TrackHandlerPayload['payload'],
) {
const safeTimestamp = new Date(timestamp || Date.now()).toISOString();
const userDefinedTimestamp = path<string>(
['properties', '__timestamp'],
payload,
);
if (!userDefinedTimestamp) {
return { timestamp: safeTimestamp, isTimestampFromThePast: false };
}
const clientTimestamp = new Date(userDefinedTimestamp);
if (
Number.isNaN(clientTimestamp.getTime()) ||
clientTimestamp > new Date(safeTimestamp)
) {
return { timestamp: safeTimestamp, isTimestampFromThePast: false };
}
return {
timestamp: clientTimestamp.toISOString(),
isTimestampFromThePast: true,
};
}
export async function handler(
request: FastifyRequest<{
Body: TrackHandlerPayload;
}>,
reply: FastifyReply,
) {
const timestamp = getTimestamp(request.timestamp, request.body.payload);
const ip =
path<string>(['properties', '__ip'], request.body.payload) ||
getClientIp(request)!;
@@ -116,9 +146,8 @@ export async function handler(
projectId,
geo,
headers: getStringHeaders(request.headers),
timestamp: request.timestamp
? new Date(request.timestamp).toISOString()
: new Date().toISOString(),
timestamp: timestamp.timestamp,
isTimestampFromThePast: timestamp.isTimestampFromThePast,
}),
];
@@ -185,6 +214,7 @@ async function track({
geo,
headers,
timestamp,
isTimestampFromThePast,
}: {
payload: TrackPayload;
currentDeviceId: string;
@@ -193,6 +223,7 @@ async function track({
geo: GeoLocation;
headers: Record<string, string | undefined>;
timestamp: string;
isTimestampFromThePast: boolean;
}) {
const isScreenView = payload.name === 'screen_view';
// this will ensure that we don't have multiple events creating sessions
@@ -213,8 +244,8 @@ async function track({
headers,
event: {
...payload,
// Dont rely on the client for the timestamp
timestamp,
isTimestampFromThePast,
},
geo,
currentDeviceId,

View File

@@ -1,30 +1,28 @@
import { getReferrerWithQuery, parseReferrer } from '@/utils/parse-referrer';
import type { Job } from 'bullmq';
import { omit } from 'ramda';
import { v4 as uuid } from 'uuid';
import { logger } from '@/utils/logger';
import { getTime, isSameDomain, parsePath } from '@openpanel/common';
import { createSessionEnd, getSessionEnd } from '@/utils/session-handler';
import { isSameDomain, parsePath } from '@openpanel/common';
import { parseUserAgent } from '@openpanel/common/server';
import type { IServiceCreateEventPayload } from '@openpanel/db';
import { checkNotificationRulesForEvent, createEvent } from '@openpanel/db';
import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service';
import type {
EventsQueuePayloadCreateSessionEnd,
EventsQueuePayloadIncomingEvent,
} from '@openpanel/queue';
import {
findJobByPrefix,
sessionsQueue,
sessionsQueueEvents,
} from '@openpanel/queue';
import { getRedisQueue } from '@openpanel/redis';
import { getLastScreenViewFromProfileId } from '@openpanel/db';
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue';
import * as R from 'ramda';
const GLOBAL_PROPERTIES = ['__path', '__referrer'];
export const SESSION_TIMEOUT = 1000 * 60 * 30;
const getSessionEndJobId = (projectId: string, deviceId: string) =>
`sessionEnd:${projectId}:${deviceId}`;
// This function will merge two objects.
// First it will strip '' and undefined/null from B
// Then it will merge the two objects with a standard ramda merge function
const merge = <A, B>(a: Partial<A>, b: Partial<B>): A & B =>
R.mergeDeepRight(a, R.reject(R.anyPass([R.isEmpty, R.isNil]))(b)) as A & B;
async function createEventAndNotify(payload: IServiceCreateEventPayload) {
await checkNotificationRulesForEvent(payload);
return createEvent(payload);
}
export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
const {
@@ -51,6 +49,7 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
// this will get the profileId from the alias table if it exists
const profileId = body.profileId ? String(body.profileId) : '';
const createdAt = new Date(body.timestamp);
const isTimestampFromThePast = body.isTimestampFromThePast;
const url = getProperty('__path');
const { path, hash, query, origin } = parsePath(url);
const referrer = isSameDomain(getProperty('__referrer'), url)
@@ -62,7 +61,41 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
const sdkVersion = headers['openpanel-sdk-version'];
const uaInfo = parseUserAgent(userAgent);
if (uaInfo.isServer) {
const baseEvent = {
name: body.name,
profileId,
projectId,
properties: omit(GLOBAL_PROPERTIES, {
...properties,
user_agent: userAgent,
__hash: hash,
__query: query,
}),
createdAt,
duration: 0,
sdkName,
sdkVersion,
city: geo.city,
country: geo.country,
region: geo.region,
longitude: geo.longitude,
latitude: geo.latitude,
path,
origin,
referrer: utmReferrer?.url || referrer?.url || '',
referrerName: utmReferrer?.name || referrer?.name || '',
referrerType: utmReferrer?.type || referrer?.type || '',
os: uaInfo.os,
osVersion: uaInfo.osVersion,
browser: uaInfo.browser,
browserVersion: uaInfo.browserVersion,
device: uaInfo.device,
brand: uaInfo.brand,
model: uaInfo.model,
} as const;
// if timestamp is from the past we dont want to create a new session
if (uaInfo.isServer || isTimestampFromThePast) {
const event = profileId
? await getLastScreenViewFromProfileId({
profileId,
@@ -70,235 +103,29 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
})
: null;
const payload: IServiceCreateEventPayload = {
name: body.name,
deviceId: event?.deviceId || '',
sessionId: event?.sessionId || '',
profileId,
projectId,
properties: {
...omit(GLOBAL_PROPERTIES, properties),
user_agent: userAgent,
},
createdAt,
country: event?.country || geo.country || '',
city: event?.city || geo.city || '',
region: event?.region || geo.region || '',
longitude: event?.longitude || geo.longitude || null,
latitude: event?.latitude || geo.latitude || null,
os: event?.os ?? '',
osVersion: event?.osVersion ?? '',
browser: event?.browser ?? '',
browserVersion: event?.browserVersion ?? '',
device: event?.device ?? uaInfo.device ?? '',
brand: event?.brand ?? '',
model: event?.model ?? '',
duration: 0,
path: event?.path ?? '',
origin: event?.origin ?? '',
referrer: event?.referrer ?? '',
referrerName: event?.referrerName ?? '',
referrerType: event?.referrerType ?? '',
sdkName,
sdkVersion,
};
await checkNotificationRulesForEvent(payload);
return createEvent(payload);
const payload = merge(omit(['properties'], event ?? {}), baseEvent);
return createEventAndNotify(payload);
}
const sessionEnd = await getSessionEndWithPriority(priority)({
const sessionEnd = await getSessionEnd({
priority,
projectId,
currentDeviceId,
previousDeviceId,
profileId,
});
const sessionEndPayload =
sessionEnd?.job.data.payload ||
({
sessionId: uuid(),
deviceId: currentDeviceId,
profileId,
projectId,
} satisfies EventsQueuePayloadCreateSessionEnd['payload']);
const payload: IServiceCreateEventPayload = merge(baseEvent, {
deviceId: sessionEnd.payload.deviceId,
sessionId: sessionEnd.payload.sessionId,
referrer: sessionEnd.payload?.referrer,
referrerName: sessionEnd.payload?.referrerName,
referrerType: sessionEnd.payload?.referrerType,
});
const payload: IServiceCreateEventPayload = {
name: body.name,
deviceId: sessionEndPayload.deviceId,
sessionId: sessionEndPayload.sessionId,
profileId,
projectId,
properties: Object.assign({}, omit(GLOBAL_PROPERTIES, properties), {
user_agent: userAgent,
__hash: hash,
__query: query,
}),
createdAt,
country: geo.country,
city: geo.city,
region: geo.region,
longitude: geo.longitude,
latitude: geo.latitude,
os: uaInfo?.os ?? '',
osVersion: uaInfo?.osVersion ?? '',
browser: uaInfo?.browser ?? '',
browserVersion: uaInfo?.browserVersion ?? '',
device: uaInfo?.device ?? '',
brand: uaInfo?.brand ?? '',
model: uaInfo?.model ?? '',
duration: 0,
path: path,
origin: origin,
referrer: sessionEnd ? sessionEndPayload.referrer : referrer?.url || '',
referrerName: sessionEnd
? sessionEndPayload.referrerName
: referrer?.name || utmReferrer?.name || '',
referrerType: sessionEnd
? sessionEndPayload.referrerType
: referrer?.type || utmReferrer?.type || '',
sdkName,
sdkVersion,
};
if (sessionEnd) {
// If for some reason we have a session end job that is not a createSessionEnd job
if (sessionEnd.job.data.type !== 'createSessionEnd') {
throw new Error('Invalid session end job');
}
await sessionEnd.job.changeDelay(SESSION_TIMEOUT);
} else {
await sessionsQueue.add(
'session',
{
type: 'createSessionEnd',
payload,
},
{
delay: SESSION_TIMEOUT,
jobId: getSessionEndJobId(projectId, sessionEndPayload.deviceId),
},
);
if (sessionEnd.notFound) {
await createSessionEnd({ payload });
}
if (!sessionEnd) {
await createEvent({
...payload,
name: 'session_start',
createdAt: new Date(getTime(payload.createdAt) - 100),
});
}
await checkNotificationRulesForEvent(payload);
return createEvent(payload);
}
function getSessionEndWithPriority(
priority: boolean,
count = 0,
): typeof getSessionEnd {
return async (args) => {
const res = await getSessionEnd(args);
if (count > 10) {
throw new Error('Failed to get session end');
}
// if we get simultaneous requests we want to avoid race conditions with getting the session end
// one of the events will get priority and the other will wait for the first to finish
if (res === null && priority === false) {
await new Promise((resolve) => setTimeout(resolve, 50));
return getSessionEndWithPriority(priority, count + 1)(args);
}
return res;
};
}
async function getSessionEnd({
projectId,
currentDeviceId,
previousDeviceId,
}: {
projectId: string;
currentDeviceId: string;
previousDeviceId: string;
}) {
async function handleJobStates(
job: Job,
): Promise<{ deviceId: string; job: Job } | null> {
const state = await job.getState();
if (state === 'delayed') {
return { deviceId: currentDeviceId, job };
}
if (state === 'completed' || state === 'failed') {
await job.remove();
}
if (state === 'active' || state === 'waiting') {
await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10);
return getSessionEnd({
projectId,
currentDeviceId,
previousDeviceId,
});
}
return null;
}
const job = await sessionsQueue.getJob(
getSessionEndJobId(projectId, currentDeviceId),
);
if (job) {
const res = await handleJobStates(job);
if (res) {
return res;
}
}
const previousJob = await sessionsQueue.getJob(
getSessionEndJobId(projectId, previousDeviceId),
);
if (previousJob) {
const res = await handleJobStates(previousJob);
if (res) {
return res;
}
}
// Fallback during migration period
const currentSessionEndKeys = await getRedisQueue().keys(
`bull:sessions:sessionEnd:${projectId}:${currentDeviceId}:*`,
);
const sessionEndJobCurrentDeviceId = await findJobByPrefix(
sessionsQueue,
currentSessionEndKeys,
`sessionEnd:${projectId}:${currentDeviceId}:`,
);
if (sessionEndJobCurrentDeviceId) {
logger.info('found session end job for current device (old)');
return { deviceId: currentDeviceId, job: sessionEndJobCurrentDeviceId };
}
const previousSessionEndKeys = await getRedisQueue().keys(
`bull:sessions:sessionEnd:${projectId}:${previousDeviceId}:*`,
);
const sessionEndJobPreviousDeviceId = await findJobByPrefix(
sessionsQueue,
previousSessionEndKeys,
`sessionEnd:${projectId}:${previousDeviceId}:`,
);
if (sessionEndJobPreviousDeviceId) {
logger.info('found session end job for previous device (old)');
return { deviceId: previousDeviceId, job: sessionEndJobPreviousDeviceId };
}
// Create session
return null;
return createEventAndNotify(payload);
}

View File

@@ -0,0 +1,154 @@
import { getTime } from '@openpanel/common';
import { type IServiceCreateEventPayload, createEvent } from '@openpanel/db';
import {
type EventsQueuePayloadCreateSessionEnd,
sessionsQueue,
sessionsQueueEvents,
} from '@openpanel/queue';
import type { Job } from 'bullmq';
import { v4 as uuid } from 'uuid';
export const SESSION_TIMEOUT = 1000 * 60 * 30;
const getSessionEndJobId = (projectId: string, deviceId: string) =>
`sessionEnd:${projectId}:${deviceId}`;
export async function createSessionEnd({
payload,
}: {
payload: IServiceCreateEventPayload;
}) {
await sessionsQueue.add(
'session',
{
type: 'createSessionEnd',
payload,
},
{
delay: SESSION_TIMEOUT,
jobId: getSessionEndJobId(payload.projectId, payload.deviceId),
},
);
await createEvent({
...payload,
name: 'session_start',
createdAt: new Date(getTime(payload.createdAt) - 100),
});
}
export async function getSessionEnd({
projectId,
currentDeviceId,
previousDeviceId,
profileId,
priority,
}: {
projectId: string;
currentDeviceId: string;
previousDeviceId: string;
profileId: string;
priority: boolean;
}) {
const sessionEnd = await getSessionEndJob({
priority,
projectId,
currentDeviceId,
previousDeviceId,
});
const sessionEndPayload =
sessionEnd?.job.data.payload ||
({
sessionId: uuid(),
deviceId: currentDeviceId,
profileId,
projectId,
} satisfies EventsQueuePayloadCreateSessionEnd['payload']);
if (sessionEnd) {
// If for some reason we have a session end job that is not a createSessionEnd job
if (sessionEnd.job.data.type !== 'createSessionEnd') {
throw new Error('Invalid session end job');
}
await sessionEnd.job.changeDelay(SESSION_TIMEOUT);
}
return {
payload: sessionEndPayload,
notFound: !sessionEnd,
};
}
export async function getSessionEndJob(args: {
projectId: string;
currentDeviceId: string;
previousDeviceId: string;
priority: boolean;
retryCount?: number;
}): Promise<{
deviceId: string;
job: Job<EventsQueuePayloadCreateSessionEnd>;
} | null> {
const { priority, retryCount = 0 } = args;
if (retryCount > 10) {
throw new Error('Failed to get session end');
}
async function handleJobStates(
job: Job<EventsQueuePayloadCreateSessionEnd>,
deviceId: string,
): Promise<{
deviceId: string;
job: Job<EventsQueuePayloadCreateSessionEnd>;
} | null> {
const state = await job.getState();
if (state === 'delayed') {
return { deviceId, job };
}
if (state === 'completed' || state === 'failed') {
await job.remove();
}
if (state === 'active' || state === 'waiting') {
await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10);
return getSessionEndJob({
...args,
priority,
retryCount,
});
}
return null;
}
// Check current device job
const currentJob = await sessionsQueue.getJob(
getSessionEndJobId(args.projectId, args.currentDeviceId),
);
if (currentJob) {
const res = await handleJobStates(currentJob, args.currentDeviceId);
if (res) return res;
}
// Check previous device job
const previousJob = await sessionsQueue.getJob(
getSessionEndJobId(args.projectId, args.previousDeviceId),
);
if (previousJob) {
const res = await handleJobStates(previousJob, args.previousDeviceId);
if (res) return res;
}
// If no job found and not priority, retry
if (!priority) {
await new Promise((resolve) => setTimeout(resolve, 200));
return getSessionEndJob({ ...args, priority, retryCount: retryCount + 1 });
}
// Create session
return null;
}

View File

@@ -3,6 +3,12 @@ import { UAParser } from 'ua-parser-js';
const parsedServerUa = {
isServer: true,
device: 'server',
os: '',
osVersion: '',
browser: '',
browserVersion: '',
brand: '',
model: '',
} as const;
const IPHONE_MODEL_REGEX = /(iPhone|iPad)\s*([0-9,]+)/i;

View File

@@ -10,6 +10,7 @@ export interface EventsQueuePayloadIncomingEvent {
projectId: string;
event: TrackPayload & {
timestamp: string;
isTimestampFromThePast: boolean;
};
geo: {
country: string | undefined;
@@ -71,6 +72,11 @@ export const eventsQueue = new Queue<EventsQueuePayload>('events', {
connection: getRedisQueue(),
defaultJobOptions: {
removeOnComplete: 10,
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
},
});