feat: graceful shutdown (#205)

* feat: graceful shutdown

* comments by coderabbit

* fix
This commit is contained in:
Carl-Gerhard Lindesvärd
2025-10-02 11:03:54 +02:00
committed by GitHub
parent 5092b6ae51
commit ca4a880acd
6 changed files with 221 additions and 380 deletions

View File

@@ -1,83 +1,60 @@
import { round } from '@openpanel/common';
import { TABLE_NAMES, chQuery, db } from '@openpanel/db';
import { eventsQueue } from '@openpanel/queue';
import { isShuttingDown } from '@/utils/graceful-shutdown';
import { chQuery, db } from '@openpanel/db';
import { getRedisCache } from '@openpanel/redis';
import type { FastifyReply, FastifyRequest } from 'fastify';
/**
 * Awaits `promise` and measures how long it took to resolve.
 *
 * @param promise - The pending operation to time.
 * @returns `{ time, data }`, where `time` is the elapsed `performance.now()`
 *   delta passed through `round(…, 2)` and `data` is the resolved value, or
 *   `null` when anything inside the measurement throws or rejects.
 */
async function withTimings<T>(promise: Promise<T>) {
  const startedAt = performance.now();
  try {
    const data = await promise;
    const elapsed = round(performance.now() - startedAt, 2);
    return { time: elapsed, data } as const;
  } catch {
    // Best-effort probe: failures are reported as `null`, never rethrown.
    return null;
  }
}
// For docker compose healthcheck
export async function healthcheck(
request: FastifyRequest,
reply: FastifyReply,
) {
if (process.env.DISABLE_HEALTHCHECK) {
return reply.status(200).send({
ok: true,
});
}
const redisRes = await withTimings(getRedisCache().ping());
const dbRes = await withTimings(db.project.findFirst());
const queueRes = await withTimings(eventsQueue.getCompleted());
const chRes = await withTimings(
chQuery(
`SELECT * FROM ${TABLE_NAMES.events} WHERE created_at > now() - INTERVAL 10 MINUTE LIMIT 1`,
),
);
const status = redisRes && dbRes && queueRes && chRes ? 200 : 500;
try {
const redisRes = await getRedisCache().ping();
const dbRes = await db.project.findFirst();
const chRes = await chQuery('SELECT 1');
const status = redisRes && dbRes && chRes ? 200 : 503;
reply.status(status).send({
redis: redisRes
? {
ok: redisRes.data === 'PONG',
time: `${redisRes.time}ms`,
}
: null,
db: dbRes
? {
ok: !!dbRes.data,
time: `${dbRes.time}ms`,
}
: null,
queue: queueRes
? {
ok: !!queueRes.data,
time: `${queueRes.time}ms`,
}
: null,
ch: chRes
? {
ok: !!chRes.data,
time: `${chRes.time}ms`,
}
: null,
});
}
export async function healthcheckQueue(
request: FastifyRequest,
reply: FastifyReply,
) {
const count = await eventsQueue.getWaitingCount();
if (count > 40) {
reply.status(500).send({
ok: false,
count,
reply.status(status).send({
ready: status === 200,
redis: redisRes === 'PONG',
db: !!dbRes,
ch: chRes && chRes.length > 0,
});
} else {
reply.status(200).send({
ok: true,
count,
} catch (error) {
return reply.status(503).send({
ready: false,
reason: 'dependencies not ready',
});
}
}
/**
 * Kubernetes liveness probe.
 *
 * Performs no dependency checks: it unconditionally answers
 * `200 { live: true }`, so a response means the process is able to serve
 * HTTP requests.
 */
export async function liveness(request: FastifyRequest, reply: FastifyReply) {
  const payload = { live: true };
  return reply.status(200).send(payload);
}
/**
 * Kubernetes readiness probe.
 *
 * - Responds `503 { ready: false, reason: 'shutting down' }` once
 *   `isShuttingDown()` reports true.
 * - Otherwise probes Redis (`PING`), the database (`project.findFirst`) and
 *   ClickHouse (`SELECT 1`). Responds `200 { ready: true }` when all three
 *   pass, or `503` with a per-dependency breakdown when any fails.
 *
 * A probe that throws or rejects is treated as a failed dependency, so an
 * outage yields a 503 with details instead of an unhandled error.
 */
export async function readiness(request: FastifyRequest, reply: FastifyReply) {
  if (isShuttingDown()) {
    return reply.status(503).send({ ready: false, reason: 'shutting down' });
  }

  // Wrap each probe so a thrown/rejected check becomes `false` rather than
  // escaping the handler. The thunks are async, so synchronous throws while
  // obtaining a client are converted into rejections as well.
  const check = (dependency: string, probe: () => Promise<boolean>) =>
    probe().catch((error: unknown) => {
      request.log.warn({ err: error, dependency }, 'Readiness probe failed');
      return false;
    });

  // The probes are independent of each other, so run them concurrently.
  const [redis, database, clickhouse] = await Promise.all([
    check('redis', async () => (await getRedisCache().ping()) === 'PONG'),
    // NOTE(review): an empty `project` table also makes this falsy; confirm
    // that a fresh install is expected to report "not ready".
    check('db', async () => !!(await db.project.findFirst())),
    check('ch', async () => {
      const rows = await chQuery('SELECT 1');
      return (rows?.length ?? 0) > 0;
    }),
  ]);

  // Readiness is derived from the same booleans that are reported, so the
  // status code and the breakdown can never disagree.
  const isReady = redis && database && clickhouse;
  if (!isReady) {
    return reply.status(503).send({
      ready: false,
      reason: 'dependencies not ready',
      redis,
      db: database,
      ch: clickhouse,
    });
  }

  return reply.status(200).send({ ready: true });
}

View File

@@ -1,7 +1,7 @@
import type { FastifyReply, FastifyRequest } from 'fastify';
import { path, pick } from 'ramda';
const ignoreLog = ['/healthcheck', '/metrics', '/misc'];
const ignoreLog = ['/healthcheck', '/healthz', '/metrics', '/misc'];
const ignoreMethods = ['OPTIONS'];
const getTrpcInput = (

View File

@@ -21,7 +21,8 @@ import {
import sourceMapSupport from 'source-map-support';
import {
healthcheck,
healthcheckQueue,
liveness,
readiness,
} from './controllers/healthcheck.controller';
import { fixHook } from './hooks/fix.hook';
import { ipHook } from './hooks/ip.hook';
@@ -40,6 +41,7 @@ import profileRouter from './routes/profile.router';
import trackRouter from './routes/track.router';
import webhookRouter from './routes/webhook.router';
import { HttpError } from './utils/errors';
import { shutdown } from './utils/graceful-shutdown';
import { logger } from './utils/logger';
sourceMapSupport.install();
@@ -172,8 +174,11 @@ const startServer = async () => {
instance.register(importRouter, { prefix: '/import' });
instance.register(insightsRouter, { prefix: '/insights' });
instance.register(trackRouter, { prefix: '/track' });
// Keep existing endpoints for backward compatibility
instance.get('/healthcheck', healthcheck);
instance.get('/healthcheck/queue', healthcheckQueue);
// New Kubernetes-style health endpoints
instance.get('/healthz/live', liveness);
instance.get('/healthz/ready', readiness);
instance.get('/', (_request, reply) =>
reply.send({ name: 'openpanel sdk api' }),
);
@@ -211,14 +216,17 @@ const startServer = async () => {
});
if (process.env.NODE_ENV === 'production') {
for (const signal of ['SIGINT', 'SIGTERM']) {
process.on(signal, (error) => {
logger.error(`uncaught exception detected ${signal}`, error);
fastify.close().then((error) => {
process.exit(error ? 1 : 0);
});
});
}
logger.info('Registering graceful shutdown handlers');
process.on('SIGTERM', async () => await shutdown(fastify, 'SIGTERM', 0));
process.on('SIGINT', async () => await shutdown(fastify, 'SIGINT', 0));
process.on('uncaughtException', async (error) => {
logger.error('Uncaught exception', error);
await shutdown(fastify, 'uncaughtException', 1);
});
process.on('unhandledRejection', async (reason, promise) => {
logger.error('Unhandled rejection', { reason, promise });
await shutdown(fastify, 'unhandledRejection', 1);
});
}
await fastify.listen({

View File

@@ -0,0 +1,108 @@
import { ch, db } from '@openpanel/db';
import {
cronQueue,
eventsQueue,
miscQueue,
notificationQueue,
sessionsQueue,
} from '@openpanel/queue';
import {
getRedisCache,
getRedisPub,
getRedisQueue,
getRedisSub,
} from '@openpanel/redis';
import type { FastifyInstance } from 'fastify';
import { logger } from './logger';
// Module-private flag; only reachable through the two accessors below.
let shutdownInProgress = false;

/**
 * Stores the shutdown flag.
 *
 * @param value - `true` once shutdown has begun, `false` to clear the flag.
 */
export function setShuttingDown(value: boolean): void {
  shutdownInProgress = value;
}

/**
 * @returns The value most recently stored via `setShuttingDown`
 *   (`false` until it is first called).
 */
export function isShuttingDown(): boolean {
  return shutdownInProgress;
}
// Wait applied before closing the server when SHUTDOWN_GRACE_PERIOD_MS is
// unset or unusable.
const DEFAULT_SHUTDOWN_GRACE_PERIOD_MS = 5000;

/**
 * Reads SHUTDOWN_GRACE_PERIOD_MS from the environment.
 *
 * Falls back to the default when the variable is unset/empty, and (with a
 * warning) when it is not a finite, non-negative number. Without this guard a
 * malformed value becomes `NaN`, which `setTimeout` treats as a ~1ms delay,
 * silently skipping the drain wait.
 */
function resolveGracePeriodMs(): number {
  const raw = process.env.SHUTDOWN_GRACE_PERIOD_MS;
  if (!raw) {
    return DEFAULT_SHUTDOWN_GRACE_PERIOD_MS;
  }

  const parsed = Number(raw);
  if (!Number.isFinite(parsed) || parsed < 0) {
    logger.warn('Invalid SHUTDOWN_GRACE_PERIOD_MS, using default', {
      value: raw,
      fallback: DEFAULT_SHUTDOWN_GRACE_PERIOD_MS,
    });
    return DEFAULT_SHUTDOWN_GRACE_PERIOD_MS;
  }

  return parsed;
}

/**
 * Runs a single cleanup step and logs its outcome.
 *
 * Errors are logged and swallowed on purpose: one failing step must not
 * prevent the remaining resources from being closed.
 */
async function runShutdownStep(
  close: () => Promise<unknown>,
  successMessage: string,
  failureMessage: string,
): Promise<void> {
  try {
    await close();
    logger.info(successMessage);
  } catch (error) {
    logger.error(failureMessage, error);
  }
}

/**
 * Graceful shutdown handler.
 *
 * Sets the shutting-down flag, waits for the grace period, then closes the
 * Fastify server, the database, ClickHouse, the queues and the Redis
 * connections in that order, and finally exits the process.
 *
 * @param fastify - Server instance to close.
 * @param signal - Name of the signal/event that triggered the shutdown; used
 *   for logging only.
 * @param exitCode - Code passed to `process.exit` once cleanup has finished.
 * @returns Resolves without doing anything when a shutdown is already in
 *   progress; otherwise the process exits before the promise is observed.
 */
export async function shutdown(
  fastify: FastifyInstance,
  signal: string,
  exitCode = 0,
): Promise<void> {
  if (isShuttingDown()) {
    logger.warn('Shutdown already in progress, ignoring signal', { signal });
    return;
  }

  // Step 1: Flip the flag so isShuttingDown() reports true from now on
  logger.info('Starting graceful shutdown', { signal });
  setShuttingDown(true);

  // Step 2: Wait for load balancer to stop sending traffic (matches preStop sleep)
  const gracePeriod = resolveGracePeriodMs();
  await new Promise((resolve) => setTimeout(resolve, gracePeriod));

  // Step 3: Close Fastify to drain in-flight requests
  await runShutdownStep(
    () => fastify.close(),
    'Fastify server closed',
    'Error closing Fastify server',
  );

  // Step 4: Close database connections
  await runShutdownStep(
    () => db.$disconnect(),
    'Database connection closed',
    'Error closing database connection',
  );

  // Step 5: Close ClickHouse connections
  await runShutdownStep(
    () => ch.close(),
    'ClickHouse connections closed',
    'Error closing ClickHouse connections',
  );

  // Step 6: Close Bull queues (graceful shutdown of queue state)
  await runShutdownStep(
    () =>
      Promise.all([
        eventsQueue.close(),
        sessionsQueue.close(),
        cronQueue.close(),
        miscQueue.close(),
        notificationQueue.close(),
      ]),
    'Queue state closed',
    'Error closing queue state',
  );

  // Step 7: Close Redis connections
  await runShutdownStep(
    () =>
      Promise.all(
        [getRedisCache(), getRedisPub(), getRedisSub(), getRedisQueue()].map(
          async (redis) => {
            // Only connections currently in the 'ready' state are quit.
            if (redis.status === 'ready') {
              await redis.quit();
            }
          },
        ),
      ),
    'Redis connections closed',
    'Error closing Redis connections',
  );

  logger.info('Graceful shutdown completed');
  process.exit(exitCode);
}