fix: performance-related fixes

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-10-22 12:29:56 +02:00
parent 8bb0c87ec9
commit 1285ad85a2
60 changed files with 4264 additions and 2959 deletions

View File

@@ -2,75 +2,221 @@ import type { Queue, WorkerOptions } from 'bullmq';
import { Worker } from 'bullmq';
import {
EVENTS_GROUP_QUEUES_SHARDS,
type EventsQueuePayloadIncomingEvent,
cronQueue,
eventsGroupQueue,
eventsGroupQueues,
importQueue,
miscQueue,
notificationQueue,
queueLogger,
sessionsQueue,
} from '@openpanel/queue';
import { getRedisQueue } from '@openpanel/redis';
import { getLock, getRedisQueue } from '@openpanel/redis';
import { performance } from 'node:perf_hooks';
import { setTimeout as sleep } from 'node:timers/promises';
import { Worker as GroupWorker } from 'groupmq';
import { cronJob } from './jobs/cron';
import { eventsJob } from './jobs/events';
import { incomingEventPure } from './jobs/events.incoming-event';
import { importJob } from './jobs/import';
import { miscJob } from './jobs/misc';
import { notificationJob } from './jobs/notification';
import { sessionsJob } from './jobs/sessions';
import { eventsGroupJobDuration } from './metrics';
import { logger } from './utils/logger';
import { requireSingleton } from './utils/singleton-lock';
const workerOptions: WorkerOptions = {
connection: getRedisQueue(),
};
export async function bootWorkers() {
const eventsGroupWorker = new GroupWorker<
EventsQueuePayloadIncomingEvent['payload']
>({
concurrency: Number.parseInt(process.env.EVENT_JOB_CONCURRENCY || '1', 10),
logger: queueLogger,
queue: eventsGroupQueue,
handler: async (job) => {
logger.info('processing event (group queue)', {
groupId: job.groupId,
timestamp: job.data.event.timestamp,
});
await incomingEventPure(job.data);
},
});
eventsGroupWorker.run();
const sessionsWorker = new Worker(
sessionsQueue.name,
sessionsJob,
workerOptions,
);
const cronWorker = new Worker(cronQueue.name, cronJob, workerOptions);
const notificationWorker = new Worker(
notificationQueue.name,
notificationJob,
workerOptions,
);
const miscWorker = new Worker(miscQueue.name, miscJob, workerOptions);
const importWorker = new Worker(importQueue.name, importJob, {
...workerOptions,
concurrency: Number.parseInt(process.env.IMPORT_JOB_CONCURRENCY || '1', 10),
});
type QueueName = string; // Can be: events, events_N (where N is 0 to shards-1), sessions, cron, notification, misc
const workers = [
sessionsWorker,
cronWorker,
notificationWorker,
miscWorker,
importWorker,
// eventsGroupWorker,
];
/**
 * Resolves which queues this process should start from the ENABLED_QUEUES
 * environment variable (comma-separated list). When the variable is unset
 * or blank, every queue is enabled.
 *
 * Recognized names:
 * - events - all event shards (events_0, events_1, ..., events_N)
 * - events_N - a single event shard (N in 0..EVENTS_GROUP_QUEUES_SHARDS-1)
 * - sessions, cron, notification, misc, import
 */
function getEnabledQueues(): QueueName[] {
  const raw = process.env.ENABLED_QUEUES?.trim();
  if (!raw) {
    logger.info('No ENABLED_QUEUES specified, starting all queues', {
      totalEventShards: EVENTS_GROUP_QUEUES_SHARDS,
    });
    return ['events', 'sessions', 'cron', 'notification', 'misc', 'import'];
  }
  const names = raw
    .split(',')
    .map((name) => name.trim())
    .filter(Boolean);
  logger.info('Starting queues from ENABLED_QUEUES', {
    queues: names,
    totalEventShards: EVENTS_GROUP_QUEUES_SHARDS,
  });
  return names;
}
/**
* Gets the concurrency setting for a queue from environment variables.
* Env var format: {QUEUE_NAME}_CONCURRENCY (e.g., EVENTS_0_CONCURRENCY=32)
*/
/**
 * Looks up the per-queue concurrency setting from the environment.
 * Env var format: {QUEUE_NAME}_CONCURRENCY (e.g., EVENTS_0_CONCURRENCY=32);
 * any non-alphanumeric character in the queue name maps to '_'.
 * Falls back to defaultValue when the variable is absent or is not a
 * positive integer.
 */
function getConcurrencyFor(queueName: string, defaultValue = 1): number {
  const normalized = queueName.toUpperCase().replace(/[^A-Z0-9]/g, '_');
  const raw = process.env[`${normalized}_CONCURRENCY`];
  if (!raw) {
    return defaultValue;
  }
  const parsed = Number.parseInt(raw, 10);
  return Number.isNaN(parsed) || parsed <= 0 ? defaultValue : parsed;
}
export async function bootWorkers() {
const enabledQueues = getEnabledQueues();
const enforceSingleton = process.env.ENFORCE_SINGLETON === '1';
let singletonCleanup: (() => void) | null = null;
// Enforce singleton lock if requested
if (enforceSingleton) {
const lockKey = enabledQueues.join(',');
logger.info('Enforcing singleton mode', { lockKey });
singletonCleanup = await requireSingleton(lockKey);
}
const workers: (Worker | GroupWorker<any>)[] = [];
// Start event workers based on enabled queues
const eventQueuesToStart: number[] = [];
if (enabledQueues.includes('events')) {
// Start all event shards
for (let i = 0; i < EVENTS_GROUP_QUEUES_SHARDS; i++) {
eventQueuesToStart.push(i);
}
} else {
// Start specific event shards (events_0, events_1, etc.)
for (let i = 0; i < EVENTS_GROUP_QUEUES_SHARDS; i++) {
if (enabledQueues.includes(`events_${i}`)) {
eventQueuesToStart.push(i);
}
}
}
// Boot one GroupWorker per enabled event shard.
for (const index of eventQueuesToStart) {
  const queue = eventsGroupQueues[index];
  if (!queue) continue;
  const queueName = `events_${index}`;
  // Shard-specific concurrency (EVENTS_N_CONCURRENCY) overrides the
  // global EVENT_JOB_CONCURRENCY default (10).
  const concurrency = getConcurrencyFor(
    queueName,
    Number.parseInt(process.env.EVENT_JOB_CONCURRENCY || '10', 10),
  );
  const worker = new GroupWorker<EventsQueuePayloadIncomingEvent['payload']>({
    queue,
    concurrency,
    logger: queueLogger,
    blockingTimeoutSec: Number.parseFloat(
      process.env.EVENT_BLOCKING_TIMEOUT_SEC || '1',
    ),
    handler: async (job) => {
      // NOTE(review): getLock is used here only to *detect* duplicate
      // delivery — incomingEventPure still runs unconditionally below,
      // even when the else branch logs "event already processed". If the
      // intent is deduplication, the handler should return early in the
      // else branch; if the intent is at-least-once processing (e.g. so a
      // retried job is not dropped while the 10s lock is still held), the
      // second log message is misleading. Confirm which is intended.
      if (await getLock(job.id, '1', 10000)) {
        logger.info('worker handler', {
          jobId: job.id,
          groupId: job.groupId,
          timestamp: job.data.event.timestamp,
          data: job.data,
        });
      } else {
        logger.info('event already processed', {
          jobId: job.id,
          groupId: job.groupId,
          timestamp: job.data.event.timestamp,
          data: job.data,
        });
      }
      await incomingEventPure(job.data);
    },
  });
  worker.run();
  workers.push(worker);
  logger.info(`Started worker for ${queueName}`, { concurrency });
}
// Start sessions worker
if (enabledQueues.includes('sessions')) {
const concurrency = getConcurrencyFor('sessions');
const sessionsWorker = new Worker(sessionsQueue.name, sessionsJob, {
...workerOptions,
concurrency,
});
workers.push(sessionsWorker);
logger.info('Started worker for sessions', { concurrency });
}
// Start cron worker
if (enabledQueues.includes('cron')) {
const concurrency = getConcurrencyFor('cron');
const cronWorker = new Worker(cronQueue.name, cronJob, {
...workerOptions,
concurrency,
});
workers.push(cronWorker);
logger.info('Started worker for cron', { concurrency });
}
// Start notification worker
if (enabledQueues.includes('notification')) {
const concurrency = getConcurrencyFor('notification');
const notificationWorker = new Worker(
notificationQueue.name,
notificationJob,
{ ...workerOptions, concurrency },
);
workers.push(notificationWorker);
logger.info('Started worker for notification', { concurrency });
}
// Start misc worker
if (enabledQueues.includes('misc')) {
const concurrency = getConcurrencyFor('misc');
const miscWorker = new Worker(miscQueue.name, miscJob, {
...workerOptions,
concurrency,
});
workers.push(miscWorker);
logger.info('Started worker for misc', { concurrency });
}
// Start import worker
if (enabledQueues.includes('import')) {
  const concurrency = getConcurrencyFor('import');
  const importWorker = new Worker(importQueue.name, importJob, {
    ...workerOptions,
    concurrency,
  });
  workers.push(importWorker);
  // Fixed: previously logged 'Started worker for misc' here (copy/paste).
  logger.info('Started worker for import', { concurrency });
}
if (workers.length === 0) {
logger.warn(
'No workers started. Check ENABLED_QUEUES environment variable.',
);
}
workers.forEach((worker) => {
(worker as Worker).on('error', (error) => {
@@ -94,6 +240,13 @@ export async function bootWorkers() {
(worker as Worker).on('failed', (job) => {
if (job) {
if (job.processedOn && job.finishedOn) {
const duration = job.finishedOn - job.processedOn;
eventsGroupJobDuration.observe(
{ queue_shard: worker.name, status: 'failed' },
duration,
);
}
logger.error('job failed', {
jobId: job.id,
worker: worker.name,
@@ -106,15 +259,13 @@ export async function bootWorkers() {
(worker as Worker).on('completed', (job) => {
if (job) {
logger.info('job completed', {
jobId: job.id,
worker: worker.name,
data: job.data,
elapsed:
job.processedOn && job.finishedOn
? job.finishedOn - job.processedOn
: undefined,
});
if (job.processedOn && job.finishedOn) {
const duration = job.finishedOn - job.processedOn;
eventsGroupJobDuration.observe(
{ queue_shard: worker.name, status: 'success' },
duration,
);
}
}
});
@@ -135,8 +286,19 @@ export async function bootWorkers() {
});
try {
const time = performance.now();
await waitForQueueToEmpty(cronQueue);
// Wait for cron queue to empty if it's running
if (enabledQueues.includes('cron')) {
await waitForQueueToEmpty(cronQueue);
}
await Promise.all(workers.map((worker) => worker.close()));
// Release singleton lock if acquired
if (singletonCleanup) {
singletonCleanup();
}
logger.info('workers closed successfully', {
elapsed: performance.now() - time,
});
@@ -155,15 +317,7 @@ export async function bootWorkers() {
['uncaughtException', 'unhandledRejection', 'SIGTERM', 'SIGINT'].forEach(
(evt) => {
process.on(evt, (code) => {
if (process.env.NODE_ENV === 'production') {
exitHandler(evt, code);
} else {
logger.info('Shutting down for development', {
event: evt,
code,
});
process.exit(0);
}
exitHandler(evt, code);
});
},
);

View File

@@ -34,7 +34,9 @@ async function start() {
serverAdapter.setBasePath('/');
createBullBoard({
queues: [
new BullBoardGroupMQAdapter(eventsGroupQueue) as any,
...eventsGroupQueues.map(
(queue) => new BullBoardGroupMQAdapter(queue) as any,
),
new BullMQAdapter(sessionsQueue),
new BullMQAdapter(cronQueue),
new BullMQAdapter(notificationQueue),

View File

@@ -68,7 +68,7 @@ export async function createSessionEnd(
reqId: payload.properties?.__reqId ?? 'unknown',
});
logger.info('Processing session end job');
logger.debug('Processing session end job');
const session = await sessionBuffer.getExistingSession(payload.sessionId);

View File

@@ -18,9 +18,9 @@ import {
} from '@openpanel/db';
import type { ILogger } from '@openpanel/logger';
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue';
import type { Job } from 'bullmq';
import { getLock, getRedisCache } from '@openpanel/redis';
import { DelayedError, type Job } from 'bullmq';
import * as R from 'ramda';
import { omit } from 'ramda';
import { v4 as uuid } from 'uuid';
const GLOBAL_PROPERTIES = ['__path', '__referrer'];
@@ -56,6 +56,7 @@ export async function incomingEventPure(
job?: Job<EventsQueuePayloadIncomingEvent>,
token?: string,
) {
await getRedisCache().incr('queue:counter');
const {
geo,
event: body,
@@ -63,6 +64,7 @@ export async function incomingEventPure(
projectId,
currentDeviceId,
previousDeviceId,
uaInfo: _uaInfo,
} = jobPayload;
const properties = body.properties ?? {};
const reqId = headers['request-id'] ?? 'unknown';
@@ -93,13 +95,14 @@ export async function incomingEventPure(
const userAgent = headers['user-agent'];
const sdkName = headers['openpanel-sdk-name'];
const sdkVersion = headers['openpanel-sdk-version'];
const uaInfo = parseUserAgent(userAgent, properties);
// TODO: Remove both user-agent and parseUserAgent
const uaInfo = _uaInfo ?? parseUserAgent(userAgent, properties);
const baseEvent = {
name: body.name,
profileId,
projectId,
properties: omit(GLOBAL_PROPERTIES, {
properties: R.omit(GLOBAL_PROPERTIES, {
...properties,
__user_agent: userAgent,
__hash: hash,

View File

@@ -2,23 +2,33 @@ import client from 'prom-client';
import {
botBuffer,
db,
eventBuffer,
profileBuffer,
sessionBuffer,
} from '@openpanel/db';
import { cronQueue, eventsGroupQueue, sessionsQueue } from '@openpanel/queue';
import { cronQueue, eventsGroupQueues, sessionsQueue } from '@openpanel/queue';
const Registry = client.Registry;
export const register = new Registry();
const queues = [sessionsQueue, cronQueue, eventsGroupQueue];
const queues = [sessionsQueue, cronQueue, ...eventsGroupQueues];
// Histogram to track job processing time for eventsGroupQueues.
// Passing `registers: [register]` registers the metric at construction
// time, so the former explicit register.registerMetric(...) call was
// redundant and has been removed.
export const eventsGroupJobDuration = new client.Histogram({
  name: 'events_group_job_duration_ms',
  help: 'Duration of job processing in eventsGroupQueues (in ms)',
  labelNames: ['queue_shard', 'status'],
  buckets: [10, 25, 50, 100, 250, 500, 750, 1000, 2000, 5000, 10000, 30000], // 10ms to 30s
  registers: [register],
});
queues.forEach((queue) => {
register.registerMetric(
new client.Gauge({
name: `${queue.name}_active_count`,
name: `${queue.name.replace(/[\{\}]/g, '')}_active_count`,
help: 'Active count',
async collect() {
const metric = await queue.getActiveCount();
@@ -29,7 +39,7 @@ queues.forEach((queue) => {
register.registerMetric(
new client.Gauge({
name: `${queue.name}_delayed_count`,
name: `${queue.name.replace(/[\{\}]/g, '')}_delayed_count`,
help: 'Delayed count',
async collect() {
const metric = await queue.getDelayedCount();
@@ -40,7 +50,7 @@ queues.forEach((queue) => {
register.registerMetric(
new client.Gauge({
name: `${queue.name}_failed_count`,
name: `${queue.name.replace(/[\{\}]/g, '')}_failed_count`,
help: 'Failed count',
async collect() {
const metric = await queue.getFailedCount();
@@ -51,7 +61,7 @@ queues.forEach((queue) => {
register.registerMetric(
new client.Gauge({
name: `${queue.name}_completed_count`,
name: `${queue.name.replace(/[\{\}]/g, '')}_completed_count`,
help: 'Completed count',
async collect() {
const metric = await queue.getCompletedCount();
@@ -62,7 +72,7 @@ queues.forEach((queue) => {
register.registerMetric(
new client.Gauge({
name: `${queue.name}_waiting_count`,
name: `${queue.name.replace(/[\{\}]/g, '')}_waiting_count`,
help: 'Waiting count',
async collect() {
const metric = await queue.getWaitingCount();

View File

@@ -113,7 +113,7 @@ export async function getSessionEndJob(args: {
} | null> {
const state = await job.getState();
if (state !== 'delayed') {
logger.info(`[session-handler] Session end job is in "${state}" state`, {
logger.debug(`[session-handler] Session end job is in "${state}" state`, {
state,
retryCount,
jobTimestamp: new Date(job.timestamp).toISOString(),

View File

@@ -0,0 +1,69 @@
import { getLock, getRedisCache } from '@openpanel/redis';
import { logger } from './logger';
/**
* Acquires a distributed lock to ensure only one instance of a worker group runs.
* If the lock cannot be acquired, the process exits.
*
* @param key - The lock key (e.g., 'utility-queues')
* @param ttlMs - Time to live for the lock in milliseconds (default: 60000)
* @returns A cleanup function that releases the lock
*/
/**
 * Acquires a distributed lock to ensure only one instance of a worker group
 * runs. If the lock cannot be acquired, the process exits.
 *
 * The lock is extended at half-TTL intervals while the process runs. Both
 * extension and release verify that this process still owns the lock (by
 * comparing the stored value), so a lock held by another instance is never
 * refreshed, stolen, or deleted by mistake. The previous implementation
 * extended with a bare `SET ... XX` (which would overwrite another holder's
 * value) and released with an unconditional `DEL` (which could delete a lock
 * another instance had since acquired).
 *
 * @param key - The lock key (e.g., 'utility-queues')
 * @param ttlMs - Time to live for the lock in milliseconds (default: 60000)
 * @returns A cleanup function that releases the lock
 */
export async function requireSingleton(
  key: string,
  ttlMs = 60_000,
): Promise<() => void> {
  const lockKey = `lock:singleton:${key}`;
  // Unique per-holder token so ownership can be verified on extend/release.
  const lockValue = `${process.pid}-${Date.now()}`;
  // Atomic compare-and-extend / compare-and-delete (standard Redis
  // single-instance lock pattern; see "Distributed Locks with Redis").
  const extendScript =
    'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end';
  const releaseScript =
    'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';
  // Try to acquire the lock
  const acquired = await getLock(lockKey, lockValue, ttlMs);
  if (!acquired) {
    logger.error(
      `Another instance holds singleton lock for "${key}". Exiting.`,
      { key },
    );
    process.exit(0);
  }
  logger.debug('Acquired singleton lock', { key, ttlMs, lockValue });
  // Set up automatic extension to keep the lock alive; losing it is fatal.
  const extensionInterval = setInterval(async () => {
    try {
      const extended = await getRedisCache().eval(
        extendScript,
        1,
        lockKey,
        lockValue,
        String(ttlMs),
      );
      if (extended) {
        logger.debug('Extended singleton lock', { key });
      } else {
        // Lock was lost (expired or acquired by another instance).
        logger.error('Lost singleton lock - exiting', { key });
        clearInterval(extensionInterval);
        process.exit(1);
      }
    } catch (error: unknown) {
      logger.error('Failed to extend singleton lock - exiting', {
        key,
        error,
      });
      clearInterval(extensionInterval);
      process.exit(1);
    }
  }, ttlMs / 2);
  // Return cleanup function
  return () => {
    clearInterval(extensionInterval);
    getRedisCache()
      .eval(releaseScript, 1, lockKey, lockValue)
      .then((deleted) => {
        if (deleted) {
          logger.debug('Released singleton lock', { key });
        } else {
          // Lock had already expired or changed hands; nothing to delete.
          logger.warn('Singleton lock was not owned at release', { key });
        }
      })
      .catch((error: unknown) => {
        logger.error('Failed to release singleton lock', { key, error });
      });
  };
}