diff --git a/apps/api/package.json b/apps/api/package.json index 35593bae..e073b1b9 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -33,6 +33,7 @@ "ramda": "^0.29.1", "request-ip": "^3.3.0", "sharp": "^0.33.2", + "source-map-support": "^0.5.21", "sqlstring": "^2.3.3", "superjson": "^1.13.3", "svix": "^1.24.0", @@ -48,6 +49,7 @@ "@types/jsonwebtoken": "^9.0.6", "@types/ramda": "^0.29.6", "@types/request-ip": "^0.0.41", + "@types/source-map-support": "^0.5.10", "@types/sqlstring": "^2.3.2", "@types/uuid": "^9.0.8", "@types/ws": "^8.5.10", diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 11811e54..a2296401 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -16,6 +16,7 @@ import { getRedisPub } from '@openpanel/redis'; import type { AppRouter } from '@openpanel/trpc'; import { appRouter, createContext } from '@openpanel/trpc'; +import sourceMapSupport from 'source-map-support'; import { healthcheck, healthcheckQueue, @@ -33,6 +34,8 @@ import trackRouter from './routes/track.router'; import webhookRouter from './routes/webhook.router'; import { logger } from './utils/logger'; +sourceMapSupport.install(); + declare module 'fastify' { interface FastifyRequest { projectId: string; diff --git a/apps/worker/Dockerfile b/apps/worker/Dockerfile index 0f6895b2..ff925b58 100644 --- a/apps/worker/Dockerfile +++ b/apps/worker/Dockerfile @@ -78,4 +78,4 @@ WORKDIR /app/apps/worker EXPOSE 3000 -CMD ["pnpm", "start"] \ No newline at end of file +CMD ["node", "dist/index.js"] \ No newline at end of file diff --git a/apps/worker/package.json b/apps/worker/package.json index fdb6ae33..e56d53c2 100644 --- a/apps/worker/package.json +++ b/apps/worker/package.json @@ -21,6 +21,7 @@ "express": "^4.18.2", "prom-client": "^15.1.3", "ramda": "^0.29.1", + "source-map-support": "^0.5.21", "sqlstring": "^2.3.3", "uuid": "^9.0.1" }, @@ -28,6 +29,7 @@ "@openpanel/tsconfig": "workspace:*", "@types/express": "^4.17.21", "@types/ramda": "^0.29.6", + "@types/source-map-support": "^0.5.10", "@types/sqlstring": "^2.3.2", "@types/uuid": "^9.0.8", "tsup": "^7.2.0", diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts index 4289b443..66f389ec 100644 --- a/apps/worker/src/boot-workers.ts +++ b/apps/worker/src/boot-workers.ts @@ -1,4 +1,4 @@ -import type { WorkerOptions } from 'bullmq'; +import type { Queue, WorkerOptions } from 'bullmq'; import { Worker } from 'bullmq'; import { @@ -9,6 +9,8 @@ import { } from '@openpanel/queue'; import { getRedisQueue } from '@openpanel/redis'; +import { performance } from 'node:perf_hooks'; +import { setTimeout as sleep } from 'node:timers/promises'; import { cronJob } from './jobs/cron'; import { eventsJob } from './jobs/events'; import { notificationJob } from './jobs/notification'; @@ -101,13 +103,17 @@ export async function bootWorkers() { eventName, }); try { + const time = performance.now(); + await waitForQueueToEmpty(cronQueue); await Promise.all([ cronWorker.close(), eventsWorker.close(), sessionsWorker.close(), notificationWorker.close(), ]); - logger.info('workers closed successfully'); + logger.info('workers closed successfully', { + elapsed: performance.now() - time, + }); } catch (e) { logger.error('exit handler error', { code: evtOrExitCodeOrError, @@ -120,11 +126,39 @@ export async function bootWorkers() { process.exit(exitCode); } - ['uncaughtException', 'unhandledRejection', 'SIGTERM'].forEach((evt) => - process.on(evt, (code) => { - exitHandler(evt, code); - }), + ['uncaughtException', 'unhandledRejection', 'SIGTERM', 'SIGINT'].forEach( + (evt) => { + process.on(evt, (code) => { + exitHandler(evt, code); + }); + }, ); return workers; } + +export async function waitForQueueToEmpty(queue: Queue, timeout = 60_000) { + const startTime = performance.now(); + + while (true) { + const activeCount = await queue.getActiveCount(); + + if (activeCount === 0) { + break; + } + + if (performance.now() - startTime > timeout) { + logger.warn('Timeout reached while waiting for queue to empty', { + queue: queue.name, + remainingCount: activeCount, + }); + break; + } + + logger.info('Waiting for queue to finish', { + queue: queue.name, + count: activeCount, + }); + await sleep(500); + } +} diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index e0be5cd9..48c78b85 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -11,11 +11,14 @@ import { sessionsQueue, } from '@openpanel/queue'; +import sourceMapSupport from 'source-map-support'; import { bootCron } from './boot-cron'; import { bootWorkers } from './boot-workers'; import { register } from './metrics'; import { logger } from './utils/logger'; +sourceMapSupport.install(); + async function start() { const PORT = Number.parseInt(process.env.WORKER_PORT || '3000', 10); const app = express(); diff --git a/packages/db/src/buffers/buffer.ts b/packages/db/src/buffers/buffer.ts index 781e6210..e741342f 100644 --- a/packages/db/src/buffers/buffer.ts +++ b/packages/db/src/buffers/buffer.ts @@ -20,7 +20,6 @@ export class RedisBuffer { private lockKey: string; protected maxBufferSize: number | null; protected logger: ILogger; - private isCurrentlyFlushing = false; constructor(bufferName: string, maxBufferSize: number | null) { this.bufferKey = bufferName; @@ -58,11 +57,6 @@ export class RedisBuffer { } public async tryFlush(): Promise { - if (this.isCurrentlyFlushing) { - this.logger.debug('Already flushing. Skipping additional flush attempt.'); - return; - } - const lockId = uuidv4(); const acquired = await getRedisCache().set( this.lockKey, @@ -73,18 +67,16 @@ export class RedisBuffer { ); if (acquired === 'OK') { - this.logger.debug('Lock acquired. Attempting to flush.'); - this.isCurrentlyFlushing = true; + this.logger.info('Lock acquired. Attempting to flush.'); try { await this.flush(); } catch (error) { this.logger.error('Failed to flush buffer', { error }); } finally { - this.isCurrentlyFlushing = false; await this.releaseLock(lockId); } } else { - this.logger.debug('Failed to acquire lock. Skipping flush.'); + this.logger.warn('Failed to acquire lock. Skipping flush.'); } } diff --git a/packages/db/src/clickhouse-client.ts b/packages/db/src/clickhouse-client.ts index 9ed00c2e..b8083ee9 100644 --- a/packages/db/src/clickhouse-client.ts +++ b/packages/db/src/clickhouse-client.ts @@ -48,14 +48,32 @@ const cleanQuery = (query?: string) => ? query.replace(/\n/g, '').replace(/\s+/g, ' ').trim() : undefined; +const createChildLogger = (property: string, args?: any[]) => { + if (property === 'insert') { + return logger.child({ + property, + table: args?.[0]?.table, + values: (args?.[0]?.values || []).length, + }); + } + + return logger.child({ + property, + table: args?.[0]?.table, + query: cleanQuery(args?.[0]?.query), + }); +}; + export const ch = new Proxy(originalCh, { get(target, property, receiver) { if (property === 'insert' || property === 'query') { return async (...args: any[]) => { - const childLogger = logger.child({ - query: cleanQuery(args[0].query), - property, - }); + const childLogger = createChildLogger(property, args); + + if (property === 'insert') { + childLogger.info('insert info'); + } + try { // First attempt if (property in target) { @@ -69,23 +87,23 @@ export const ch = new Proxy(originalCh, { error.message.includes('socket hang up') || error.message.includes('Timeout error')) ) { - childLogger.error('Captured error', { + childLogger.error('First failed attempt', { error, }); await new Promise((resolve) => setTimeout(resolve, 500)); try { // Retry once - childLogger.info('Retrying query'); + childLogger.info(`Retrying ${property}`); if (property in target) { // @ts-expect-error return await target[property](...args); } } catch (retryError) { - logger.error('Retry failed', retryError); + childLogger.error('Second failed attempt', retryError); throw retryError; // Rethrow or handle as needed } } else { - logger.error('query failed', { + childLogger.error('Failed without retry', { ...args[0], error, }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 11de0ef9..685d17db 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -96,6 +96,9 @@ importers: sharp: specifier: ^0.33.2 version: 0.33.2 + source-map-support: + specifier: ^0.5.21 + version: 0.5.21 sqlstring: specifier: ^2.3.3 version: 2.3.3 @@ -136,6 +139,9 @@ importers: '@types/request-ip': specifier: ^0.0.41 version: 0.0.41 + '@types/source-map-support': + specifier: ^0.5.10 + version: 0.5.10 '@types/sqlstring': specifier: ^2.3.2 version: 2.3.2 @@ -657,6 +663,9 @@ importers: ramda: specifier: ^0.29.1 version: 0.29.1 + source-map-support: + specifier: ^0.5.21 + version: 0.5.21 sqlstring: specifier: ^2.3.3 version: 2.3.3 @@ -673,6 +682,9 @@ importers: '@types/ramda': specifier: ^0.29.6 version: 0.29.10 + '@types/source-map-support': + specifier: ^0.5.10 + version: 0.5.10 '@types/sqlstring': specifier: ^2.3.2 version: 2.3.2 @@ -10094,6 +10106,12 @@ packages: resolution: {integrity: sha512-UE7oxhQLLd9gub6JKIAhDq06T0F6FnztwMNRvYgjeQSBeMc1ZG/tA47EwfduvkuQS8apbkM/lpLpWsaCeYsXVg==} dev: false + /@types/source-map-support@0.5.10: + resolution: {integrity: sha512-tgVP2H469x9zq34Z0m/fgPewGhg/MLClalNOiPIzQlXrSS2YrKu/xCdSCKnEDwkFha51VKEKB6A9wW26/ZNwzA==} + dependencies: + source-map: 0.6.1 + dev: true + /@types/sqlstring@2.3.2: resolution: {integrity: sha512-lVRe4Iz9UNgiHelKVo8QlC8fb5nfY8+p+jNQNE+UVsuuVlQnWhyWmQ/wF5pE8Ys6TdjfVpqTG9O9i2vi6E0+Sg==} dev: true @@ -19996,7 +20014,6 @@ packages: /source-map@0.6.1: resolution: {integrity: sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==} engines: {node: '>=0.10.0'} - dev: false /source-map@0.7.4: resolution: {integrity: sha512-l3BikUxvPOcn5E74dZiq5BGsTb5yEwhaTSzccU6t4sDOH8NWJCstKO5QT2CvtFoK6F0saL7p9xHAqHOlCPJygA==}