165 lines
3.9 KiB
TypeScript
165 lines
3.9 KiB
TypeScript
import type { Queue, WorkerOptions } from 'bullmq';
|
|
import { Worker } from 'bullmq';
|
|
|
|
import {
|
|
cronQueue,
|
|
eventsQueue,
|
|
notificationQueue,
|
|
sessionsQueue,
|
|
} from '@openpanel/queue';
|
|
import { getRedisQueue } from '@openpanel/redis';
|
|
|
|
import { performance } from 'node:perf_hooks';
|
|
import { setTimeout as sleep } from 'node:timers/promises';
|
|
import { cronJob } from './jobs/cron';
|
|
import { eventsJob } from './jobs/events';
|
|
import { notificationJob } from './jobs/notification';
|
|
import { sessionsJob } from './jobs/sessions';
|
|
import { logger } from './utils/logger';
|
|
|
|
const workerOptions: WorkerOptions = {
|
|
connection: getRedisQueue(),
|
|
concurrency: Number.parseInt(process.env.CONCURRENCY || '1', 10),
|
|
};
|
|
|
|
export async function bootWorkers() {
|
|
const eventsWorker = new Worker(eventsQueue.name, eventsJob, workerOptions);
|
|
const sessionsWorker = new Worker(
|
|
sessionsQueue.name,
|
|
sessionsJob,
|
|
workerOptions,
|
|
);
|
|
const cronWorker = new Worker(cronQueue.name, cronJob, workerOptions);
|
|
const notificationWorker = new Worker(
|
|
notificationQueue.name,
|
|
notificationJob,
|
|
workerOptions,
|
|
);
|
|
|
|
const workers = [
|
|
sessionsWorker,
|
|
eventsWorker,
|
|
cronWorker,
|
|
notificationWorker,
|
|
];
|
|
|
|
workers.forEach((worker) => {
|
|
worker.on('error', (error) => {
|
|
logger.error('worker error', {
|
|
worker: worker.name,
|
|
error,
|
|
});
|
|
});
|
|
|
|
worker.on('closed', () => {
|
|
logger.info('worker closed', {
|
|
worker: worker.name,
|
|
});
|
|
});
|
|
|
|
worker.on('ready', () => {
|
|
logger.info('worker ready', {
|
|
worker: worker.name,
|
|
});
|
|
});
|
|
|
|
worker.on('failed', (job) => {
|
|
if (job) {
|
|
logger.error('job failed', {
|
|
worker: worker.name,
|
|
data: job.data,
|
|
error: job.failedReason,
|
|
options: job.opts,
|
|
});
|
|
}
|
|
});
|
|
|
|
worker.on('completed', (job) => {
|
|
if (job) {
|
|
logger.info('job completed', {
|
|
worker: worker.name,
|
|
data: job.data,
|
|
elapsed:
|
|
job.processedOn && job.finishedOn
|
|
? job.finishedOn - job.processedOn
|
|
: undefined,
|
|
});
|
|
}
|
|
});
|
|
|
|
worker.on('ioredis:close', () => {
|
|
logger.error('worker closed due to ioredis:close', {
|
|
worker: worker.name,
|
|
});
|
|
});
|
|
});
|
|
|
|
async function exitHandler(
|
|
eventName: string,
|
|
evtOrExitCodeOrError: number | string | Error,
|
|
) {
|
|
logger.info('Starting graceful shutdown', {
|
|
code: evtOrExitCodeOrError,
|
|
eventName,
|
|
});
|
|
try {
|
|
const time = performance.now();
|
|
await waitForQueueToEmpty(cronQueue);
|
|
await Promise.all([
|
|
cronWorker.close(),
|
|
eventsWorker.close(),
|
|
sessionsWorker.close(),
|
|
notificationWorker.close(),
|
|
]);
|
|
logger.info('workers closed successfully', {
|
|
elapsed: performance.now() - time,
|
|
});
|
|
} catch (e) {
|
|
logger.error('exit handler error', {
|
|
code: evtOrExitCodeOrError,
|
|
error: e,
|
|
});
|
|
}
|
|
const exitCode = Number.isNaN(+evtOrExitCodeOrError)
|
|
? 1
|
|
: +evtOrExitCodeOrError;
|
|
process.exit(exitCode);
|
|
}
|
|
|
|
['uncaughtException', 'unhandledRejection', 'SIGTERM', 'SIGINT'].forEach(
|
|
(evt) => {
|
|
process.on(evt, (code) => {
|
|
exitHandler(evt, code);
|
|
});
|
|
},
|
|
);
|
|
|
|
return workers;
|
|
}
|
|
|
|
export async function waitForQueueToEmpty(queue: Queue, timeout = 60_000) {
|
|
const startTime = performance.now();
|
|
|
|
while (true) {
|
|
const activeCount = await queue.getActiveCount();
|
|
|
|
if (activeCount === 0) {
|
|
break;
|
|
}
|
|
|
|
if (performance.now() - startTime > timeout) {
|
|
logger.warn('Timeout reached while waiting for queue to empty', {
|
|
queue: queue.name,
|
|
remainingCount: activeCount,
|
|
});
|
|
break;
|
|
}
|
|
|
|
logger.info('Waiting for queue to finish', {
|
|
queue: queue.name,
|
|
count: activeCount,
|
|
});
|
|
await sleep(500);
|
|
}
|
|
}
|