fix: remove old event queue, cleaned up session handling, remove hacks

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-10-09 09:25:52 +02:00
parent a11f87dc3c
commit e7c21bc92c
16 changed files with 202 additions and 245 deletions

View File

@@ -38,7 +38,7 @@
"fastify": "^5.2.1",
"fastify-metrics": "^12.1.0",
"fastify-raw-body": "^5.0.0",
"groupmq": "1.0.0-next.17",
"groupmq": "1.0.0-next.18",
"ico-to-png": "^0.2.2",
"jsonwebtoken": "^9.0.2",
"ramda": "^0.29.1",

View File

@@ -3,8 +3,7 @@ import type { FastifyReply, FastifyRequest } from 'fastify';
import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
import { getSalts } from '@openpanel/db';
import { eventsGroupQueue, eventsQueue } from '@openpanel/queue';
import { getLock, getRedisCache } from '@openpanel/redis';
import { eventsGroupQueue } from '@openpanel/queue';
import type { PostEventPayload } from '@openpanel/sdk';
import { checkDuplicatedEvent } from '@/utils/deduplicate';
@@ -61,57 +60,28 @@ export async function postEvent(
return;
}
const isGroupQueue = await getRedisCache().exists('group_queue');
if (isGroupQueue) {
const uaInfo = parseUserAgent(ua, request.body?.properties);
const groupId = uaInfo.isServer
? request.body?.profileId
? `${projectId}:${request.body?.profileId}`
: `${projectId}:${generateId()}`
: currentDeviceId;
await eventsGroupQueue.add({
orderMs: new Date(timestamp).getTime(),
data: {
projectId,
headers,
event: {
...request.body,
timestamp,
isTimestampFromThePast,
},
geo,
currentDeviceId,
previousDeviceId,
const uaInfo = parseUserAgent(ua, request.body?.properties);
const groupId = uaInfo.isServer
? request.body?.profileId
? `${projectId}:${request.body?.profileId}`
: `${projectId}:${generateId()}`
: currentDeviceId;
await eventsGroupQueue.add({
orderMs: new Date(timestamp).getTime(),
data: {
projectId,
headers,
event: {
...request.body,
timestamp,
isTimestampFromThePast,
},
groupId,
});
} else {
await eventsQueue.add(
'event',
{
type: 'incomingEvent',
payload: {
projectId,
headers,
event: {
...request.body,
timestamp,
isTimestampFromThePast,
},
geo,
currentDeviceId,
previousDeviceId,
},
},
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 200,
},
},
);
}
geo,
currentDeviceId,
previousDeviceId,
},
groupId,
});
reply.status(202).send('ok');
}

View File

@@ -3,13 +3,11 @@ import type { FastifyReply, FastifyRequest } from 'fastify';
import { path, assocPath, pathOr, pick } from 'ramda';
import { checkDuplicatedEvent } from '@/utils/deduplicate';
import { logger } from '@/utils/logger';
import { generateId } from '@openpanel/common';
import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
import { getProfileById, getSalts, upsertProfile } from '@openpanel/db';
import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
import { eventsGroupQueue, eventsQueue } from '@openpanel/queue';
import { getLock, getRedisCache } from '@openpanel/redis';
import { eventsGroupQueue } from '@openpanel/queue';
import type {
DecrementPayload,
IdentifyPayload,
@@ -282,57 +280,28 @@ async function track({
timestamp: string;
isTimestampFromThePast: boolean;
}) {
const isGroupQueue = await getRedisCache().exists('group_queue');
if (isGroupQueue) {
const uaInfo = parseUserAgent(headers['user-agent'], payload.properties);
const groupId = uaInfo.isServer
? payload.profileId
? `${projectId}:${payload.profileId}`
: `${projectId}:${generateId()}`
: currentDeviceId;
await eventsGroupQueue.add({
orderMs: new Date(timestamp).getTime(),
data: {
projectId,
headers,
event: {
...payload,
timestamp,
isTimestampFromThePast,
},
geo,
currentDeviceId,
previousDeviceId,
const uaInfo = parseUserAgent(headers['user-agent'], payload.properties);
const groupId = uaInfo.isServer
? payload.profileId
? `${projectId}:${payload.profileId}`
: `${projectId}:${generateId()}`
: currentDeviceId;
await eventsGroupQueue.add({
orderMs: new Date(timestamp).getTime(),
data: {
projectId,
headers,
event: {
...payload,
timestamp,
isTimestampFromThePast,
},
groupId,
});
} else {
await eventsQueue.add(
'event',
{
type: 'incomingEvent',
payload: {
projectId,
headers,
event: {
...payload,
timestamp,
isTimestampFromThePast,
},
geo,
currentDeviceId,
previousDeviceId,
},
},
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 200,
},
},
);
}
geo,
currentDeviceId,
previousDeviceId,
},
groupId,
});
}
async function identify({

View File

@@ -1,7 +1,7 @@
import { ch, db } from '@openpanel/db';
import {
cronQueue,
eventsQueue,
eventsGroupQueue,
miscQueue,
notificationQueue,
sessionsQueue,
@@ -71,7 +71,7 @@ export async function shutdown(
// Step 6: Close Bull queues (graceful shutdown of queue state)
try {
await Promise.all([
eventsQueue.close(),
eventsGroupQueue.close(),
sessionsQueue.close(),
cronQueue.close(),
miscQueue.close(),

View File

@@ -23,7 +23,7 @@
"@openpanel/redis": "workspace:*",
"bullmq": "^5.8.7",
"express": "^4.18.2",
"groupmq": "1.0.0-next.17",
"groupmq": "1.0.0-next.18",
"prom-client": "^15.1.3",
"ramda": "^0.29.1",
"source-map-support": "^0.5.21",

View File

@@ -5,7 +5,6 @@ import {
type EventsQueuePayloadIncomingEvent,
cronQueue,
eventsGroupQueue,
eventsQueue,
miscQueue,
notificationQueue,
queueLogger,
@@ -45,7 +44,6 @@ export async function bootWorkers() {
},
});
eventsGroupWorker.run();
const eventsWorker = new Worker(eventsQueue.name, eventsJob, workerOptions);
const sessionsWorker = new Worker(
sessionsQueue.name,
sessionsJob,
@@ -61,7 +59,6 @@ export async function bootWorkers() {
const workers = [
sessionsWorker,
eventsWorker,
cronWorker,
notificationWorker,
miscWorker,

View File

@@ -7,7 +7,6 @@ import { createInitialSalts } from '@openpanel/db';
import {
cronQueue,
eventsGroupQueue,
eventsQueue,
miscQueue,
notificationQueue,
sessionsQueue,
@@ -36,7 +35,6 @@ async function start() {
createBullBoard({
queues: [
new BullBoardGroupMQAdapter(eventsGroupQueue) as any,
new BullMQAdapter(eventsQueue),
new BullMQAdapter(sessionsQueue),
new BullMQAdapter(cronQueue),
new BullMQAdapter(notificationQueue),

View File

@@ -3,6 +3,8 @@ import type { Job } from 'bullmq';
import { logger as baseLogger } from '@/utils/logger';
import { getTime } from '@openpanel/common';
import {
type IClickhouseSession,
type IServiceCreateEventPayload,
type IServiceEvent,
TABLE_NAMES,
checkNotificationRulesForSessionEnd,
@@ -10,37 +12,33 @@ import {
eventBuffer,
formatClickhouseDate,
getEvents,
getHasFunnelRules,
getNotificationRulesByProjectId,
sessionBuffer,
} from '@openpanel/db';
import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue';
const MAX_SESSION_EVENTS = 500;
// Grabs session_start and screen_views + the last occured event
async function getNecessarySessionEvents({
async function getSessionEvents({
projectId,
sessionId,
createdAt,
startAt,
endAt,
}: {
projectId: string;
sessionId: string;
createdAt: Date;
startAt: Date;
endAt: Date;
}): Promise<ReturnType<typeof getEvents>> {
const sql = `
SELECT * FROM ${TABLE_NAMES.events}
WHERE
session_id = '${sessionId}'
AND project_id = '${projectId}'
AND created_at >= '${formatClickhouseDate(new Date(new Date(createdAt).getTime() - 1000 * 60 * 5))}'
AND (
name IN ('screen_view', 'session_start')
OR created_at = (
SELECT MAX(created_at)
FROM ${TABLE_NAMES.events}
WHERE session_id = '${sessionId}'
AND project_id = '${projectId}'
AND created_at >= '${formatClickhouseDate(new Date(new Date(createdAt).getTime() - 1000 * 60 * 5))}'
AND name NOT IN ('screen_view', 'session_start')
)
)
ORDER BY created_at DESC;
AND created_at BETWEEN '${formatClickhouseDate(startAt)}' AND '${formatClickhouseDate(endAt)}'
ORDER BY created_at DESC LIMIT ${MAX_SESSION_EVENTS};
`;
const [lastScreenView, eventsInDb] = await Promise.all([
@@ -63,62 +61,77 @@ async function getNecessarySessionEvents({
export async function createSessionEnd(
job: Job<EventsQueuePayloadCreateSessionEnd>,
) {
const { payload } = job.data;
const logger = baseLogger.child({
payload: job.data.payload,
payload,
jobId: job.id,
reqId: job.data.payload.properties?.__reqId ?? 'unknown',
reqId: payload.properties?.__reqId ?? 'unknown',
});
logger.info('Processing session end job');
const payload = job.data.payload;
const session = await sessionBuffer.getExistingSession(payload.sessionId);
const events = await getNecessarySessionEvents({
if (!session) {
throw new Error('Session not found');
}
try {
handleSessionEndNotifications({
session,
payload,
});
} catch (error) {
logger.error('Creating notificatios for session end failed', {
error,
});
}
const lastScreenView = await eventBuffer.getLastScreenView({
projectId: payload.projectId,
sessionId: payload.sessionId,
createdAt: payload.createdAt,
});
const sessionStart = events.find((event) => event.name === 'session_start');
const screenViews = events.filter((event) => event.name === 'screen_view');
const lastEvent = events[0];
if (!sessionStart) {
throw new Error('No session_start found');
}
if (!lastEvent) {
throw new Error('No last event found');
}
const sessionDuration =
lastEvent.createdAt.getTime() - sessionStart.createdAt.getTime();
await checkNotificationRulesForSessionEnd(events).catch(() => {
logger.error('Error checking notification rules for session end', {
data: job.data,
});
});
logger.info('Creating session_end event', {
sessionStart,
lastEvent,
screenViews,
sessionDuration,
events,
});
// Create session end event
return createEvent({
...sessionStart,
...payload,
properties: {
...sessionStart.properties,
...(screenViews[0]?.properties ?? {}),
__bounce: screenViews.length <= 1,
...payload.properties,
...(lastScreenView?.properties ?? {}),
__bounce: session.is_bounce,
},
name: 'session_end',
duration: sessionDuration,
path: screenViews[0]?.path ?? '',
createdAt: new Date(getTime(lastEvent.createdAt) + 1000),
profileId: lastEvent.profileId || sessionStart.profileId,
duration: session.duration ?? 0,
path: lastScreenView?.path ?? '',
createdAt: new Date(getTime(session.ended_at) + 1000),
profileId: lastScreenView?.profileId || payload.profileId,
});
}
async function handleSessionEndNotifications({
session,
payload,
}: {
session: IClickhouseSession;
payload: IServiceCreateEventPayload;
}) {
const notificationRules = await getNotificationRulesByProjectId(
payload.projectId,
);
const hasFunnelRules = getHasFunnelRules(notificationRules);
const isEventCountReasonable =
session.event_count + session.screen_view_count < MAX_SESSION_EVENTS;
if (hasFunnelRules && isEventCountReasonable) {
const events = await getSessionEvents({
projectId: payload.projectId,
sessionId: payload.sessionId,
startAt: new Date(session.created_at),
endAt: new Date(session.ended_at),
});
if (events.length > 0) {
await checkNotificationRulesForSessionEnd(events);
}
}
}

View File

@@ -7,18 +7,13 @@ import {
profileBuffer,
sessionBuffer,
} from '@openpanel/db';
import {
cronQueue,
eventsGroupQueue,
eventsQueue,
sessionsQueue,
} from '@openpanel/queue';
import { cronQueue, eventsGroupQueue, sessionsQueue } from '@openpanel/queue';
const Registry = client.Registry;
export const register = new Registry();
const queues = [eventsQueue, sessionsQueue, cronQueue, eventsGroupQueue];
const queues = [sessionsQueue, cronQueue, eventsGroupQueue];
queues.forEach((queue) => {
register.registerMetric(

View File

@@ -65,12 +65,6 @@ export async function getSessionEnd({
});
if (sessionEnd) {
// Hack: if session end job just got created, we want to give it a chance to complete
// So the order is correct
if (sessionEnd.job.timestamp > Date.now() - 50) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
const existingSessionIsAnonymous =
sessionEnd.job.data.payload.profileId ===
sessionEnd.job.data.payload.deviceId;

View File

@@ -25,7 +25,7 @@ export class SessionBuffer extends BaseBuffer {
this.redis = getRedisCache();
}
async getExistingSession(sessionId: string) {
public async getExistingSession(sessionId: string) {
const hit = await this.redis.get(`session:${sessionId}`);
if (hit) {

View File

@@ -69,7 +69,8 @@ export type INotificationRuleCached = Awaited<
ReturnType<typeof getNotificationRulesByProjectId>
>[number];
export const getNotificationRulesByProjectId = cacheable(
function getNotificationRulesByProjectId(projectId: string) {
'getNotificationRulesByProjectId',
(projectId: string) => {
return db.notificationRule.findMany({
where: {
projectId,
@@ -330,6 +331,17 @@ export async function checkNotificationRulesForEvent(
);
}
const isFunnelRule = (rule: INotificationRuleCached) =>
rule.config.type === 'funnel';
export function getHasFunnelRules(rules: INotificationRuleCached[]) {
return rules.some(isFunnelRule);
}
export function getFunnelRules(rules: INotificationRuleCached[]) {
return rules.filter(isFunnelRule);
}
export async function checkNotificationRulesForSessionEnd(
events: IServiceEvent[],
) {
@@ -344,8 +356,7 @@ export async function checkNotificationRulesForSessionEnd(
getNotificationRulesByProjectId(projectId),
]);
const funnelRules = rules.filter((rule) => rule.config.type === 'funnel');
const funnelRules = getFunnelRules(rules);
const notificationPromises = funnelRules.flatMap((rule) => {
// Match funnel events
let funnelIndex = 0;

View File

@@ -10,7 +10,7 @@
"@openpanel/logger": "workspace:*",
"@openpanel/redis": "workspace:*",
"bullmq": "^5.8.7",
"groupmq": "1.0.0-next.17"
"groupmq": "1.0.0-next.18"
},
"devDependencies": {
"@openpanel/sdk": "workspace:*",

View File

@@ -1,6 +1,10 @@
import { Queue, QueueEvents } from 'bullmq';
import type { IServiceEvent, Prisma } from '@openpanel/db';
import type {
IServiceCreateEventPayload,
IServiceEvent,
Prisma,
} from '@openpanel/db';
import { createLogger } from '@openpanel/logger';
import { getRedisGroupQueue, getRedisQueue } from '@openpanel/redis';
import type { TrackPayload } from '@openpanel/sdk';
@@ -32,16 +36,10 @@ export interface EventsQueuePayloadCreateEvent {
type: 'createEvent';
payload: Omit<IServiceEvent, 'id'>;
}
type SessionEndRequired =
| 'sessionId'
| 'deviceId'
| 'profileId'
| 'projectId'
| 'createdAt';
export interface EventsQueuePayloadCreateSessionEnd {
type: 'createSessionEnd';
payload: Partial<Omit<IServiceEvent, SessionEndRequired>> &
Pick<IServiceEvent, SessionEndRequired>;
payload: IServiceCreateEventPayload;
}
// TODO: Rename `EventsQueuePayloadCreateSessionEnd`
@@ -95,18 +93,6 @@ export type MiscQueuePayload = MiscQueuePayloadTrialEndingSoon;
export type CronQueueType = CronQueuePayload['type'];
export const eventsQueue = new Queue<EventsQueuePayload>('events', {
connection: getRedisQueue(),
defaultJobOptions: {
removeOnComplete: 10,
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
},
});
const orderingWindowMs = Number.parseInt(
process.env.ORDERING_WINDOW_MS || '50',
10,

View File

@@ -23,33 +23,57 @@ export async function getCache<T>(
return data;
}
export function cacheable<T extends (...args: any) => any>(
fn: T,
expireInSec: number,
) {
const cachePrefix = `cachable:${fn.name}`;
function stringify(obj: unknown): string {
if (obj === null) return 'null';
if (obj === undefined) return 'undefined';
if (typeof obj === 'boolean') return obj ? 'true' : 'false';
if (typeof obj === 'number') return String(obj);
if (typeof obj === 'string') return obj;
if (typeof obj === 'function') return obj.toString();
function stringify(obj: unknown): string {
if (obj === null) return 'null';
if (obj === undefined) return 'undefined';
if (typeof obj === 'boolean') return obj ? 'true' : 'false';
if (typeof obj === 'number') return String(obj);
if (typeof obj === 'string') return obj;
if (typeof obj === 'function') return obj.toString();
if (Array.isArray(obj)) {
return `[${obj.map(stringify).join(',')}]`;
}
if (typeof obj === 'object') {
const pairs = Object.entries(obj)
.sort(([a], [b]) => a.localeCompare(b))
.map(([key, value]) => `${key}:${stringify(value)}`);
return pairs.join(':');
}
// Fallback for any other types
return String(obj);
if (Array.isArray(obj)) {
return `[${obj.map(stringify).join(',')}]`;
}
if (typeof obj === 'object') {
const pairs = Object.entries(obj)
.sort(([a], [b]) => a.localeCompare(b))
.map(([key, value]) => `${key}:${stringify(value)}`);
return pairs.join(':');
}
// Fallback for any other types
return String(obj);
}
export function cacheable<T extends (...args: any) => any>(
fnOrName: T | string,
fnOrExpireInSec: number | T,
_expireInSec?: number,
) {
const name = typeof fnOrName === 'string' ? fnOrName : fnOrName.name;
const fn =
typeof fnOrName === 'function'
? fnOrName
: typeof fnOrExpireInSec === 'function'
? fnOrExpireInSec
: null;
const expireInSec =
typeof fnOrExpireInSec === 'number'
? fnOrExpireInSec
: typeof _expireInSec === 'number'
? _expireInSec
: null;
if (typeof fn !== 'function') {
throw new Error('fn is not a function');
}
if (typeof expireInSec !== 'number') {
throw new Error('expireInSec is not a number');
}
const cachePrefix = `cachable:${name}`;
const getKey = (...args: Parameters<T>) =>
`${cachePrefix}:${stringify(args)}`;
const cachedFn = async (

18
pnpm-lock.yaml generated
View File

@@ -127,8 +127,8 @@ importers:
specifier: ^5.0.0
version: 5.0.0
groupmq:
specifier: 1.0.0-next.17
version: 1.0.0-next.17(ioredis@5.4.1)
specifier: 1.0.0-next.18
version: 1.0.0-next.18(ioredis@5.4.1)
ico-to-png:
specifier: ^0.2.2
version: 0.2.2
@@ -760,8 +760,8 @@ importers:
specifier: ^4.18.2
version: 4.18.2
groupmq:
specifier: 1.0.0-next.17
version: 1.0.0-next.17(ioredis@5.4.1)
specifier: 1.0.0-next.18
version: 1.0.0-next.18(ioredis@5.4.1)
prom-client:
specifier: ^15.1.3
version: 15.1.3
@@ -1224,8 +1224,8 @@ importers:
specifier: ^5.8.7
version: 5.8.7
groupmq:
specifier: 1.0.0-next.17
version: 1.0.0-next.17(ioredis@5.4.1)
specifier: 1.0.0-next.18
version: 1.0.0-next.18(ioredis@5.4.1)
devDependencies:
'@openpanel/sdk':
specifier: workspace:*
@@ -8837,8 +8837,8 @@ packages:
resolution: {integrity: sha512-5v6yZd4JK3eMI3FqqCouswVqwugaA9r4dNZB1wwcmrD02QkV5H0y7XBQW8QwQqEaZY1pM9aqORSORhJRdNK44Q==}
engines: {node: '>=6.0'}
groupmq@1.0.0-next.17:
resolution: {integrity: sha512-yPqlijEf/zYLX3aj036dAwloRu345dvMWQxUtint0zUojU98yFI0s6ZxQWVqYluLp1nPiYTGdhTkuCPhfPiPiQ==}
groupmq@1.0.0-next.18:
resolution: {integrity: sha512-JNZb5ocJQ4NsIYpCCxPFqEH9widt7oeJSFDvOdtpunEa8qyOgFgZUtkMBVfB2Hxtkow/GxBRrDbNc4KB096jVQ==}
engines: {node: '>=18'}
peerDependencies:
ioredis: '>=5'
@@ -22060,7 +22060,7 @@ snapshots:
section-matter: 1.0.0
strip-bom-string: 1.0.0
groupmq@1.0.0-next.17(ioredis@5.4.1):
groupmq@1.0.0-next.18(ioredis@5.4.1):
dependencies:
cron-parser: 4.9.0
ioredis: 5.4.1