improve(queue): how we handle incoming events and session ends

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-06-03 21:13:15 +02:00
parent 39775142e2
commit 0d58a5bf0c
13 changed files with 245 additions and 266 deletions

View File

@@ -1,5 +1,4 @@
import type { Job } from 'bullmq';
import { last } from 'ramda';
import { logger as baseLogger } from '@/utils/logger';
import { getTime } from '@openpanel/common';
@@ -9,61 +8,56 @@ import {
checkNotificationRulesForSessionEnd,
createEvent,
eventBuffer,
formatClickhouseDate,
getEvents,
} from '@openpanel/db';
import type { ILogger } from '@openpanel/logger';
import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue';
async function getCompleteSession({
// Grabs session_start and screen_views + the last occured event
async function getNecessarySessionEvents({
projectId,
sessionId,
hoursInterval,
createdAt,
}: {
projectId: string;
sessionId: string;
hoursInterval: number;
}) {
createdAt: Date;
}): Promise<ReturnType<typeof getEvents>> {
const sql = `
SELECT * FROM ${TABLE_NAMES.events}
WHERE
session_id = '${sessionId}'
AND project_id = '${projectId}'
AND created_at > now() - interval ${hoursInterval} HOUR
ORDER BY created_at DESC
AND created_at >= '${formatClickhouseDate(new Date(new Date(createdAt).getTime() - 1000 * 60 * 5))}'
AND (
name IN ('screen_view', 'session_start')
OR created_at = (
SELECT MAX(created_at)
FROM ${TABLE_NAMES.events}
WHERE session_id = '${sessionId}'
AND project_id = '${projectId}'
AND created_at >= '${formatClickhouseDate(new Date(new Date(createdAt).getTime() - 1000 * 60 * 5))}'
AND name NOT IN ('screen_view', 'session_start')
)
)
ORDER BY created_at DESC;
`;
return getEvents(sql);
}
async function getCompleteSessionWithSessionStart({
projectId,
sessionId,
logger,
}: {
projectId: string;
sessionId: string;
logger: ILogger;
}): Promise<ReturnType<typeof getEvents>> {
const intervals = [1, 6, 12, 24, 72];
let intervalIndex = 0;
for (const hoursInterval of intervals) {
const events = await getCompleteSession({
const [lastScreenView, eventsInDb] = await Promise.all([
eventBuffer.getLastScreenView({
projectId,
sessionId,
hoursInterval,
});
}),
getEvents(sql),
]);
if (events.find((event) => event.name === 'session_start')) {
return events;
}
const nextHoursInterval = intervals[++intervalIndex];
if (nextHoursInterval) {
logger.warn(`Checking last ${nextHoursInterval} hours for session_start`);
}
}
return [];
// sort last inserted first
return [lastScreenView, ...eventsInDb]
.filter((event): event is IServiceEvent => !!event)
.sort(
(a, b) =>
new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(),
);
}
export async function createSessionEnd(
@@ -77,56 +71,27 @@ export async function createSessionEnd(
const payload = job.data.payload;
const [lastScreenView, eventsInDb] = await Promise.all([
eventBuffer.getLastScreenView({
projectId: payload.projectId,
sessionId: payload.sessionId,
}),
getCompleteSessionWithSessionStart({
projectId: payload.projectId,
sessionId: payload.sessionId,
logger,
}),
]);
const events = await getNecessarySessionEvents({
projectId: payload.projectId,
sessionId: payload.sessionId,
createdAt: payload.createdAt,
});
// sort last inserted first
const events = [lastScreenView, ...eventsInDb]
.filter((event): event is IServiceEvent => !!event)
.sort(
(a, b) =>
new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(),
);
const sessionDuration = events.reduce((acc, event) => {
return acc + event.duration;
}, 0);
let sessionStart = events.find((event) => event.name === 'session_start');
const lastEvent = events[0];
const sessionStart = events.find((event) => event.name === 'session_start');
const screenViews = events.filter((event) => event.name === 'screen_view');
const lastEvent = events[0];
if (!sessionStart) {
const firstScreenView = last(screenViews);
if (!firstScreenView) {
throw new Error('Could not found session_start or any screen_view');
}
logger.warn('Creating session_start since it was not found');
sessionStart = {
...firstScreenView,
name: 'session_start',
createdAt: new Date(getTime(firstScreenView.createdAt) - 100),
};
await createEvent(sessionStart);
throw new Error('No session_start found');
}
if (!lastEvent) {
throw new Error('No last event found');
}
const sessionDuration =
lastEvent.createdAt.getTime() - sessionStart.createdAt.getTime();
await checkNotificationRulesForSessionEnd(events);
logger.info('Creating session_end', {
@@ -135,7 +100,6 @@ export async function createSessionEnd(
screenViews,
sessionDuration,
events,
lastScreenView: lastScreenView ? lastScreenView : 'none',
});
return createEvent({

View File

@@ -1,9 +1,10 @@
import { getReferrerWithQuery, parseReferrer } from '@/utils/parse-referrer';
import type { Job } from 'bullmq';
import { omit } from 'ramda';
import { logger as baseLogger } from '@/utils/logger';
import { createSessionEnd, getSessionEnd } from '@/utils/session-handler';
import { getReferrerWithQuery, parseReferrer } from '@/utils/parse-referrer';
import {
createSessionEndJob,
createSessionStart,
getSessionEnd,
} from '@/utils/session-handler';
import { isSameDomain, parsePath } from '@openpanel/common';
import { parseUserAgent } from '@openpanel/common/server';
import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db';
@@ -14,7 +15,11 @@ import {
} from '@openpanel/db';
import type { ILogger } from '@openpanel/logger';
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue';
import { getLock } from '@openpanel/redis';
import { DelayedError, type Job } from 'bullmq';
import { omit } from 'ramda';
import * as R from 'ramda';
import { v4 as uuid } from 'uuid';
const GLOBAL_PROPERTIES = ['__path', '__referrer'];
@@ -29,16 +34,18 @@ async function createEventAndNotify(
jobData: Job<EventsQueuePayloadIncomingEvent>['data']['payload'],
logger: ILogger,
) {
await checkNotificationRulesForEvent(payload).catch((e) => {
logger.error('Error checking notification rules', { error: e });
});
logger.info('Creating event', { event: payload, jobData });
return createEvent(payload);
const [event] = await Promise.all([
createEvent(payload),
checkNotificationRulesForEvent(payload),
]);
return event;
}
export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
export async function incomingEvent(
job: Job<EventsQueuePayloadIncomingEvent>,
token?: string,
) {
const {
geo,
event: body,
@@ -46,7 +53,6 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
projectId,
currentDeviceId,
previousDeviceId,
priority,
} = job.data.payload;
const properties = body.properties ?? {};
const reqId = headers['request-id'] ?? 'unknown';
@@ -131,32 +137,50 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
}
const sessionEnd = await getSessionEnd({
priority,
projectId,
currentDeviceId,
previousDeviceId,
profileId,
});
const lastScreenView = await eventBuffer.getLastScreenView({
projectId,
sessionId: sessionEnd.payload.sessionId,
});
const lastScreenView = sessionEnd
? await eventBuffer.getLastScreenView({
projectId,
sessionId: sessionEnd.sessionId,
})
: null;
const payload: IServiceCreateEventPayload = merge(baseEvent, {
deviceId: sessionEnd.payload.deviceId,
sessionId: sessionEnd.payload.sessionId,
referrer: sessionEnd.payload?.referrer,
referrerName: sessionEnd.payload?.referrerName,
referrerType: sessionEnd.payload?.referrerType,
deviceId: sessionEnd?.deviceId ?? currentDeviceId,
sessionId: sessionEnd?.sessionId ?? uuid(),
referrer: sessionEnd?.referrer ?? baseEvent.referrer,
referrerName: sessionEnd?.referrerName ?? baseEvent.referrerName,
referrerType: sessionEnd?.referrerType ?? baseEvent.referrerType,
// if the path is not set, use the last screen view path
path: baseEvent.path || lastScreenView?.path || '',
origin: baseEvent.origin || lastScreenView?.origin || '',
} as Partial<IServiceCreateEventPayload>) as IServiceCreateEventPayload;
if (sessionEnd.notFound) {
await createSessionEnd({ payload });
if (!sessionEnd) {
// Too avoid several created sessions we just throw if a lock exists
// This will than retry the job
const lock = await getLock(
`create-session-end:${currentDeviceId}`,
'locked',
1000,
);
if (!lock) {
logger.warn('Move incoming event to delayed');
await job.moveToDelayed(Date.now() + 50, token);
throw new DelayedError();
}
await createSessionStart({ payload });
}
return createEventAndNotify(payload, job.data.payload, logger);
const event = await createEventAndNotify(payload, job.data.payload, logger);
await createSessionEndJob({ payload });
return event;
}

View File

@@ -7,6 +7,9 @@ import type {
import { incomingEvent } from './events.incoming-event';
export async function eventsJob(job: Job<EventsQueuePayload>) {
return await incomingEvent(job as Job<EventsQueuePayloadIncomingEvent>);
export async function eventsJob(job: Job<EventsQueuePayload>, token?: string) {
return await incomingEvent(
job as Job<EventsQueuePayloadIncomingEvent>,
token,
);
}

View File

@@ -3,22 +3,33 @@ import { type IServiceCreateEventPayload, createEvent } from '@openpanel/db';
import {
type EventsQueuePayloadCreateSessionEnd,
sessionsQueue,
sessionsQueueEvents,
} from '@openpanel/queue';
import type { Job } from 'bullmq';
import { v4 as uuid } from 'uuid';
import { logger } from './logger';
export const SESSION_TIMEOUT = 1000 * 60 * 30;
const getSessionEndJobId = (projectId: string, deviceId: string) =>
`sessionEnd:${projectId}:${deviceId}`;
export async function createSessionEnd({
export async function createSessionStart({
payload,
}: {
payload: IServiceCreateEventPayload;
}) {
await sessionsQueue.add(
return createEvent({
...payload,
name: 'session_start',
createdAt: new Date(getTime(payload.createdAt) - 100),
});
}
export async function createSessionEndJob({
payload,
}: {
payload: IServiceCreateEventPayload;
}) {
return sessionsQueue.add(
'session',
{
type: 'createSessionEnd',
@@ -34,12 +45,6 @@ export async function createSessionEnd({
},
},
);
await createEvent({
...payload,
name: 'session_start',
createdAt: new Date(getTime(payload.createdAt) - 100),
});
}
export async function getSessionEnd({
@@ -47,42 +52,33 @@ export async function getSessionEnd({
currentDeviceId,
previousDeviceId,
profileId,
priority,
}: {
projectId: string;
currentDeviceId: string;
previousDeviceId: string;
profileId: string;
priority: boolean;
}) {
const sessionEnd = await getSessionEndJob({
priority,
projectId,
currentDeviceId,
previousDeviceId,
});
const sessionEndPayload =
sessionEnd?.job.data.payload ||
({
sessionId: uuid(),
deviceId: currentDeviceId,
profileId,
projectId,
} satisfies EventsQueuePayloadCreateSessionEnd['payload']);
if (sessionEnd) {
// If for some reason we have a session end job that is not a createSessionEnd job
if (sessionEnd.job.data.type !== 'createSessionEnd') {
throw new Error('Invalid session end job');
// Hack: if session end job just got created, we want to give it a chance to complete
// So the order is correct
if (sessionEnd.job.timestamp > Date.now() - 50) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
// If the profile_id is set and it's different from the device_id, we need to update the profile_id
if (
sessionEnd.job.data.payload.profileId !== profileId &&
const existingSessionIsAnonymous =
sessionEnd.job.data.payload.profileId ===
sessionEnd.job.data.payload.deviceId
) {
sessionEnd.job.data.payload.deviceId;
const eventIsIdentified =
sessionEnd.job.data.payload.profileId !== profileId;
if (existingSessionIsAnonymous && eventIsIdentified) {
await sessionEnd.job.updateData({
...sessionEnd.job.data,
payload: {
@@ -93,25 +89,22 @@ export async function getSessionEnd({
}
await sessionEnd.job.changeDelay(SESSION_TIMEOUT);
return sessionEnd.job.data.payload;
}
return {
payload: sessionEndPayload,
notFound: !sessionEnd,
};
return null;
}
export async function getSessionEndJob(args: {
projectId: string;
currentDeviceId: string;
previousDeviceId: string;
priority: boolean;
retryCount?: number;
}): Promise<{
deviceId: string;
job: Job<EventsQueuePayloadCreateSessionEnd>;
} | null> {
const { priority, retryCount = 0 } = args;
const { retryCount = 0 } = args;
if (retryCount >= 6) {
throw new Error('Failed to get session end');
@@ -125,46 +118,32 @@ export async function getSessionEndJob(args: {
job: Job<EventsQueuePayloadCreateSessionEnd>;
} | null> {
const state = await job.getState();
if (state === 'delayed') {
if (state !== 'delayed') {
logger.info(`[session-handler] Session end job is in "${state}" state`, {
state,
retryCount,
jobTimestamp: new Date(job.timestamp).toISOString(),
jobDelta: Date.now() - job.timestamp,
jobId: job.id,
reqId: job.data.payload.properties?.__reqId ?? 'unknown',
payload: job.data.payload,
});
}
if (state === 'delayed' || state === 'waiting') {
return { deviceId, job };
}
if (state === 'failed') {
await job.retry();
await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10);
if (state === 'active') {
await new Promise((resolve) => setTimeout(resolve, 100));
return getSessionEndJob({
...args,
priority,
retryCount,
retryCount: retryCount + 1,
});
}
if (state === 'completed') {
await job.remove();
return getSessionEndJob({
...args,
priority,
retryCount,
});
}
if (state === 'active' || state === 'waiting') {
await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10);
return getSessionEndJob({
...args,
priority,
retryCount,
});
}
// Shady state here, just remove it and retry
if (state === 'unknown') {
await job.remove();
return getSessionEndJob({
...args,
priority,
retryCount,
});
}
return null;
@@ -175,8 +154,7 @@ export async function getSessionEndJob(args: {
getSessionEndJobId(args.projectId, args.currentDeviceId),
);
if (currentJob) {
const res = await handleJobStates(currentJob, args.currentDeviceId);
if (res) return res;
return await handleJobStates(currentJob, args.currentDeviceId);
}
// Check previous device job
@@ -184,15 +162,7 @@ export async function getSessionEndJob(args: {
getSessionEndJobId(args.projectId, args.previousDeviceId),
);
if (previousJob) {
const res = await handleJobStates(previousJob, args.previousDeviceId);
if (res) return res;
}
// If no job found and not priority, retry
if (!priority) {
const backoffDelay = 50 * 2 ** retryCount;
await new Promise((resolve) => setTimeout(resolve, backoffDelay));
return getSessionEndJob({ ...args, priority, retryCount: retryCount + 1 });
return await handleJobStates(previousJob, args.previousDeviceId);
}
// Create session