wip: try groupmq 2

This commit is contained in:
Carl-Gerhard Lindesvärd
2026-03-13 06:43:09 +01:00
parent a672b73947
commit 9c3c1458bb
27 changed files with 343 additions and 531 deletions

View File

@@ -30,7 +30,6 @@
"@openpanel/logger": "workspace:*",
"@openpanel/payments": "workspace:*",
"@openpanel/queue": "workspace:*",
"groupmq": "catalog:",
"@openpanel/redis": "workspace:*",
"@openpanel/trpc": "workspace:*",
"@openpanel/validation": "workspace:*",
@@ -40,6 +39,7 @@
"fastify": "^5.6.1",
"fastify-metrics": "^12.1.0",
"fastify-raw-body": "^5.0.0",
"groupmq": "catalog:",
"jsonwebtoken": "^9.0.2",
"ramda": "^0.29.1",
"sharp": "^0.33.5",

View File

@@ -1,4 +1,4 @@
import { cacheable, cacheableLru } from '@openpanel/redis';
import { cacheable } from '@openpanel/redis';
import bots from './bots';
// Pre-compile regex patterns at module load time
@@ -15,7 +15,7 @@ const compiledBots = bots.map((bot) => {
const regexBots = compiledBots.filter((bot) => 'compiledRegex' in bot);
const includesBots = compiledBots.filter((bot) => 'includes' in bot);
export const isBot = cacheableLru(
export const isBot = cacheable(
'is-bot',
(ua: string) => {
// Check simple string patterns first (fast)
@@ -40,8 +40,5 @@ export const isBot = cacheableLru(
return null;
},
{
maxSize: 1000,
ttl: 60 * 5,
},
60 * 5
);

View File

@@ -19,9 +19,14 @@ export function wsVisitors(
) {
const { params } = req;
const sendCount = () => {
eventBuffer.getActiveVisitorCount(params.projectId).then((count) => {
socket.send(String(count));
});
eventBuffer
.getActiveVisitorCount(params.projectId)
.then((count) => {
socket.send(String(count));
})
.catch(() => {
socket.send('0');
});
};
const unsubscribe = subscribeToPublishedEvent(

View File

@@ -1,5 +1,4 @@
import crypto from 'node:crypto';
import { HttpError } from '@/utils/errors';
import { stripTrailingSlash } from '@openpanel/common';
import { hashPassword } from '@openpanel/common/server';
import {
@@ -10,6 +9,7 @@ import {
} from '@openpanel/db';
import type { FastifyReply, FastifyRequest } from 'fastify';
import { z } from 'zod';
import { HttpError } from '@/utils/errors';
// Validation schemas
const zCreateProject = z.object({
@@ -57,7 +57,7 @@ const zUpdateReference = z.object({
// Projects CRUD
export async function listProjects(
request: FastifyRequest,
reply: FastifyReply,
reply: FastifyReply
) {
const projects = await db.project.findMany({
where: {
@@ -74,7 +74,7 @@ export async function listProjects(
export async function getProject(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const project = await db.project.findFirst({
where: {
@@ -92,7 +92,7 @@ export async function getProject(
export async function createProject(
request: FastifyRequest<{ Body: z.infer<typeof zCreateProject> }>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zCreateProject.safeParse(request.body);
@@ -139,12 +139,9 @@ export async function createProject(
},
});
// Clear cache
await Promise.all([
getProjectByIdCached.clear(project.id),
project.clients.map((client) => {
getClientByIdCached.clear(client.id);
}),
...project.clients.map((client) => getClientByIdCached.clear(client.id)),
]);
reply.send({
@@ -165,7 +162,7 @@ export async function updateProject(
Params: { id: string };
Body: z.infer<typeof zUpdateProject>;
}>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zUpdateProject.safeParse(request.body);
@@ -223,12 +220,9 @@ export async function updateProject(
data: updateData,
});
// Clear cache
await Promise.all([
getProjectByIdCached.clear(project.id),
existing.clients.map((client) => {
getClientByIdCached.clear(client.id);
}),
...existing.clients.map((client) => getClientByIdCached.clear(client.id)),
]);
reply.send({ data: project });
@@ -236,7 +230,7 @@ export async function updateProject(
export async function deleteProject(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const project = await db.project.findFirst({
where: {
@@ -266,7 +260,7 @@ export async function deleteProject(
// Clients CRUD
export async function listClients(
request: FastifyRequest<{ Querystring: { projectId?: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const where: any = {
organizationId: request.client!.organizationId,
@@ -300,7 +294,7 @@ export async function listClients(
export async function getClient(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const client = await db.client.findFirst({
where: {
@@ -318,7 +312,7 @@ export async function getClient(
export async function createClient(
request: FastifyRequest<{ Body: z.infer<typeof zCreateClient> }>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zCreateClient.safeParse(request.body);
@@ -374,7 +368,7 @@ export async function updateClient(
Params: { id: string };
Body: z.infer<typeof zUpdateClient>;
}>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zUpdateClient.safeParse(request.body);
@@ -417,7 +411,7 @@ export async function updateClient(
export async function deleteClient(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const client = await db.client.findFirst({
where: {
@@ -444,7 +438,7 @@ export async function deleteClient(
// References CRUD
export async function listReferences(
request: FastifyRequest<{ Querystring: { projectId?: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const where: any = {};
@@ -488,7 +482,7 @@ export async function listReferences(
export async function getReference(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const reference = await db.reference.findUnique({
where: {
@@ -516,7 +510,7 @@ export async function getReference(
export async function createReference(
request: FastifyRequest<{ Body: z.infer<typeof zCreateReference> }>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zCreateReference.safeParse(request.body);
@@ -559,7 +553,7 @@ export async function updateReference(
Params: { id: string };
Body: z.infer<typeof zUpdateReference>;
}>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zUpdateReference.safeParse(request.body);
@@ -616,7 +610,7 @@ export async function updateReference(
export async function deleteReference(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const reference = await db.reference.findUnique({
where: {

View File

@@ -7,7 +7,10 @@ import {
upsertProfile,
} from '@openpanel/db';
import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
import { getEventsGroupQueueShard } from '@openpanel/queue';
import {
type EventsQueuePayloadIncomingEvent,
getEventsGroupQueueShard,
} from '@openpanel/queue';
import { getRedisCache } from '@openpanel/redis';
import {
type IDecrementPayload,
@@ -112,6 +115,7 @@ interface TrackContext {
identity?: IIdentifyPayload;
deviceId: string;
sessionId: string;
session?: EventsQueuePayloadIncomingEvent['payload']['session'];
geo: GeoLocation;
}
@@ -141,19 +145,21 @@ async function buildContext(
validatedBody.payload.profileId = profileId;
}
const overrideDeviceId =
validatedBody.type === 'track' &&
typeof validatedBody.payload?.properties?.__deviceId === 'string'
? validatedBody.payload?.properties.__deviceId
: undefined;
// Get geo location (needed for track and identify)
const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]);
const { deviceId, sessionId } = await getDeviceId({
const deviceIdResult = await getDeviceId({
projectId,
ip,
ua,
salts,
overrideDeviceId:
validatedBody.type === 'track' &&
typeof validatedBody.payload?.properties?.__deviceId === 'string'
? validatedBody.payload?.properties.__deviceId
: undefined,
overrideDeviceId,
});
return {
@@ -166,8 +172,9 @@ async function buildContext(
isFromPast: timestamp.isTimestampFromThePast,
},
identity,
deviceId,
sessionId,
deviceId: deviceIdResult.deviceId,
sessionId: deviceIdResult.sessionId,
session: deviceIdResult.session,
geo,
};
}
@@ -176,13 +183,14 @@ async function handleTrack(
payload: ITrackPayload,
context: TrackContext
): Promise<void> {
const { projectId, deviceId, geo, headers, timestamp, sessionId } = context;
const { projectId, deviceId, geo, headers, timestamp, sessionId, session } =
context;
const uaInfo = parseUserAgent(headers['user-agent'], payload.properties);
const groupId = uaInfo.isServer
? payload.profileId
? `${projectId}:${payload.profileId}`
: `${projectId}:${generateId()}`
: undefined
: deviceId;
const jobId = [
slug(payload.name),
@@ -203,7 +211,7 @@ async function handleTrack(
}
promises.push(
getEventsGroupQueueShard(groupId).add({
getEventsGroupQueueShard(groupId || generateId()).add({
orderMs: timestamp.value,
data: {
projectId,
@@ -217,6 +225,7 @@ async function handleTrack(
geo,
deviceId,
sessionId,
session,
},
groupId,
jobId,

View File

@@ -1,20 +1,19 @@
import { isBot } from '@/bots';
import { createBotEvent } from '@openpanel/db';
import type {
DeprecatedPostEventPayload,
ITrackHandlerPayload,
} from '@openpanel/validation';
import type { FastifyReply, FastifyRequest } from 'fastify';
import { isBot } from '@/bots';
export async function isBotHook(
req: FastifyRequest<{
Body: ITrackHandlerPayload | DeprecatedPostEventPayload;
}>,
reply: FastifyReply,
reply: FastifyReply
) {
const bot = req.headers['user-agent']
? isBot(req.headers['user-agent'])
? await isBot(req.headers['user-agent'])
: null;
if (bot && req.client?.projectId) {
@@ -44,6 +43,6 @@ export async function isBotHook(
}
}
return reply.status(202).send();
return reply.status(202).send({ bot });
}
}

View File

@@ -1,6 +1,5 @@
import { fetchDeviceId, handler } from '@/controllers/track.controller';
import type { FastifyPluginCallback } from 'fastify';
import { fetchDeviceId, handler } from '@/controllers/track.controller';
import { clientHook } from '@/hooks/client.hook';
import { duplicateHook } from '@/hooks/duplicate.hook';
import { isBotHook } from '@/hooks/is-bot.hook';
@@ -13,7 +12,7 @@ const trackRouter: FastifyPluginCallback = async (fastify) => {
fastify.route({
method: 'POST',
url: '/',
handler: handler,
handler,
});
fastify.route({

View File

@@ -1,7 +1,12 @@
import crypto from 'node:crypto';
import { generateDeviceId } from '@openpanel/common/server';
import { getSafeJson } from '@openpanel/json';
import type {
EventsQueuePayloadCreateSessionEnd,
EventsQueuePayloadIncomingEvent,
} from '@openpanel/queue';
import { getRedisCache } from '@openpanel/redis';
import { pick } from 'ramda';
export async function getDeviceId({
projectId,
@@ -37,14 +42,20 @@ export async function getDeviceId({
ua,
});
return await getDeviceIdFromSession({
return await getInfoFromSession({
projectId,
currentDeviceId,
previousDeviceId,
});
}
async function getDeviceIdFromSession({
interface DeviceIdResult {
deviceId: string;
sessionId: string;
session?: EventsQueuePayloadIncomingEvent['payload']['session'];
}
async function getInfoFromSession({
projectId,
currentDeviceId,
previousDeviceId,
@@ -52,7 +63,7 @@ async function getDeviceIdFromSession({
projectId: string;
currentDeviceId: string;
previousDeviceId: string;
}) {
}): Promise<DeviceIdResult> {
try {
const multi = getRedisCache().multi();
multi.hget(
@@ -65,21 +76,33 @@ async function getDeviceIdFromSession({
);
const res = await multi.exec();
if (res?.[0]?.[1]) {
const data = getSafeJson<{ payload: { sessionId: string } }>(
const data = getSafeJson<EventsQueuePayloadCreateSessionEnd>(
(res?.[0]?.[1] as string) ?? ''
);
if (data) {
const sessionId = data.payload.sessionId;
return { deviceId: currentDeviceId, sessionId };
return {
deviceId: currentDeviceId,
sessionId: data.payload.sessionId,
session: pick(
['referrer', 'referrerName', 'referrerType'],
data.payload
),
};
}
}
if (res?.[1]?.[1]) {
const data = getSafeJson<{ payload: { sessionId: string } }>(
const data = getSafeJson<EventsQueuePayloadCreateSessionEnd>(
(res?.[1]?.[1] as string) ?? ''
);
if (data) {
const sessionId = data.payload.sessionId;
return { deviceId: previousDeviceId, sessionId };
return {
deviceId: previousDeviceId,
sessionId: data.payload.sessionId,
session: pick(
['referrer', 'referrerName', 'referrerType'],
data.payload
),
};
}
}
} catch (error) {

View File

@@ -16,11 +16,11 @@
"@openpanel/common": "workspace:*",
"@openpanel/db": "workspace:*",
"@openpanel/email": "workspace:*",
"@openpanel/importer": "workspace:*",
"@openpanel/integrations": "workspace:^",
"@openpanel/js-runtime": "workspace:*",
"@openpanel/json": "workspace:*",
"@openpanel/logger": "workspace:*",
"@openpanel/importer": "workspace:*",
"@openpanel/payments": "workspace:*",
"@openpanel/queue": "workspace:*",
"@openpanel/redis": "workspace:*",

View File

@@ -1,10 +1,9 @@
import type { Queue, WorkerOptions } from 'bullmq';
import { Worker } from 'bullmq';
import { performance } from 'node:perf_hooks';
import { setTimeout as sleep } from 'node:timers/promises';
import {
cronQueue,
EVENTS_GROUP_QUEUES_SHARDS,
type EventsQueuePayloadIncomingEvent,
cronQueue,
eventsGroupQueues,
gscQueue,
importQueue,
@@ -15,14 +14,12 @@ import {
sessionsQueue,
} from '@openpanel/queue';
import { getRedisQueue } from '@openpanel/redis';
import { performance } from 'node:perf_hooks';
import { setTimeout as sleep } from 'node:timers/promises';
import type { Queue, WorkerOptions } from 'bullmq';
import { Worker } from 'bullmq';
import { Worker as GroupWorker } from 'groupmq';
import { cronJob } from './jobs/cron';
import { gscJob } from './jobs/gsc';
import { incomingEvent } from './jobs/events.incoming-event';
import { gscJob } from './jobs/gsc';
import { importJob } from './jobs/import';
import { insightsProjectJob } from './jobs/insights';
import { miscJob } from './jobs/misc';
@@ -95,7 +92,7 @@ function getConcurrencyFor(queueName: string, defaultValue = 1): number {
return defaultValue;
}
export async function bootWorkers() {
export function bootWorkers() {
const enabledQueues = getEnabledQueues();
const workers: (Worker | GroupWorker<any>)[] = [];
@@ -119,12 +116,14 @@ export async function bootWorkers() {
for (const index of eventQueuesToStart) {
const queue = eventsGroupQueues[index];
if (!queue) continue;
if (!queue) {
continue;
}
const queueName = `events_${index}`;
const concurrency = getConcurrencyFor(
queueName,
Number.parseInt(process.env.EVENT_JOB_CONCURRENCY || '10', 10),
Number.parseInt(process.env.EVENT_JOB_CONCURRENCY || '10', 10)
);
const worker = new GroupWorker<EventsQueuePayloadIncomingEvent['payload']>({
@@ -132,7 +131,7 @@ export async function bootWorkers() {
concurrency,
logger: process.env.NODE_ENV === 'production' ? queueLogger : undefined,
blockingTimeoutSec: Number.parseFloat(
process.env.EVENT_BLOCKING_TIMEOUT_SEC || '1',
process.env.EVENT_BLOCKING_TIMEOUT_SEC || '1'
),
handler: async (job) => {
return await incomingEvent(job.data);
@@ -172,7 +171,7 @@ export async function bootWorkers() {
const notificationWorker = new Worker(
notificationQueue.name,
notificationJob,
{ ...workerOptions, concurrency },
{ ...workerOptions, concurrency }
);
workers.push(notificationWorker);
logger.info('Started worker for notification', { concurrency });
@@ -224,7 +223,7 @@ export async function bootWorkers() {
if (workers.length === 0) {
logger.warn(
'No workers started. Check ENABLED_QUEUES environment variable.',
'No workers started. Check ENABLED_QUEUES environment variable.'
);
}
@@ -254,7 +253,7 @@ export async function bootWorkers() {
const elapsed = job.finishedOn - job.processedOn;
eventsGroupJobDuration.observe(
{ name: worker.name, status: 'failed' },
elapsed,
elapsed
);
}
logger.error('job failed', {
@@ -267,23 +266,6 @@ export async function bootWorkers() {
}
});
(worker as Worker).on('completed', (job) => {
if (job) {
if (job.processedOn && job.finishedOn) {
const elapsed = job.finishedOn - job.processedOn;
logger.info('job completed', {
jobId: job.id,
worker: worker.name,
elapsed,
});
eventsGroupJobDuration.observe(
{ name: worker.name, status: 'success' },
elapsed,
);
}
}
});
(worker as Worker).on('ioredis:close', () => {
logger.error('worker closed due to ioredis:close', {
worker: worker.name,
@@ -293,7 +275,7 @@ export async function bootWorkers() {
async function exitHandler(
eventName: string,
evtOrExitCodeOrError: number | string | Error,
evtOrExitCodeOrError: number | string | Error
) {
// Log the actual error details for unhandled rejections/exceptions
if (evtOrExitCodeOrError instanceof Error) {
@@ -339,7 +321,7 @@ export async function bootWorkers() {
process.on(evt, (code) => {
exitHandler(evt, code);
});
},
}
);
return workers;

View File

@@ -33,7 +33,7 @@ async function generateNewSalt() {
return created;
});
getSalts.clear();
await getSalts.clear();
return newSalt;
}

View File

@@ -1,9 +1,5 @@
import { getTime, isSameDomain, parsePath } from '@openpanel/common';
import {
getReferrerWithQuery,
parseReferrer,
parseUserAgent,
} from '@openpanel/common/server';
import { getReferrerWithQuery, parseReferrer } from '@openpanel/common/server';
import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db';
import {
checkNotificationRulesForEvent,
@@ -14,10 +10,12 @@ import {
} from '@openpanel/db';
import type { ILogger } from '@openpanel/logger';
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue';
import { getLock } from '@openpanel/redis';
import { anyPass, isEmpty, isNil, mergeDeepRight, omit, reject } from 'ramda';
import { logger as baseLogger } from '@/utils/logger';
import { createSessionEndJob, getSessionEnd } from '@/utils/session-handler';
import {
createSessionEndJob,
extendSessionEndJob,
} from '@/utils/session-handler';
const GLOBAL_PROPERTIES = ['__path', '__referrer', '__timestamp', '__revenue'];
@@ -93,7 +91,8 @@ export async function incomingEvent(
projectId,
deviceId,
sessionId,
uaInfo: _uaInfo,
uaInfo,
session,
} = jobPayload;
const properties = body.properties ?? {};
const reqId = headers['request-id'] ?? 'unknown';
@@ -121,16 +120,15 @@ export async function incomingEvent(
? null
: parseReferrer(getProperty('__referrer'));
const utmReferrer = getReferrerWithQuery(query);
const userAgent = headers['user-agent'];
const sdkName = headers['openpanel-sdk-name'];
const sdkVersion = headers['openpanel-sdk-version'];
// TODO: Remove both user-agent and parseUserAgent
const uaInfo = _uaInfo ?? parseUserAgent(userAgent, properties);
const baseEvent = {
const baseEvent: IServiceCreateEventPayload = {
name: body.name,
profileId,
projectId,
deviceId,
sessionId,
properties: omit(GLOBAL_PROPERTIES, {
...properties,
__hash: hash,
@@ -149,7 +147,7 @@ export async function incomingEvent(
origin,
referrer: referrer?.url || '',
referrerName: utmReferrer?.name || referrer?.name || referrer?.url,
referrerType: referrer?.type || utmReferrer?.type || '',
referrerType: utmReferrer?.type || referrer?.type || '',
os: uaInfo.os,
osVersion: uaInfo.osVersion,
browser: uaInfo.browser,
@@ -161,16 +159,17 @@ export async function incomingEvent(
body.name === 'revenue' && '__revenue' in properties
? parseRevenue(properties.__revenue)
: undefined,
} as const;
};
// if timestamp is from the past we dont want to create a new session
if (uaInfo.isServer || isTimestampFromThePast) {
const session = profileId
? await sessionBuffer.getExistingSession({
profileId,
projectId,
})
: null;
const session =
profileId && !isTimestampFromThePast
? await sessionBuffer.getExistingSession({
profileId,
projectId,
})
: null;
const payload = {
...baseEvent,
@@ -198,82 +197,48 @@ export async function incomingEvent(
return createEventAndNotify(payload as IServiceEvent, logger, projectId);
}
const sessionEnd = await getSessionEnd({
projectId,
deviceId,
profileId,
});
const activeSession = sessionEnd
? await sessionBuffer.getExistingSession({
sessionId: sessionEnd.sessionId,
})
: null;
const payload: IServiceCreateEventPayload = merge(baseEvent, {
deviceId: sessionEnd?.deviceId ?? deviceId,
sessionId: sessionEnd?.sessionId ?? sessionId,
referrer: sessionEnd?.referrer ?? baseEvent.referrer,
referrerName: sessionEnd?.referrerName ?? baseEvent.referrerName,
referrerType: sessionEnd?.referrerType ?? baseEvent.referrerType,
// if the path is not set, use the last screen view path
path: baseEvent.path || activeSession?.exit_path || '',
origin: baseEvent.origin || activeSession?.exit_origin || '',
referrer: session?.referrer ?? baseEvent.referrer,
referrerName: session?.referrerName ?? baseEvent.referrerName,
referrerType: session?.referrerType ?? baseEvent.referrerType,
} as Partial<IServiceCreateEventPayload>) as IServiceCreateEventPayload;
// If the triggering event is filtered, do not create session_start or the event (issue #2)
const isExcluded = await isEventExcludedByProjectFilter(payload, projectId);
if (isExcluded) {
logger.info(
'Skipping session_start and event (excluded by project filter)',
{ event: payload.name, projectId }
);
return null;
}
if (session) {
await extendSessionEndJob({
projectId,
deviceId,
}).catch((error) => {
logger.error('Error finding and extending session end job', { error });
throw error;
});
} else {
await createEventAndNotify(
{
event: payload.name,
projectId,
}
);
return null;
}
...payload,
name: 'session_start',
createdAt: new Date(getTime(payload.createdAt) - 100),
},
logger,
projectId
).catch((error) => {
logger.error('Error creating session start event', { event: payload });
throw error;
});
if (!sessionEnd) {
const locked = await getLock(
`session_start:${projectId}:${sessionId}`,
'1',
1000
);
if (locked) {
logger.info('Creating session start event', { event: payload });
await createEventAndNotify(
{
...payload,
name: 'session_start',
createdAt: new Date(getTime(payload.createdAt) - 100),
},
logger,
projectId
).catch((error) => {
logger.error('Error creating session start event', { event: payload });
throw error;
});
} else {
logger.info('Session start already claimed by another worker', {
event: payload,
});
}
}
const event = await createEventAndNotify(payload, logger, projectId);
if (!event) {
// Skip creating session end when event was excluded
return null;
}
if (!sessionEnd) {
logger.info('Creating session end job', { event: payload });
await createSessionEndJob({ payload }).catch((error) => {
logger.error('Error creating session end job', { event: payload });
throw error;
});
}
return event;
return createEventAndNotify(payload, logger, projectId);
}

View File

@@ -186,6 +186,11 @@ describe('incomingEvent', () => {
projectId,
deviceId,
sessionId: 'session-123',
session: {
referrer: '',
referrerName: '',
referrerType: '',
},
};
const changeDelay = vi.fn();

View File

@@ -1,5 +1,3 @@
import client from 'prom-client';
import {
botBuffer,
eventBuffer,
@@ -8,6 +6,7 @@ import {
sessionBuffer,
} from '@openpanel/db';
import { cronQueue, eventsGroupQueues, sessionsQueue } from '@openpanel/queue';
import client from 'prom-client';
const Registry = client.Registry;
@@ -20,7 +19,7 @@ export const eventsGroupJobDuration = new client.Histogram({
name: 'job_duration_ms',
help: 'Duration of job processing (in ms)',
labelNames: ['name', 'status'],
buckets: [10, 25, 50, 100, 250, 500, 750, 1000, 2000, 5000, 10000, 30000], // 10ms to 30s
buckets: [10, 25, 50, 100, 250, 500, 750, 1000, 2000, 5000, 10_000, 30_000], // 10ms to 30s
});
register.registerMetric(eventsGroupJobDuration);
@@ -28,57 +27,61 @@ register.registerMetric(eventsGroupJobDuration);
queues.forEach((queue) => {
register.registerMetric(
new client.Gauge({
name: `${queue.name.replace(/[\{\}]/g, '')}_active_count`,
name: `${queue.name.replace(/[{}]/g, '')}_active_count`,
help: 'Active count',
async collect() {
const metric = await queue.getActiveCount();
this.set(metric);
},
}),
})
);
register.registerMetric(
new client.Gauge({
name: `${queue.name.replace(/[\{\}]/g, '')}_delayed_count`,
name: `${queue.name.replace(/[{}]/g, '')}_delayed_count`,
help: 'Delayed count',
async collect() {
const metric = await queue.getDelayedCount();
this.set(metric);
if ('getDelayedCount' in queue) {
const metric = await queue.getDelayedCount();
this.set(metric);
} else {
this.set(0);
}
},
}),
})
);
register.registerMetric(
new client.Gauge({
name: `${queue.name.replace(/[\{\}]/g, '')}_failed_count`,
name: `${queue.name.replace(/[{}]/g, '')}_failed_count`,
help: 'Failed count',
async collect() {
const metric = await queue.getFailedCount();
this.set(metric);
},
}),
})
);
register.registerMetric(
new client.Gauge({
name: `${queue.name.replace(/[\{\}]/g, '')}_completed_count`,
name: `${queue.name.replace(/[{}]/g, '')}_completed_count`,
help: 'Completed count',
async collect() {
const metric = await queue.getCompletedCount();
this.set(metric);
},
}),
})
);
register.registerMetric(
new client.Gauge({
name: `${queue.name.replace(/[\{\}]/g, '')}_waiting_count`,
name: `${queue.name.replace(/[{}]/g, '')}_waiting_count`,
help: 'Waiting count',
async collect() {
const metric = await queue.getWaitingCount();
this.set(metric);
},
}),
})
);
});
@@ -90,7 +93,7 @@ register.registerMetric(
const metric = await eventBuffer.getBufferSize();
this.set(metric);
},
}),
})
);
register.registerMetric(
@@ -101,7 +104,7 @@ register.registerMetric(
const metric = await profileBuffer.getBufferSize();
this.set(metric);
},
}),
})
);
register.registerMetric(
@@ -112,7 +115,7 @@ register.registerMetric(
const metric = await botBuffer.getBufferSize();
this.set(metric);
},
}),
})
);
register.registerMetric(
@@ -123,7 +126,7 @@ register.registerMetric(
const metric = await sessionBuffer.getBufferSize();
this.set(metric);
},
}),
})
);
register.registerMetric(
@@ -134,5 +137,5 @@ register.registerMetric(
const metric = await replayBuffer.getBufferSize();
this.set(metric);
},
}),
})
);

View File

@@ -1,13 +1,39 @@
import type { IServiceCreateEventPayload } from '@openpanel/db';
import {
type EventsQueuePayloadCreateSessionEnd,
sessionsQueue,
} from '@openpanel/queue';
import type { Job } from 'bullmq';
import { logger } from './logger';
import { sessionsQueue } from '@openpanel/queue';
export const SESSION_TIMEOUT = 1000 * 60 * 30;
const CHANGE_DELAY_THROTTLE_MS = process.env.CHANGE_DELAY_THROTTLE_MS
? Number.parseInt(process.env.CHANGE_DELAY_THROTTLE_MS, 10)
: 60_000; // 1 minute
const CHANGE_DELAY_THROTTLE_MAP = new Map<string, number>();
export async function extendSessionEndJob({
projectId,
deviceId,
}: {
projectId: string;
deviceId: string;
}) {
const last = CHANGE_DELAY_THROTTLE_MAP.get(`${projectId}:${deviceId}`) ?? 0;
const isThrottled = Date.now() - last < CHANGE_DELAY_THROTTLE_MS;
if (isThrottled) {
return;
}
const jobId = getSessionEndJobId(projectId, deviceId);
const job = await sessionsQueue.getJob(jobId);
if (!job) {
return;
}
await job.changeDelay(SESSION_TIMEOUT);
CHANGE_DELAY_THROTTLE_MAP.set(`${projectId}:${deviceId}`, Date.now());
}
const getSessionEndJobId = (projectId: string, deviceId: string) =>
`sessionEnd:${projectId}:${deviceId}`;
@@ -33,106 +59,3 @@ export function createSessionEndJob({
}
);
}
export async function getSessionEnd({
projectId,
deviceId,
profileId,
}: {
projectId: string;
deviceId: string;
profileId: string;
}) {
const sessionEnd = await getSessionEndJob({
projectId,
deviceId,
});
if (sessionEnd) {
const existingSessionIsAnonymous =
sessionEnd.job.data.payload.profileId ===
sessionEnd.job.data.payload.deviceId;
const eventIsIdentified =
profileId && sessionEnd.job.data.payload.profileId !== profileId;
if (existingSessionIsAnonymous && eventIsIdentified) {
await sessionEnd.job.updateData({
...sessionEnd.job.data,
payload: {
...sessionEnd.job.data.payload,
profileId,
},
});
}
await sessionEnd.job.changeDelay(SESSION_TIMEOUT);
return sessionEnd.job.data.payload;
}
return null;
}
export async function getSessionEndJob(args: {
projectId: string;
deviceId: string;
retryCount?: number;
}): Promise<{
deviceId: string;
job: Job<EventsQueuePayloadCreateSessionEnd>;
} | null> {
const { retryCount = 0 } = args;
if (retryCount >= 6) {
throw new Error('Failed to get session end');
}
async function handleJobStates(
job: Job<EventsQueuePayloadCreateSessionEnd>,
deviceId: string
): Promise<{
deviceId: string;
job: Job<EventsQueuePayloadCreateSessionEnd>;
} | null> {
const state = await job.getState();
if (state !== 'delayed') {
logger.debug(`[session-handler] Session end job is in "${state}" state`, {
state,
retryCount,
jobTimestamp: new Date(job.timestamp).toISOString(),
jobDelta: Date.now() - job.timestamp,
jobId: job.id,
payload: job.data.payload,
});
}
if (state === 'delayed' || state === 'waiting') {
return { deviceId, job };
}
if (state === 'active') {
await new Promise((resolve) => setTimeout(resolve, 100));
return getSessionEndJob({
...args,
retryCount: retryCount + 1,
});
}
if (state === 'completed') {
await job.remove();
}
return null;
}
// Check current device job
const currentJob = await sessionsQueue.getJob(
getSessionEndJobId(args.projectId, args.deviceId)
);
if (currentJob) {
return await handleJobStates(currentJob, args.deviceId);
}
// Create session
return null;
}