improve(worker): add env variable to disable workers
This commit is contained in:
76
apps/worker/src/boot-cron.ts
Normal file
76
apps/worker/src/boot-cron.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import type { CronQueueType } from '@openpanel/queue';
|
||||
import { cronQueue } from '@openpanel/queue';
|
||||
|
||||
import { logger } from './utils/logger';
|
||||
|
||||
export async function bootCron() {
|
||||
const jobs: {
|
||||
name: string;
|
||||
type: CronQueueType;
|
||||
pattern: string | number;
|
||||
}[] = [
|
||||
{
|
||||
name: 'salt',
|
||||
type: 'salt',
|
||||
pattern: '0 0 * * *',
|
||||
},
|
||||
{
|
||||
name: 'flush',
|
||||
type: 'flushEvents',
|
||||
pattern: 1000 * 10,
|
||||
},
|
||||
{
|
||||
name: 'flush',
|
||||
type: 'flushProfiles',
|
||||
pattern: 1000 * 60,
|
||||
},
|
||||
];
|
||||
|
||||
if (process.env.SELF_HOSTED && process.env.NODE_ENV === 'production') {
|
||||
jobs.push({
|
||||
name: 'ping',
|
||||
type: 'ping',
|
||||
pattern: '0 0 * * *',
|
||||
});
|
||||
}
|
||||
|
||||
// Add repeatable jobs
|
||||
for (const job of jobs) {
|
||||
await cronQueue.add(
|
||||
job.name,
|
||||
{
|
||||
type: job.type,
|
||||
payload: undefined,
|
||||
},
|
||||
{
|
||||
jobId: job.type,
|
||||
repeat:
|
||||
typeof job.pattern === 'number'
|
||||
? {
|
||||
every: job.pattern,
|
||||
}
|
||||
: {
|
||||
pattern: job.pattern,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Remove outdated repeatable jobs
|
||||
const repeatableJobs = await cronQueue.getRepeatableJobs();
|
||||
for (const repeatableJob of repeatableJobs) {
|
||||
const match = jobs.find(
|
||||
(job) => `${job.name}:${job.type}:::${job.pattern}` === repeatableJob.key,
|
||||
);
|
||||
if (match) {
|
||||
logger.info('Repeatable job exists', {
|
||||
key: repeatableJob.key,
|
||||
});
|
||||
} else {
|
||||
logger.info('Removing repeatable job', {
|
||||
key: repeatableJob.key,
|
||||
});
|
||||
cronQueue.removeRepeatableByKey(repeatableJob.key);
|
||||
}
|
||||
}
|
||||
}
|
||||
130
apps/worker/src/boot-workers.ts
Normal file
130
apps/worker/src/boot-workers.ts
Normal file
@@ -0,0 +1,130 @@
|
||||
import type { WorkerOptions } from 'bullmq';
|
||||
import { Worker } from 'bullmq';
|
||||
|
||||
import {
|
||||
cronQueue,
|
||||
eventsQueue,
|
||||
notificationQueue,
|
||||
sessionsQueue,
|
||||
} from '@openpanel/queue';
|
||||
import { getRedisQueue } from '@openpanel/redis';
|
||||
|
||||
import { cronJob } from './jobs/cron';
|
||||
import { eventsJob } from './jobs/events';
|
||||
import { notificationJob } from './jobs/notification';
|
||||
import { sessionsJob } from './jobs/sessions';
|
||||
import { logger } from './utils/logger';
|
||||
|
||||
const workerOptions: WorkerOptions = {
|
||||
connection: getRedisQueue(),
|
||||
concurrency: Number.parseInt(process.env.CONCURRENCY || '1', 10),
|
||||
};
|
||||
|
||||
export async function bootWorkers() {
|
||||
const eventsWorker = new Worker(eventsQueue.name, eventsJob, workerOptions);
|
||||
const sessionsWorker = new Worker(
|
||||
sessionsQueue.name,
|
||||
sessionsJob,
|
||||
workerOptions,
|
||||
);
|
||||
const cronWorker = new Worker(cronQueue.name, cronJob, workerOptions);
|
||||
const notificationWorker = new Worker(
|
||||
notificationQueue.name,
|
||||
notificationJob,
|
||||
workerOptions,
|
||||
);
|
||||
|
||||
const workers = [
|
||||
sessionsWorker,
|
||||
eventsWorker,
|
||||
cronWorker,
|
||||
notificationWorker,
|
||||
];
|
||||
|
||||
workers.forEach((worker) => {
|
||||
worker.on('error', (error) => {
|
||||
logger.error('worker error', {
|
||||
worker: worker.name,
|
||||
error,
|
||||
});
|
||||
});
|
||||
|
||||
worker.on('closed', () => {
|
||||
logger.info('worker closed', {
|
||||
worker: worker.name,
|
||||
});
|
||||
});
|
||||
|
||||
worker.on('ready', () => {
|
||||
logger.info('worker ready', {
|
||||
worker: worker.name,
|
||||
});
|
||||
});
|
||||
|
||||
worker.on('failed', (job) => {
|
||||
if (job) {
|
||||
logger.error('job failed', {
|
||||
worker: worker.name,
|
||||
data: job.data,
|
||||
error: job.failedReason,
|
||||
options: job.opts,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
worker.on('completed', (job) => {
|
||||
if (job) {
|
||||
logger.info('job completed', {
|
||||
worker: worker.name,
|
||||
data: job.data,
|
||||
elapsed:
|
||||
job.processedOn && job.finishedOn
|
||||
? job.finishedOn - job.processedOn
|
||||
: undefined,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
worker.on('ioredis:close', () => {
|
||||
logger.error('worker closed due to ioredis:close', {
|
||||
worker: worker.name,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
async function exitHandler(
|
||||
eventName: string,
|
||||
evtOrExitCodeOrError: number | string | Error,
|
||||
) {
|
||||
logger.info('Starting graceful shutdown', {
|
||||
code: evtOrExitCodeOrError,
|
||||
eventName,
|
||||
});
|
||||
try {
|
||||
await Promise.all([
|
||||
cronWorker.close(),
|
||||
eventsWorker.close(),
|
||||
sessionsWorker.close(),
|
||||
notificationWorker.close(),
|
||||
]);
|
||||
logger.info('workers closed successfully');
|
||||
} catch (e) {
|
||||
logger.error('exit handler error', {
|
||||
code: evtOrExitCodeOrError,
|
||||
error: e,
|
||||
});
|
||||
}
|
||||
const exitCode = Number.isNaN(+evtOrExitCodeOrError)
|
||||
? 1
|
||||
: +evtOrExitCodeOrError;
|
||||
process.exit(exitCode);
|
||||
}
|
||||
|
||||
['uncaughtException', 'unhandledRejection', 'SIGTERM'].forEach((evt) =>
|
||||
process.on(evt, (code) => {
|
||||
exitHandler(evt, code);
|
||||
}),
|
||||
);
|
||||
|
||||
return workers;
|
||||
}
|
||||
@@ -1,24 +1,18 @@
|
||||
import { createBullBoard } from '@bull-board/api';
|
||||
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
|
||||
import { ExpressAdapter } from '@bull-board/express';
|
||||
import type { WorkerOptions } from 'bullmq';
|
||||
import { Worker } from 'bullmq';
|
||||
import express from 'express';
|
||||
|
||||
import { createInitialSalts } from '@openpanel/db';
|
||||
import type { CronQueueType } from '@openpanel/queue';
|
||||
import {
|
||||
cronQueue,
|
||||
eventsQueue,
|
||||
notificationQueue,
|
||||
sessionsQueue,
|
||||
} from '@openpanel/queue';
|
||||
import { getRedisQueue } from '@openpanel/redis';
|
||||
|
||||
import { cronJob } from './jobs/cron';
|
||||
import { eventsJob } from './jobs/events';
|
||||
import { notificationJob } from './jobs/notification';
|
||||
import { sessionsJob } from './jobs/sessions';
|
||||
import { bootCron } from './boot-cron';
|
||||
import { bootWorkers } from './boot-workers';
|
||||
import { register } from './metrics';
|
||||
import { logger } from './utils/logger';
|
||||
|
||||
@@ -27,25 +21,7 @@ const serverAdapter = new ExpressAdapter();
|
||||
serverAdapter.setBasePath('/');
|
||||
const app = express();
|
||||
|
||||
const workerOptions: WorkerOptions = {
|
||||
connection: getRedisQueue(),
|
||||
concurrency: Number.parseInt(process.env.CONCURRENCY || '1', 10),
|
||||
};
|
||||
|
||||
async function start() {
|
||||
const eventsWorker = new Worker(eventsQueue.name, eventsJob, workerOptions);
|
||||
const sessionsWorker = new Worker(
|
||||
sessionsQueue.name,
|
||||
sessionsJob,
|
||||
workerOptions,
|
||||
);
|
||||
const cronWorker = new Worker(cronQueue.name, cronJob, workerOptions);
|
||||
const notificationWorker = new Worker(
|
||||
notificationQueue.name,
|
||||
notificationJob,
|
||||
workerOptions,
|
||||
);
|
||||
|
||||
createBullBoard({
|
||||
queues: [
|
||||
new BullMQAdapter(eventsQueue),
|
||||
@@ -74,165 +50,11 @@ async function start() {
|
||||
console.log(`For the UI, open http://localhost:${PORT}/`);
|
||||
});
|
||||
|
||||
const workers = [
|
||||
sessionsWorker,
|
||||
eventsWorker,
|
||||
cronWorker,
|
||||
notificationWorker,
|
||||
];
|
||||
workers.forEach((worker) => {
|
||||
worker.on('error', (error) => {
|
||||
logger.error('worker error', {
|
||||
worker: worker.name,
|
||||
error,
|
||||
});
|
||||
});
|
||||
|
||||
worker.on('closed', () => {
|
||||
logger.info('worker closed', {
|
||||
worker: worker.name,
|
||||
});
|
||||
});
|
||||
|
||||
worker.on('ready', () => {
|
||||
logger.info('worker ready', {
|
||||
worker: worker.name,
|
||||
});
|
||||
});
|
||||
|
||||
worker.on('failed', (job) => {
|
||||
if (job) {
|
||||
logger.error('job failed', {
|
||||
worker: worker.name,
|
||||
data: job.data,
|
||||
error: job.failedReason,
|
||||
options: job.opts,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
worker.on('completed', (job) => {
|
||||
if (job) {
|
||||
logger.info('job completed', {
|
||||
worker: worker.name,
|
||||
data: job.data,
|
||||
elapsed:
|
||||
job.processedOn && job.finishedOn
|
||||
? job.finishedOn - job.processedOn
|
||||
: undefined,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
worker.on('ioredis:close', () => {
|
||||
logger.error('worker closed due to ioredis:close', {
|
||||
worker: worker.name,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
async function exitHandler(
|
||||
eventName: string,
|
||||
evtOrExitCodeOrError: number | string | Error,
|
||||
) {
|
||||
logger.info('Starting graceful shutdown', {
|
||||
code: evtOrExitCodeOrError,
|
||||
eventName,
|
||||
});
|
||||
try {
|
||||
await Promise.all([
|
||||
cronWorker.close(),
|
||||
eventsWorker.close(),
|
||||
sessionsWorker.close(),
|
||||
notificationWorker.close(),
|
||||
]);
|
||||
logger.info('workers closed successfully');
|
||||
} catch (e) {
|
||||
logger.error('exit handler error', {
|
||||
code: evtOrExitCodeOrError,
|
||||
error: e,
|
||||
});
|
||||
}
|
||||
const exitCode = Number.isNaN(+evtOrExitCodeOrError)
|
||||
? 1
|
||||
: +evtOrExitCodeOrError;
|
||||
process.exit(exitCode);
|
||||
}
|
||||
|
||||
['uncaughtException', 'unhandledRejection', 'SIGTERM'].forEach((evt) =>
|
||||
process.on(evt, (code) => {
|
||||
exitHandler(evt, code);
|
||||
}),
|
||||
);
|
||||
|
||||
const jobs: {
|
||||
name: string;
|
||||
type: CronQueueType;
|
||||
pattern: string | number;
|
||||
}[] = [
|
||||
{
|
||||
name: 'salt',
|
||||
type: 'salt',
|
||||
pattern: '0 0 * * *',
|
||||
},
|
||||
{
|
||||
name: 'flush',
|
||||
type: 'flushEvents',
|
||||
pattern: 1000 * 10,
|
||||
},
|
||||
{
|
||||
name: 'flush',
|
||||
type: 'flushProfiles',
|
||||
pattern: 1000 * 60,
|
||||
},
|
||||
];
|
||||
|
||||
if (process.env.SELF_HOSTED && process.env.NODE_ENV === 'production') {
|
||||
jobs.push({
|
||||
name: 'ping',
|
||||
type: 'ping',
|
||||
pattern: '0 0 * * *',
|
||||
});
|
||||
}
|
||||
|
||||
// Add repeatable jobs
|
||||
for (const job of jobs) {
|
||||
await cronQueue.add(
|
||||
job.name,
|
||||
{
|
||||
type: job.type,
|
||||
payload: undefined,
|
||||
},
|
||||
{
|
||||
jobId: job.type,
|
||||
repeat:
|
||||
typeof job.pattern === 'number'
|
||||
? {
|
||||
every: job.pattern,
|
||||
}
|
||||
: {
|
||||
pattern: job.pattern,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Remove outdated repeatable jobs
|
||||
const repeatableJobs = await cronQueue.getRepeatableJobs();
|
||||
for (const repeatableJob of repeatableJobs) {
|
||||
const match = jobs.find(
|
||||
(job) => `${job.name}:${job.type}:::${job.pattern}` === repeatableJob.key,
|
||||
);
|
||||
if (match) {
|
||||
logger.info('Repeatable job exists', {
|
||||
key: repeatableJob.key,
|
||||
});
|
||||
} else {
|
||||
logger.info('Removing repeatable job', {
|
||||
key: repeatableJob.key,
|
||||
});
|
||||
cronQueue.removeRepeatableByKey(repeatableJob.key);
|
||||
}
|
||||
if (process.env.DISABLE_WORKERS === undefined) {
|
||||
await bootWorkers();
|
||||
await bootCron();
|
||||
} else {
|
||||
logger.warn('Workers are disabled');
|
||||
}
|
||||
|
||||
await createInitialSalts();
|
||||
|
||||
Reference in New Issue
Block a user