From 9ffa213fc26a59e0ad4354c9beac1ca86b67c485 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Wed, 27 Nov 2024 12:54:36 +0100 Subject: [PATCH] improve(worker): add env variable to disable workers --- apps/worker/src/boot-cron.ts | 76 +++++++++++++ apps/worker/src/boot-workers.ts | 130 +++++++++++++++++++++ apps/worker/src/index.ts | 192 ++------------------------------ 3 files changed, 213 insertions(+), 185 deletions(-) create mode 100644 apps/worker/src/boot-cron.ts create mode 100644 apps/worker/src/boot-workers.ts diff --git a/apps/worker/src/boot-cron.ts b/apps/worker/src/boot-cron.ts new file mode 100644 index 00000000..770747c2 --- /dev/null +++ b/apps/worker/src/boot-cron.ts @@ -0,0 +1,76 @@ +import type { CronQueueType } from '@openpanel/queue'; +import { cronQueue } from '@openpanel/queue'; + +import { logger } from './utils/logger'; + +export async function bootCron() { + const jobs: { + name: string; + type: CronQueueType; + pattern: string | number; + }[] = [ + { + name: 'salt', + type: 'salt', + pattern: '0 0 * * *', + }, + { + name: 'flush', + type: 'flushEvents', + pattern: 1000 * 10, + }, + { + name: 'flush', + type: 'flushProfiles', + pattern: 1000 * 60, + }, + ]; + + if (process.env.SELF_HOSTED && process.env.NODE_ENV === 'production') { + jobs.push({ + name: 'ping', + type: 'ping', + pattern: '0 0 * * *', + }); + } + + // Add repeatable jobs + for (const job of jobs) { + await cronQueue.add( + job.name, + { + type: job.type, + payload: undefined, + }, + { + jobId: job.type, + repeat: + typeof job.pattern === 'number' + ? { + every: job.pattern, + } + : { + pattern: job.pattern, + }, + }, + ); + } + + // Remove outdated repeatable jobs + const repeatableJobs = await cronQueue.getRepeatableJobs(); + for (const repeatableJob of repeatableJobs) { + const match = jobs.find( + (job) => `${job.name}:${job.type}:::${job.pattern}` === repeatableJob.key, + ); + if (match) { + logger.info('Repeatable job exists', { + key: repeatableJob.key, + }); + } else { + logger.info('Removing repeatable job', { + key: repeatableJob.key, + }); + cronQueue.removeRepeatableByKey(repeatableJob.key); + } + } +} diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts new file mode 100644 index 00000000..4289b443 --- /dev/null +++ b/apps/worker/src/boot-workers.ts @@ -0,0 +1,130 @@ +import type { WorkerOptions } from 'bullmq'; +import { Worker } from 'bullmq'; + +import { + cronQueue, + eventsQueue, + notificationQueue, + sessionsQueue, +} from '@openpanel/queue'; +import { getRedisQueue } from '@openpanel/redis'; + +import { cronJob } from './jobs/cron'; +import { eventsJob } from './jobs/events'; +import { notificationJob } from './jobs/notification'; +import { sessionsJob } from './jobs/sessions'; +import { logger } from './utils/logger'; + +const workerOptions: WorkerOptions = { + connection: getRedisQueue(), + concurrency: Number.parseInt(process.env.CONCURRENCY || '1', 10), +}; + +export async function bootWorkers() { + const eventsWorker = new Worker(eventsQueue.name, eventsJob, workerOptions); + const sessionsWorker = new Worker( + sessionsQueue.name, + sessionsJob, + workerOptions, + ); + const cronWorker = new Worker(cronQueue.name, cronJob, workerOptions); + const notificationWorker = new Worker( + notificationQueue.name, + notificationJob, + workerOptions, + ); + + const workers = [ + sessionsWorker, + eventsWorker, + cronWorker, + notificationWorker, + ]; + + workers.forEach((worker) => { + worker.on('error', (error) => { + logger.error('worker error', { + worker: worker.name, + error, + }); + }); + + worker.on('closed', () => { + logger.info('worker closed', { + worker: worker.name, + }); + }); + + worker.on('ready', () => { + logger.info('worker ready', { + worker: worker.name, + }); + }); + + worker.on('failed', (job) => { + if (job) { + logger.error('job failed', { + worker: worker.name, + data: job.data, + error: job.failedReason, + options: job.opts, + }); + } + }); + + worker.on('completed', (job) => { + if (job) { + logger.info('job completed', { + worker: worker.name, + data: job.data, + elapsed: + job.processedOn && job.finishedOn + ? job.finishedOn - job.processedOn + : undefined, + }); + } + }); + + worker.on('ioredis:close', () => { + logger.error('worker closed due to ioredis:close', { + worker: worker.name, + }); + }); + }); + + async function exitHandler( + eventName: string, + evtOrExitCodeOrError: number | string | Error, + ) { + logger.info('Starting graceful shutdown', { + code: evtOrExitCodeOrError, + eventName, + }); + try { + await Promise.all([ + cronWorker.close(), + eventsWorker.close(), + sessionsWorker.close(), + notificationWorker.close(), + ]); + logger.info('workers closed successfully'); + } catch (e) { + logger.error('exit handler error', { + code: evtOrExitCodeOrError, + error: e, + }); + } + const exitCode = Number.isNaN(+evtOrExitCodeOrError) + ? 1 + : +evtOrExitCodeOrError; + process.exit(exitCode); + } + + ['uncaughtException', 'unhandledRejection', 'SIGTERM'].forEach((evt) => + process.on(evt, (code) => { + exitHandler(evt, code); + }), + ); + + return workers; +} diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index 28b85dbf..8d104bb4 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -1,24 +1,18 @@ import { createBullBoard } from '@bull-board/api'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; import { ExpressAdapter } from '@bull-board/express'; -import type { WorkerOptions } from 'bullmq'; -import { Worker } from 'bullmq'; import express from 'express'; import { createInitialSalts } from '@openpanel/db'; -import type { CronQueueType } from '@openpanel/queue'; import { cronQueue, eventsQueue, notificationQueue, sessionsQueue, } from '@openpanel/queue'; -import { getRedisQueue } from '@openpanel/redis'; -import { cronJob } from './jobs/cron'; -import { eventsJob } from './jobs/events'; -import { notificationJob } from './jobs/notification'; -import { sessionsJob } from './jobs/sessions'; +import { bootCron } from './boot-cron'; +import { bootWorkers } from './boot-workers'; import { register } from './metrics'; import { logger } from './utils/logger'; @@ -27,25 +21,7 @@ const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath('/'); const app = express(); -const workerOptions: WorkerOptions = { - connection: getRedisQueue(), - concurrency: Number.parseInt(process.env.CONCURRENCY || '1', 10), -}; - async function start() { - const eventsWorker = new Worker(eventsQueue.name, eventsJob, workerOptions); - const sessionsWorker = new Worker( - sessionsQueue.name, - sessionsJob, - workerOptions, - ); - const cronWorker = new Worker(cronQueue.name, cronJob, workerOptions); - const notificationWorker = new Worker( - notificationQueue.name, - notificationJob, - workerOptions, - ); - createBullBoard({ queues: [ new BullMQAdapter(eventsQueue), @@ -74,165 +50,11 @@ async function start() { console.log(`For the UI, open http://localhost:${PORT}/`); }); - const workers = [ - sessionsWorker, - eventsWorker, - cronWorker, - notificationWorker, - ]; - workers.forEach((worker) => { - worker.on('error', (error) => { - logger.error('worker error', { - worker: worker.name, - error, - }); - }); - - worker.on('closed', () => { - logger.info('worker closed', { - worker: worker.name, - }); - }); - - worker.on('ready', () => { - logger.info('worker ready', { - worker: worker.name, - }); - }); - - worker.on('failed', (job) => { - if (job) { - logger.error('job failed', { - worker: worker.name, - data: job.data, - error: job.failedReason, - options: job.opts, - }); - } - }); - - worker.on('completed', (job) => { - if (job) { - logger.info('job completed', { - worker: worker.name, - data: job.data, - elapsed: - job.processedOn && job.finishedOn - ? job.finishedOn - job.processedOn - : undefined, - }); - } - }); - - worker.on('ioredis:close', () => { - logger.error('worker closed due to ioredis:close', { - worker: worker.name, - }); - }); - }); - - async function exitHandler( - eventName: string, - evtOrExitCodeOrError: number | string | Error, - ) { - logger.info('Starting graceful shutdown', { - code: evtOrExitCodeOrError, - eventName, - }); - try { - await Promise.all([ - cronWorker.close(), - eventsWorker.close(), - sessionsWorker.close(), - notificationWorker.close(), - ]); - logger.info('workers closed successfully'); - } catch (e) { - logger.error('exit handler error', { - code: evtOrExitCodeOrError, - error: e, - }); - } - const exitCode = Number.isNaN(+evtOrExitCodeOrError) - ? 1 - : +evtOrExitCodeOrError; - process.exit(exitCode); - } - - ['uncaughtException', 'unhandledRejection', 'SIGTERM'].forEach((evt) => - process.on(evt, (code) => { - exitHandler(evt, code); - }), - ); - - const jobs: { - name: string; - type: CronQueueType; - pattern: string | number; - }[] = [ - { - name: 'salt', - type: 'salt', - pattern: '0 0 * * *', - }, - { - name: 'flush', - type: 'flushEvents', - pattern: 1000 * 10, - }, - { - name: 'flush', - type: 'flushProfiles', - pattern: 1000 * 60, - }, - ]; - - if (process.env.SELF_HOSTED && process.env.NODE_ENV === 'production') { - jobs.push({ - name: 'ping', - type: 'ping', - pattern: '0 0 * * *', - }); - } - - // Add repeatable jobs - for (const job of jobs) { - await cronQueue.add( - job.name, - { - type: job.type, - payload: undefined, - }, - { - jobId: job.type, - repeat: - typeof job.pattern === 'number' - ? { - every: job.pattern, - } - : { - pattern: job.pattern, - }, - }, - ); - } - - // Remove outdated repeatable jobs - const repeatableJobs = await cronQueue.getRepeatableJobs(); - for (const repeatableJob of repeatableJobs) { - const match = jobs.find( - (job) => `${job.name}:${job.type}:::${job.pattern}` === repeatableJob.key, - ); - if (match) { - logger.info('Repeatable job exists', { - key: repeatableJob.key, - }); - } else { - logger.info('Removing repeatable job', { - key: repeatableJob.key, - }); - cronQueue.removeRepeatableByKey(repeatableJob.key); - } + if (process.env.DISABLE_WORKERS === undefined) { + await bootWorkers(); + await bootCron(); + } else { + logger.warn('Workers are disabled'); } await createInitialSalts();