diff --git a/apps/worker/package.json b/apps/worker/package.json index d86b3950..3721ea19 100644 --- a/apps/worker/package.json +++ b/apps/worker/package.json @@ -23,6 +23,7 @@ "express": "^4.18.2", "pino": "^8.17.2", "pino-pretty": "^10.3.1", + "prom-client": "^15.1.3", "ramda": "^0.29.1", "sqlstring": "^2.3.3", "ua-parser-js": "^1.0.37", diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index 354c7d0b..19a29dea 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -15,6 +15,7 @@ import { import { cronJob } from './jobs/cron'; import { eventsJob } from './jobs/events'; import { sessionsJob } from './jobs/sessions'; +import { register } from './metrics'; const PORT = parseInt(process.env.WORKER_PORT || '3000', 10); const serverAdapter = new ExpressAdapter(); @@ -51,6 +52,18 @@ async function start() { app.use('/', serverAdapter.getRouter()); + app.get('/metrics', (req, res) => { + res.set('Content-Type', register.contentType); + register + .metrics() + .then((metrics) => { + res.end(metrics); + }) + .catch((error) => { + res.status(500).end(error); + }); + }); + app.listen(PORT, () => { console.log(`For the UI, open http://localhost:${PORT}/`); }); diff --git a/apps/worker/src/metrics.ts b/apps/worker/src/metrics.ts new file mode 100644 index 00000000..d75f271a --- /dev/null +++ b/apps/worker/src/metrics.ts @@ -0,0 +1,94 @@ +import client from 'prom-client'; + +import { cronQueue, eventsQueue, sessionsQueue } from '@openpanel/queue'; +import { redis } from '@openpanel/redis'; + +const Registry = client.Registry; + +export const register = new Registry(); + +const queues = [eventsQueue, sessionsQueue, cronQueue]; + +queues.forEach((queue) => { + register.registerMetric( + new client.Gauge({ + name: `${queue.name}_active_count`, + help: 'Active count', + async collect() { + const metric = await queue.getActiveCount(); + this.set(metric); + }, + }) + ); + + register.registerMetric( + new client.Gauge({ + name: `${queue.name}_delayed_count`, + help: 'Delayed count', + async collect() { + const metric = await queue.getDelayedCount(); + this.set(metric); + }, + }) + ); + + register.registerMetric( + new client.Gauge({ + name: `${queue.name}_failed_count`, + help: 'Failed count', + async collect() { + const metric = await queue.getFailedCount(); + this.set(metric); + }, + }) + ); + + register.registerMetric( + new client.Gauge({ + name: `${queue.name}_completed_count`, + help: 'Completed count', + async collect() { + const metric = await queue.getCompletedCount(); + this.set(metric); + }, + }) + ); + + register.registerMetric( + new client.Gauge({ + name: `${queue.name}_waiting_count`, + help: 'Waiting count', + async collect() { + const metric = await queue.getWaitingCount(); + this.set(metric); + }, + }) + ); +}); + +// Buffer +const buffers = ['events_v2', 'profiles']; + +buffers.forEach((buffer) => { + register.registerMetric( + new client.Gauge({ + name: `buffer_${buffer}_count`, + help: 'Number of users in the users array', + async collect() { + const metric = await redis.llen(`op:buffer:${buffer}`); + this.set(metric); + }, + }) + ); + + register.registerMetric( + new client.Gauge({ + name: `buffer_${buffer}_stalled_count`, + help: 'Number of users in the users array', + async collect() { + const metric = await redis.llen(`op:buffer:${buffer}:stalled`); + this.set(metric); + }, + }) + ); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6533d71d..b1c451f1 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -761,6 +761,9 @@ importers: pino-pretty: specifier: ^10.3.1 version: 10.3.1 + prom-client: + specifier: ^15.1.3 + version: 15.1.3 ramda: specifier: ^0.29.1 version: 0.29.1 @@ -15670,6 +15673,14 @@ packages: tdigest: 0.1.2 dev: false + /prom-client@15.1.3: + resolution: {integrity: sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==} + engines: {node: ^16 || ^18 || >=20} + dependencies: + '@opentelemetry/api': 1.8.0 + tdigest: 0.1.2 + dev: false + /promise-inflight@1.0.1: resolution: {integrity: sha512-6zWPyEOFaQBJYcGMHBKTKJ3u6TBsnMFOIZSa6ce1e/ZrrsOlnHRHbabMjLiBYKp+n44X9eUI6VUPaukCXHuG4g==} peerDependencies: