From 9ee3d61a25e6a9e08c255635091184d7d73db42d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Tue, 11 Nov 2025 21:27:17 +0100 Subject: [PATCH] remove cluster names and add it behind env flag (if someone want to scale) --- apps/api/src/controllers/track.controller.ts | 3 - .../scripts/cleanup-old-event-buffer-keys.ts | 287 ------------------ apps/worker/scripts/migrate-delayed-jobs.ts | 206 ------------- apps/worker/src/boot-cron.ts | 51 ++-- apps/worker/src/boot-workers.ts | 2 +- packages/queue/src/queues.ts | 43 ++- 6 files changed, 48 insertions(+), 544 deletions(-) delete mode 100644 apps/worker/scripts/cleanup-old-event-buffer-keys.ts delete mode 100644 apps/worker/scripts/migrate-delayed-jobs.ts diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index abf08f58..7c1fb441 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -150,7 +150,6 @@ export async function handler( promises.push( track({ - log: request.log.info, payload: request.body.payload, currentDeviceId, previousDeviceId, @@ -217,7 +216,6 @@ async function track({ headers, timestamp, isTimestampFromThePast, - log, }: { payload: TrackPayload; currentDeviceId: string; @@ -227,7 +225,6 @@ async function track({ headers: Record; timestamp: number; isTimestampFromThePast: boolean; - log: any; }) { const uaInfo = parseUserAgent(headers['user-agent'], payload.properties); const groupId = uaInfo.isServer diff --git a/apps/worker/scripts/cleanup-old-event-buffer-keys.ts b/apps/worker/scripts/cleanup-old-event-buffer-keys.ts deleted file mode 100644 index 771079a7..00000000 --- a/apps/worker/scripts/cleanup-old-event-buffer-keys.ts +++ /dev/null @@ -1,287 +0,0 @@ -#!/usr/bin/env tsx -/** - * Cleanup script for old event buffer architecture Redis keys - * - * This script removes Redis keys from the previous complex event buffer implementation: - * - event_buffer:sessions_sorted (sorted set) - * - event_buffer:ready_sessions (sorted set) - * - event_buffer:session:{sessionId} (lists) - * - event_buffer:regular_queue (old queue key, now event_buffer:queue) - * - * The new simplified architecture uses: - * - event_buffer:queue (new queue key) - * - event_buffer:last_screen_view:session:{sessionId} - * - event_buffer:last_screen_view:profile:{projectId}:{profileId} - * - event_buffer:total_count - */ - -import { createLogger } from '@openpanel/logger'; -import { getRedisCache } from '@openpanel/redis'; - -const redis = getRedisCache(); -const logger = createLogger({ name: 'cleanup-event-buffer' }); - -interface CleanupStats { - sessionsSorted: number; - readySessions: number; - sessionLists: number; - regularQueue: number; - totalEventsMigrated: number; - totalKeysDeleted: number; - errors: number; -} - -async function cleanupOldEventBufferKeys(): Promise { - const stats: CleanupStats = { - sessionsSorted: 0, - readySessions: 0, - sessionLists: 0, - regularQueue: 0, - totalEventsMigrated: 0, - totalKeysDeleted: 0, - errors: 0, - }; - - logger.info('Starting cleanup of old event buffer keys...'); - - try { - // 1. Get all session IDs from both sorted sets - const sessionsSortedKey = 'event_buffer:sessions_sorted'; - const readySessionsKey = 'event_buffer:ready_sessions'; - - const [sessionsSortedExists, readySessionsExists] = await Promise.all([ - redis.exists(sessionsSortedKey), - redis.exists(readySessionsKey), - ]); - - let allSessionIds: string[] = []; - - // Collect session IDs from sessions_sorted - if (sessionsSortedExists) { - const sessionIds = await redis.zrange(sessionsSortedKey, 0, -1); - stats.sessionsSorted = sessionIds.length; - allSessionIds = sessionIds; - logger.info(`Found ${sessionIds.length} sessions in sessions_sorted`); - } else { - logger.info(`${sessionsSortedKey} does not exist (already cleaned up)`); - } - - // Also check ready_sessions (might have additional sessions) - if (readySessionsExists) { - const readySessionIds = await redis.zrange(readySessionsKey, 0, -1); - stats.readySessions = readySessionIds.length; - logger.info(`Found ${readySessionIds.length} sessions in ready_sessions`); - - // Merge with allSessionIds (avoid duplicates) - const uniqueReadySessions = readySessionIds.filter( - (id) => !allSessionIds.includes(id), - ); - if (uniqueReadySessions.length > 0) { - logger.info( - `Found ${uniqueReadySessions.length} additional sessions in ready_sessions`, - ); - allSessionIds = [...allSessionIds, ...uniqueReadySessions]; - } - } else { - logger.info(`${readySessionsKey} does not exist (already cleaned up)`); - } - - // 2. Migrate events from session lists to new queue - if (allSessionIds.length > 0) { - logger.info( - `Migrating events from ${allSessionIds.length} session lists to new queue...`, - ); - const newQueueKey = 'event_buffer:queue'; - let totalEventsMigrated = 0; - - // Process in batches - const batchSize = 100; - for (let i = 0; i < allSessionIds.length; i += batchSize) { - const batchIds = allSessionIds.slice(i, i + batchSize); - - for (const sessionId of batchIds) { - const sessionKey = `event_buffer:session:${sessionId}`; - const events = await redis.lrange(sessionKey, 0, -1); - - if (events.length > 0) { - // Move events to new queue in safe batches to avoid exceeding V8 arg limits - const chunkSize = 1000; - for (let offset = 0; offset < events.length; offset = chunkSize) { - const chunk = events.slice(offset, offset + chunkSize); - await redis.rpush(newQueueKey, ...chunk); - } - // Update buffer counter - await redis.incrby('event_buffer:total_count', events.length); - totalEventsMigrated += events.length; - stats.totalEventsMigrated += events.length; - } - - // Delete the session list - await redis.del(sessionKey); - stats.sessionLists++; - stats.totalKeysDeleted++; - } - - logger.info( - `Processed batch ${Math.floor(i / batchSize) + 1}: ${batchIds.length} sessions, ${totalEventsMigrated} total events migrated`, - ); - } - - logger.info( - `✅ Migrated ${totalEventsMigrated} events from session lists to new queue`, - ); - } - - // 3. Delete the sorted sets - const keysToDelete: string[] = []; - if (sessionsSortedExists) keysToDelete.push(sessionsSortedKey); - if (readySessionsExists) keysToDelete.push(readySessionsKey); - - if (keysToDelete.length > 0) { - await redis.del(...keysToDelete); - stats.totalKeysDeleted += keysToDelete.length; - logger.info(`Deleted sorted sets: ${keysToDelete.join(', ')}`); - } - - // 4. Check and handle regular_queue (old queue key) - const regularQueueKey = 'event_buffer:regular_queue'; - const regularQueueExists = await redis.exists(regularQueueKey); - if (regularQueueExists) { - const queueLength = await redis.llen(regularQueueKey); - stats.regularQueue = queueLength; - - if (queueLength > 0) { - logger.info(`Found ${queueLength} events in old regular_queue`); - logger.warn('WARNING: Old regular_queue has pending events!'); - logger.info('Moving events from old queue to new queue...'); - - // Move events from old queue to new queue - const newQueueKey = 'event_buffer:queue'; - let movedCount = 0; - while (true) { - const event = await redis.rpoplpush(regularQueueKey, newQueueKey); - if (!event) break; - movedCount++; - if (movedCount % 1000 === 0) { - logger.info(`Moved ${movedCount} events...`); - } - } - logger.info(`Moved ${movedCount} events from old queue to new queue`); - stats.totalEventsMigrated += movedCount; - } - - // Delete the old queue key - await redis.del(regularQueueKey); - logger.info(`Deleted ${regularQueueKey}`); - stats.totalKeysDeleted++; - } else { - logger.info(`${regularQueueKey} does not exist (already cleaned up)`); - } - - // 5. Scan for any remaining event_buffer:session:* keys that might have been missed - logger.info('Scanning for any remaining session keys...'); - let cursor = '0'; - let remainingSessionKeys = 0; - - do { - const [newCursor, keys] = await redis.scan( - cursor, - 'MATCH', - 'event_buffer:session:*', - 'COUNT', - 100, - ); - cursor = newCursor; - - if (keys.length > 0) { - const deleted = await redis.del(...keys); - remainingSessionKeys += deleted; - stats.totalKeysDeleted += deleted; - logger.info(`Found and deleted ${deleted} remaining session keys`); - } - } while (cursor !== '0'); - - if (remainingSessionKeys > 0) { - logger.info(`Cleaned up ${remainingSessionKeys} remaining session keys`); - } else { - logger.info('No remaining session keys found'); - } - - logger.info('Cleanup completed successfully!', stats); - return stats; - } catch (error) { - stats.errors++; - logger.error('Error during cleanup:', error); - throw error; - } -} - -async function main() { - try { - logger.info('Event Buffer Cleanup Script'); - logger.info('==========================='); - logger.info( - 'This script will remove old event buffer Redis keys from the previous architecture.', - ); - logger.info(''); - - // Check current state - const sessionsSortedExists = await redis.exists( - 'event_buffer:sessions_sorted', - ); - const readySessionsExists = await redis.exists( - 'event_buffer:ready_sessions', - ); - const regularQueueExists = await redis.exists('event_buffer:regular_queue'); - - if (!sessionsSortedExists && !readySessionsExists && !regularQueueExists) { - logger.info( - '✅ No old keys found. System appears to be already cleaned up!', - ); - process.exit(0); - } - - logger.info('Found old keys to clean up:'); - if (sessionsSortedExists) logger.info(' - event_buffer:sessions_sorted ✓'); - if (readySessionsExists) logger.info(' - event_buffer:ready_sessions ✓'); - if (regularQueueExists) logger.info(' - event_buffer:regular_queue ✓'); - logger.info(''); - - // Perform cleanup - const stats = await cleanupOldEventBufferKeys(); - - // Summary - logger.info(''); - logger.info('Cleanup Summary'); - logger.info('==============='); - logger.info(`Sessions sorted entries: ${stats.sessionsSorted}`); - logger.info(`Ready sessions entries: ${stats.readySessions}`); - logger.info(`Session list keys deleted: ${stats.sessionLists}`); - logger.info(`Regular queue events: ${stats.regularQueue} (migrated)`); - logger.info(`Total events migrated: ${stats.totalEventsMigrated}`); - logger.info(`Total keys deleted: ${stats.totalKeysDeleted}`); - logger.info(`Errors: ${stats.errors}`); - logger.info(''); - - if (stats.errors === 0) { - logger.info('✅ Cleanup completed successfully!'); - } else { - logger.warn(`⚠️ Cleanup completed with ${stats.errors} errors`); - } - - // Close Redis connection - await redis.quit(); - process.exit(0); - } catch (error) { - logger.error('Fatal error during cleanup:', error); - await redis.quit(); - process.exit(1); - } -} - -// Run if executed directly -if (require.main === module) { - main(); -} - -export { cleanupOldEventBufferKeys }; diff --git a/apps/worker/scripts/migrate-delayed-jobs.ts b/apps/worker/scripts/migrate-delayed-jobs.ts deleted file mode 100644 index e6609032..00000000 --- a/apps/worker/scripts/migrate-delayed-jobs.ts +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Migration Script: Migrate Delayed Jobs to New Queue Names - * - * This script migrates delayed jobs from old queue names (e.g., "sessions") - * to new queue names with hash tags (e.g., "{sessions}"). - * - * Active/waiting jobs are ignored - only delayed jobs are migrated. - * - * Usage: - * npx tsx apps/worker/scripts/migrate-delayed-jobs.ts - * - * Options: - * --dry-run Show what would be migrated without actually doing it - * --queue Migrate specific queue only (sessions, cron, notification, misc) - * - * # Dry run (recommended first) - * npx tsx apps/worker/scripts/migrate-delayed-jobs.ts --dry-run - * - * Migrate all queues - * npx tsx apps/worker/scripts/migrate-delayed-jobs.ts - * - * Migrate specific queue only - * npx tsx apps/worker/scripts/migrate-delayed-jobs.ts --queue=sessions - * npx tsx apps/worker/scripts/migrate-delayed-jobs.ts --queue=misc - * - */ - -import type { - CronQueuePayload, - MiscQueuePayload, - NotificationQueuePayload, - SessionsQueuePayload, -} from '@openpanel/queue'; -import { getRedisQueue } from '@openpanel/redis'; -import { Queue } from 'bullmq'; - -interface MigrationStats { - queue: string; - total: number; - migrated: number; - failed: number; - skipped: number; -} - -const isDryRun = process.argv.includes('--dry-run'); -const specificQueue = process.argv - .find((arg) => arg.startsWith('--queue=')) - ?.split('=')[1]; - -console.log('🚀 Starting delayed jobs migration'); -console.log( - `Mode: ${isDryRun ? 'DRY RUN (no changes will be made)' : 'LIVE MIGRATION'}`, -); -console.log(`Queue filter: ${specificQueue || 'all queues'}`); -console.log('---\n'); - -async function migrateDelayedJobs( - oldQueueName: string, - newQueueName: string, -): Promise { - const stats: MigrationStats = { - queue: oldQueueName, - total: 0, - migrated: 0, - failed: 0, - skipped: 0, - }; - - const connection = getRedisQueue(); - const oldQueue = new Queue(oldQueueName, { connection }); - const newQueue = new Queue(newQueueName, { connection }); - - try { - console.log(`\n📦 Processing queue: ${oldQueueName} → ${newQueueName}`); - - // Get all delayed jobs from old queue - const delayedJobs = await oldQueue.getDelayed(); - stats.total = delayedJobs.length; - - console.log(` Found ${stats.total} delayed jobs`); - - if (stats.total === 0) { - console.log(' ✓ No delayed jobs to migrate'); - return stats; - } - - for (const job of delayedJobs) { - try { - const delay = job.opts.delay || 0; - const remainingDelay = Math.max(0, job.timestamp + delay - Date.now()); - - console.log( - ` - Job ${job.id}: ${job.name}, delay: ${Math.round(remainingDelay / 1000)}s remaining`, - ); - - if (!isDryRun) { - // Add to new queue with remaining delay - await newQueue.add(job.name || 'migrated-job', job.data, { - ...job.opts, - delay: remainingDelay, - jobId: job.id, // Preserve job ID if possible - attempts: job.opts.attempts, - backoff: job.opts.backoff, - }); - - // Remove from old queue - await job.remove(); - - stats.migrated++; - console.log(' ✓ Migrated'); - } else { - stats.migrated++; - console.log(' ✓ Would migrate (dry run)'); - } - } catch (error) { - stats.failed++; - console.error( - ` ✗ Failed to migrate job ${job.id}:`, - error instanceof Error ? error.message : error, - ); - } - } - - console.log(`\n Summary for ${oldQueueName}:`); - console.log(` - Total: ${stats.total}`); - console.log(` - Migrated: ${stats.migrated}`); - console.log(` - Failed: ${stats.failed}`); - console.log(` - Skipped: ${stats.skipped}`); - } catch (error) { - console.error(` ✗ Error processing queue ${oldQueueName}:`, error); - } finally { - await oldQueue.close(); - await newQueue.close(); - } - - return stats; -} - -async function main() { - const queuesToMigrate: Array<{ old: string; new: string }> = [ - { old: 'sessions', new: '{sessions}' }, - { old: 'misc', new: '{misc}' }, - ]; - - // Filter to specific queue if requested - const filtered = specificQueue - ? queuesToMigrate.filter((q) => q.old === specificQueue) - : queuesToMigrate; - - if (filtered.length === 0) { - console.error( - `❌ Queue "${specificQueue}" not found. Valid queues: sessions, cron, notification, misc`, - ); - process.exit(1); - } - - const allStats: MigrationStats[] = []; - - for (const { old: oldName, new: newName } of filtered) { - const stats = await migrateDelayedJobs(oldName, newName); - allStats.push(stats); - } - - // Print summary - console.log(`\n${'='.repeat(50)}`); - console.log('📊 MIGRATION SUMMARY'); - console.log(`${'='.repeat(50)}\n`); - - let totalJobs = 0; - let totalMigrated = 0; - let totalFailed = 0; - - for (const stats of allStats) { - totalJobs += stats.total; - totalMigrated += stats.migrated; - totalFailed += stats.failed; - } - - console.log(`Total jobs found: ${totalJobs}`); - console.log(`Successfully migrated: ${totalMigrated}`); - console.log(`Failed: ${totalFailed}`); - console.log( - `\n${isDryRun ? '⚠️ This was a DRY RUN - no changes were made' : '✅ Migration complete!'}`, - ); - - if (totalFailed > 0) { - console.log( - '\n⚠️ Some jobs failed to migrate. Check the logs above for details.', - ); - process.exit(1); - } - - if (isDryRun && totalMigrated > 0) { - console.log('\n💡 Run without --dry-run to perform the actual migration'); - } -} - -main() - .then(() => { - console.log('\n✨ Done!'); - process.exit(0); - }) - .catch((error) => { - console.error('\n❌ Migration failed:', error); - process.exit(1); - }); diff --git a/apps/worker/src/boot-cron.ts b/apps/worker/src/boot-cron.ts index 16b7536d..9650598e 100644 --- a/apps/worker/src/boot-cron.ts +++ b/apps/worker/src/boot-cron.ts @@ -1,7 +1,6 @@ import type { CronQueueType } from '@openpanel/queue'; import { cronQueue } from '@openpanel/queue'; -import { getLock } from '@openpanel/redis'; import { logger } from './utils/logger'; export async function bootCron() { @@ -45,40 +44,30 @@ export async function bootCron() { }); } - const lock = await getLock('cron:lock', '1', 1000 * 60 * 5); + logger.info('Updating cron jobs'); - if (lock) { - logger.info('Cron lock acquired'); - } else { - logger.info('Cron lock not acquired'); + const jobSchedulers = await cronQueue.getJobSchedulers(); + for (const jobScheduler of jobSchedulers) { + await cronQueue.removeJobScheduler(jobScheduler.key); } - if (lock) { - logger.info('Updating cron jobs'); - // TODO: Switch to getJobSchedulers - const repeatableJobs = await cronQueue.getRepeatableJobs(); - for (const repeatableJob of repeatableJobs) { - await cronQueue.removeRepeatableByKey(repeatableJob.key); - } - - // Add repeatable jobs - for (const job of jobs) { - await cronQueue.upsertJobScheduler( - job.type, - typeof job.pattern === 'number' - ? { - every: job.pattern, - } - : { - pattern: job.pattern, - }, - { - data: { - type: job.type, - payload: undefined, + // Add repeatable jobs + for (const job of jobs) { + await cronQueue.upsertJobScheduler( + job.type, + typeof job.pattern === 'number' + ? { + every: job.pattern, + } + : { + pattern: job.pattern, }, + { + data: { + type: job.type, + payload: undefined, }, - ); - } + }, + ); } } diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts index 8fff1754..bcce6a80 100644 --- a/apps/worker/src/boot-workers.ts +++ b/apps/worker/src/boot-workers.ts @@ -12,7 +12,7 @@ import { queueLogger, sessionsQueue, } from '@openpanel/queue'; -import { getLock, getRedisQueue } from '@openpanel/redis'; +import { getRedisQueue } from '@openpanel/redis'; import { performance } from 'node:perf_hooks'; import { setTimeout as sleep } from 'node:timers/promises'; diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 10814de2..0395cc14 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -16,6 +16,9 @@ export const EVENTS_GROUP_QUEUES_SHARDS = Number.parseInt( 10, ); +export const getQueueName = (name: string) => + process.env.QUEUE_CLUSTER ? `{${name}}` : name; + function pickShard(projectId: string) { const h = createHash('sha1').update(projectId).digest(); // 20 bytes // take first 4 bytes as unsigned int @@ -144,7 +147,9 @@ export const eventsGroupQueues = Array.from({ (_, index) => new GroupQueue({ logger: queueLogger, - namespace: `{group_events_${index}}`, + namespace: getQueueName( + index === 0 ? 'group_events' : `group_events_${index}`, + ), redis: getRedisGroupQueue(), keepCompleted: 1_000, keepFailed: 10_000, @@ -168,24 +173,27 @@ export const getEventsGroupQueueShard = (groupId: string) => { return queue; }; -export const sessionsQueue = new Queue('{sessions}', { - connection: getRedisQueue(), - defaultJobOptions: { - removeOnComplete: 10, +export const sessionsQueue = new Queue( + getQueueName('sessions'), + { + connection: getRedisQueue(), + defaultJobOptions: { + removeOnComplete: 10, + }, }, -}); -export const sessionsQueueEvents = new QueueEvents('{sessions}', { +); +export const sessionsQueueEvents = new QueueEvents(getQueueName('sessions'), { connection: getRedisQueue(), }); -export const cronQueue = new Queue('{cron}', { +export const cronQueue = new Queue(getQueueName('cron'), { connection: getRedisQueue(), defaultJobOptions: { removeOnComplete: 10, }, }); -export const miscQueue = new Queue('{misc}', { +export const miscQueue = new Queue(getQueueName('misc'), { connection: getRedisQueue(), defaultJobOptions: { removeOnComplete: 10, @@ -200,7 +208,7 @@ export type NotificationQueuePayload = { }; export const notificationQueue = new Queue( - '{notification}', + getQueueName('notification'), { connection: getRedisQueue(), defaultJobOptions: { @@ -216,13 +224,16 @@ export type ImportQueuePayload = { }; }; -export const importQueue = new Queue('{import}', { - connection: getRedisQueue(), - defaultJobOptions: { - removeOnComplete: 10, - removeOnFail: 50, +export const importQueue = new Queue( + getQueueName('import'), + { + connection: getRedisQueue(), + defaultJobOptions: { + removeOnComplete: 10, + removeOnFail: 50, + }, }, -}); +); export function addTrialEndingSoonJob(organizationId: string, delay: number) { return miscQueue.add(