fix: better validation of events + clean up (#267)

This commit is contained in:
Carl-Gerhard Lindesvärd
2026-01-07 11:58:11 +01:00
parent 6d9e3ce8e5
commit 3c085e445d
22 changed files with 387 additions and 387 deletions

View File

@@ -8,13 +8,15 @@ import { getProfileById, getSalts, upsertProfile } from '@openpanel/db';
import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
import { getEventsGroupQueueShard } from '@openpanel/queue';
import { getRedisCache } from '@openpanel/redis';
import type {
DecrementPayload,
IdentifyPayload,
IncrementPayload,
TrackHandlerPayload,
TrackPayload,
} from '@openpanel/sdk';
import {
type IDecrementPayload,
type IIdentifyPayload,
type IIncrementPayload,
type ITrackHandlerPayload,
type ITrackPayload,
zTrackHandlerPayload,
} from '@openpanel/validation';
export function getStringHeaders(headers: FastifyRequest['headers']) {
return Object.entries(
@@ -37,25 +39,28 @@ export function getStringHeaders(headers: FastifyRequest['headers']) {
);
}
function getIdentity(body: TrackHandlerPayload): IdentifyPayload | undefined {
const identity =
'properties' in body.payload
? (body.payload?.properties?.__identify as IdentifyPayload | undefined)
: undefined;
function getIdentity(body: ITrackHandlerPayload): IIdentifyPayload | undefined {
if (body.type === 'track') {
const identity = body.payload.properties?.__identify as
| IIdentifyPayload
| undefined;
return (
identity ||
(body?.payload?.profileId
? {
profileId: body.payload.profileId,
}
: undefined)
);
return (
identity ||
(body.payload.profileId
? {
profileId: body.payload.profileId,
}
: undefined)
);
}
return undefined;
}
export function getTimestamp(
timestamp: FastifyRequest['timestamp'],
payload: TrackHandlerPayload['payload'],
payload: ITrackHandlerPayload['payload'],
) {
const safeTimestamp = timestamp || Date.now();
const userDefinedTimestamp =
@@ -82,7 +87,7 @@ export function getTimestamp(
return { timestamp: safeTimestamp, isTimestampFromThePast: false };
}
// isTimestampFromThePast is true only if timestamp is older than 1 hour
// isTimestampFromThePast is true only if timestamp is older than 15 minutes
const isTimestampFromThePast =
clientTimestampNumber < safeTimestamp - FIFTEEN_MINUTES_MS;
@@ -92,170 +97,113 @@ export function getTimestamp(
};
}
export async function handler(
request: FastifyRequest<{
Body: TrackHandlerPayload;
}>,
reply: FastifyReply,
) {
const timestamp = getTimestamp(request.timestamp, request.body.payload);
const ip =
'properties' in request.body.payload &&
request.body.payload.properties?.__ip
? (request.body.payload.properties.__ip as string)
: request.clientIp;
const ua = request.headers['user-agent'];
const projectId = request.client?.projectId;
interface TrackContext {
projectId: string;
ip: string;
ua?: string;
headers: Record<string, string | undefined>;
timestamp: { value: number; isFromPast: boolean };
identity?: IIdentifyPayload;
currentDeviceId?: string;
previousDeviceId?: string;
geo: GeoLocation;
}
async function buildContext(
request: FastifyRequest<{
Body: ITrackHandlerPayload;
}>,
validatedBody: ITrackHandlerPayload,
): Promise<TrackContext> {
const projectId = request.client?.projectId;
if (!projectId) {
return reply.status(400).send({
status: 400,
error: 'Bad Request',
message: 'Missing projectId',
});
throw new HttpError('Missing projectId', { status: 400 });
}
const identity = getIdentity(request.body);
const timestamp = getTimestamp(request.timestamp, validatedBody.payload);
const ip =
validatedBody.type === 'track' && validatedBody.payload.properties?.__ip
? (validatedBody.payload.properties.__ip as string)
: request.clientIp;
const ua = request.headers['user-agent'];
const headers = getStringHeaders(request.headers);
const identity = getIdentity(validatedBody);
const profileId = identity?.profileId;
const overrideDeviceId = (() => {
const deviceId =
'properties' in request.body.payload
? request.body.payload.properties?.__deviceId
: undefined;
if (typeof deviceId === 'string') {
return deviceId;
}
return undefined;
})();
// We might get a profileId from the alias table
// If we do, we should use that instead of the one from the payload
if (profileId) {
request.body.payload.profileId = profileId;
if (profileId && validatedBody.type === 'track') {
validatedBody.payload.profileId = profileId;
}
switch (request.body.type) {
case 'track': {
const [salts, geo] = await Promise.all([getSalts(), getGeoLocation(ip)]);
const currentDeviceId =
overrideDeviceId ||
(ua
? generateDeviceId({
salt: salts.current,
origin: projectId,
ip,
ua,
})
: '');
const previousDeviceId = ua
// Get geo location (needed for track and identify)
const geo = await getGeoLocation(ip);
// Generate device IDs if needed (for track)
let currentDeviceId: string | undefined;
let previousDeviceId: string | undefined;
if (validatedBody.type === 'track') {
const overrideDeviceId =
typeof validatedBody.payload.properties?.__deviceId === 'string'
? validatedBody.payload.properties.__deviceId
: undefined;
const [salts] = await Promise.all([getSalts()]);
currentDeviceId =
overrideDeviceId ||
(ua
? generateDeviceId({
salt: salts.previous,
salt: salts.current,
origin: projectId,
ip,
ua,
})
: '';
const promises = [];
// If we have more than one property in the identity object, we should identify the user
// Otherwise its only a profileId and we should not identify the user
if (identity && Object.keys(identity).length > 1) {
promises.push(
identify({
payload: identity,
projectId,
geo,
ua,
}),
);
}
promises.push(
track({
payload: request.body.payload,
currentDeviceId,
previousDeviceId,
projectId,
geo,
headers: getStringHeaders(request.headers),
timestamp: timestamp.timestamp,
isTimestampFromThePast: timestamp.isTimestampFromThePast,
}),
);
await Promise.all(promises);
break;
}
case 'identify': {
const payload = request.body.payload;
const geo = await getGeoLocation(ip);
if (!payload.profileId) {
throw new HttpError('Missing profileId', {
status: 400,
});
}
await identify({
payload,
projectId,
geo,
ua,
});
break;
}
case 'alias': {
return reply.status(400).send({
status: 400,
error: 'Bad Request',
message: 'Alias is not supported',
});
}
case 'increment': {
await increment({
payload: request.body.payload,
projectId,
});
break;
}
case 'decrement': {
await decrement({
payload: request.body.payload,
projectId,
});
break;
}
default: {
return reply.status(400).send({
status: 400,
error: 'Bad Request',
message: 'Invalid type',
});
}
: '');
previousDeviceId = ua
? generateDeviceId({
salt: salts.previous,
origin: projectId,
ip,
ua,
})
: '';
}
reply.status(200).send();
return {
projectId,
ip,
ua,
headers,
timestamp: {
value: timestamp.timestamp,
isFromPast: timestamp.isTimestampFromThePast,
},
identity,
currentDeviceId,
previousDeviceId,
geo,
};
}
async function track({
payload,
currentDeviceId,
previousDeviceId,
projectId,
geo,
headers,
timestamp,
isTimestampFromThePast,
}: {
payload: TrackPayload;
currentDeviceId: string;
previousDeviceId: string;
projectId: string;
geo: GeoLocation;
headers: Record<string, string | undefined>;
timestamp: number;
isTimestampFromThePast: boolean;
}) {
async function handleTrack(
payload: ITrackPayload,
context: TrackContext,
): Promise<void> {
const {
projectId,
currentDeviceId,
previousDeviceId,
geo,
headers,
timestamp,
} = context;
if (!currentDeviceId || !previousDeviceId) {
throw new HttpError('Device ID generation failed', { status: 500 });
}
const uaInfo = parseUserAgent(headers['user-agent'], payload.properties);
const groupId = uaInfo.isServer
? payload.profileId
@@ -264,44 +212,51 @@ async function track({
: currentDeviceId;
const jobId = [
slug(payload.name),
timestamp,
timestamp.value,
projectId,
currentDeviceId,
groupId,
]
.filter(Boolean)
.join('-');
await getEventsGroupQueueShard(groupId).add({
orderMs: timestamp,
data: {
projectId,
headers,
event: {
...payload,
timestamp,
isTimestampFromThePast,
const promises = [];
// If we have more than one property in the identity object, we should identify the user
// Otherwise its only a profileId and we should not identify the user
if (context.identity && Object.keys(context.identity).length > 1) {
promises.push(handleIdentify(context.identity, context));
}
promises.push(
getEventsGroupQueueShard(groupId).add({
orderMs: timestamp.value,
data: {
projectId,
headers,
event: {
...payload,
timestamp: timestamp.value,
isTimestampFromThePast: timestamp.isFromPast,
},
uaInfo,
geo,
currentDeviceId,
previousDeviceId,
},
uaInfo,
geo,
currentDeviceId,
previousDeviceId,
},
groupId,
jobId,
});
groupId,
jobId,
}),
);
await Promise.all(promises);
}
async function identify({
payload,
projectId,
geo,
ua,
}: {
payload: IdentifyPayload;
projectId: string;
geo: GeoLocation;
ua?: string;
}) {
async function handleIdentify(
payload: IIdentifyPayload,
context: TrackContext,
): Promise<void> {
const { projectId, geo, ua } = context;
const uaInfo = parseUserAgent(ua, payload.properties);
await upsertProfile({
...payload,
@@ -326,17 +281,15 @@ async function identify({
});
}
async function increment({
payload,
projectId,
}: {
payload: IncrementPayload;
projectId: string;
}) {
async function adjustProfileProperty(
payload: IIncrementPayload | IDecrementPayload,
projectId: string,
direction: 1 | -1,
): Promise<void> {
const { profileId, property, value } = payload;
const profile = await getProfileById(profileId, projectId);
if (!profile) {
throw new Error('Not found');
throw new HttpError('Profile not found', { status: 404 });
}
const parsed = Number.parseInt(
@@ -345,12 +298,12 @@ async function increment({
);
if (Number.isNaN(parsed)) {
throw new Error('Not number');
throw new HttpError('Property value is not a number', { status: 400 });
}
profile.properties = assocPath(
property.split('.'),
parsed + (value || 1),
parsed + direction * (value || 1),
profile.properties,
);
@@ -362,40 +315,74 @@ async function increment({
});
}
async function decrement({
payload,
projectId,
}: {
payload: DecrementPayload;
projectId: string;
}) {
const { profileId, property, value } = payload;
const profile = await getProfileById(profileId, projectId);
if (!profile) {
throw new Error('Not found');
async function handleIncrement(
payload: IIncrementPayload,
context: TrackContext,
): Promise<void> {
await adjustProfileProperty(payload, context.projectId, 1);
}
async function handleDecrement(
payload: IDecrementPayload,
context: TrackContext,
): Promise<void> {
await adjustProfileProperty(payload, context.projectId, -1);
}
export async function handler(
request: FastifyRequest<{
Body: ITrackHandlerPayload;
}>,
reply: FastifyReply,
) {
// Validate request body with Zod
const validationResult = zTrackHandlerPayload.safeParse(request.body);
if (!validationResult.success) {
return reply.status(400).send({
status: 400,
error: 'Bad Request',
message: 'Validation failed',
errors: validationResult.error.errors,
});
}
const parsed = Number.parseInt(
pathOr<string>('0', property.split('.'), profile.properties),
10,
);
const validatedBody = validationResult.data;
if (Number.isNaN(parsed)) {
throw new Error('Not number');
// Handle alias (not supported)
if (validatedBody.type === 'alias') {
return reply.status(400).send({
status: 400,
error: 'Bad Request',
message: 'Alias is not supported',
});
}
profile.properties = assocPath(
property.split('.'),
parsed - (value || 1),
profile.properties,
);
// Build request context
const context = await buildContext(request, validatedBody);
await upsertProfile({
id: profile.id,
projectId,
properties: profile.properties,
isExternal: true,
});
// Dispatch to appropriate handler
switch (validatedBody.type) {
case 'track':
await handleTrack(validatedBody.payload, context);
break;
case 'identify':
await handleIdentify(validatedBody.payload, context);
break;
case 'increment':
await handleIncrement(validatedBody.payload, context);
break;
case 'decrement':
await handleDecrement(validatedBody.payload, context);
break;
default:
return reply.status(400).send({
status: 400,
error: 'Bad Request',
message: 'Invalid type',
});
}
reply.status(200).send();
}
export async function fetchDeviceId(