remove cluster names and add them behind an env flag (if someone wants to scale)
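In short: the hard-coded Redis Cluster hash tags in BullMQ queue names (e.g. {sessions}) are dropped, and the new getQueueName helper re-adds them only when the QUEUE_CLUSTER environment variable is set, so single-node Redis deployments keep plain names. A hedged example of opting in (any truthy value works, judging by the check in the diff below):

  QUEUE_CLUSTER=1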
@@ -150,7 +150,6 @@ export async function handler(

  promises.push(
    track({
      log: request.log.info,
      payload: request.body.payload,
      currentDeviceId,
      previousDeviceId,
@@ -217,7 +216,6 @@ async function track({
  headers,
  timestamp,
  isTimestampFromThePast,
  log,
}: {
  payload: TrackPayload;
  currentDeviceId: string;
@@ -227,7 +225,6 @@ async function track({
  headers: Record<string, string | undefined>;
  timestamp: number;
  isTimestampFromThePast: boolean;
  log: any;
}) {
  const uaInfo = parseUserAgent(headers['user-agent'], payload.properties);
  const groupId = uaInfo.isServer

@@ -1,287 +0,0 @@
#!/usr/bin/env tsx
/**
 * Cleanup script for old event buffer architecture Redis keys
 *
 * This script removes Redis keys from the previous complex event buffer implementation:
 * - event_buffer:sessions_sorted (sorted set)
 * - event_buffer:ready_sessions (sorted set)
 * - event_buffer:session:{sessionId} (lists)
 * - event_buffer:regular_queue (old queue key, now event_buffer:queue)
 *
 * The new simplified architecture uses:
 * - event_buffer:queue (new queue key)
 * - event_buffer:last_screen_view:session:{sessionId}
 * - event_buffer:last_screen_view:profile:{projectId}:{profileId}
 * - event_buffer:total_count
 */
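// Before running this script, the old keys can be checked non-destructively with
// redis-cli (a sketch; it assumes redis-cli points at the same Redis instance that
// getRedisCache() connects to, so add host/port/auth flags as needed):
//
//   redis-cli --scan --pattern 'event_buffer:session:*' | head
//   redis-cli exists event_buffer:sessions_sorted event_buffer:ready_sessions event_buffer:regular_queue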

import { createLogger } from '@openpanel/logger';
import { getRedisCache } from '@openpanel/redis';

const redis = getRedisCache();
const logger = createLogger({ name: 'cleanup-event-buffer' });

interface CleanupStats {
  sessionsSorted: number;
  readySessions: number;
  sessionLists: number;
  regularQueue: number;
  totalEventsMigrated: number;
  totalKeysDeleted: number;
  errors: number;
}

async function cleanupOldEventBufferKeys(): Promise<CleanupStats> {
  const stats: CleanupStats = {
    sessionsSorted: 0,
    readySessions: 0,
    sessionLists: 0,
    regularQueue: 0,
    totalEventsMigrated: 0,
    totalKeysDeleted: 0,
    errors: 0,
  };

  logger.info('Starting cleanup of old event buffer keys...');

  try {
    // 1. Get all session IDs from both sorted sets
    const sessionsSortedKey = 'event_buffer:sessions_sorted';
    const readySessionsKey = 'event_buffer:ready_sessions';

    const [sessionsSortedExists, readySessionsExists] = await Promise.all([
      redis.exists(sessionsSortedKey),
      redis.exists(readySessionsKey),
    ]);

    let allSessionIds: string[] = [];

    // Collect session IDs from sessions_sorted
    if (sessionsSortedExists) {
      const sessionIds = await redis.zrange(sessionsSortedKey, 0, -1);
      stats.sessionsSorted = sessionIds.length;
      allSessionIds = sessionIds;
      logger.info(`Found ${sessionIds.length} sessions in sessions_sorted`);
    } else {
      logger.info(`${sessionsSortedKey} does not exist (already cleaned up)`);
    }

    // Also check ready_sessions (might have additional sessions)
    if (readySessionsExists) {
      const readySessionIds = await redis.zrange(readySessionsKey, 0, -1);
      stats.readySessions = readySessionIds.length;
      logger.info(`Found ${readySessionIds.length} sessions in ready_sessions`);

      // Merge with allSessionIds (avoid duplicates)
      const uniqueReadySessions = readySessionIds.filter(
        (id) => !allSessionIds.includes(id),
      );
      if (uniqueReadySessions.length > 0) {
        logger.info(
          `Found ${uniqueReadySessions.length} additional sessions in ready_sessions`,
        );
        allSessionIds = [...allSessionIds, ...uniqueReadySessions];
      }
    } else {
      logger.info(`${readySessionsKey} does not exist (already cleaned up)`);
    }

    // 2. Migrate events from session lists to new queue
    if (allSessionIds.length > 0) {
      logger.info(
        `Migrating events from ${allSessionIds.length} session lists to new queue...`,
      );
      const newQueueKey = 'event_buffer:queue';
      let totalEventsMigrated = 0;

      // Process in batches
      const batchSize = 100;
      for (let i = 0; i < allSessionIds.length; i += batchSize) {
        const batchIds = allSessionIds.slice(i, i + batchSize);

        for (const sessionId of batchIds) {
          const sessionKey = `event_buffer:session:${sessionId}`;
          const events = await redis.lrange(sessionKey, 0, -1);

          if (events.length > 0) {
            // Move events to new queue in safe batches to avoid exceeding V8 arg limits
            const chunkSize = 1000;
            for (let offset = 0; offset < events.length; offset += chunkSize) {
              const chunk = events.slice(offset, offset + chunkSize);
              await redis.rpush(newQueueKey, ...chunk);
            }
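            // (Note: spreading an unbounded list into a single rpush(...) call can hit
            // the engine's argument-count ceiling, which is on the order of tens of
            // thousands of arguments; the 1000-item chunks above stay well below it.)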
            // Update buffer counter
            await redis.incrby('event_buffer:total_count', events.length);
            totalEventsMigrated += events.length;
            stats.totalEventsMigrated += events.length;
          }

          // Delete the session list
          await redis.del(sessionKey);
          stats.sessionLists++;
          stats.totalKeysDeleted++;
        }

        logger.info(
          `Processed batch ${Math.floor(i / batchSize) + 1}: ${batchIds.length} sessions, ${totalEventsMigrated} total events migrated`,
        );
      }

      logger.info(
        `✅ Migrated ${totalEventsMigrated} events from session lists to new queue`,
      );
    }

    // 3. Delete the sorted sets
    const keysToDelete: string[] = [];
    if (sessionsSortedExists) keysToDelete.push(sessionsSortedKey);
    if (readySessionsExists) keysToDelete.push(readySessionsKey);

    if (keysToDelete.length > 0) {
      await redis.del(...keysToDelete);
      stats.totalKeysDeleted += keysToDelete.length;
      logger.info(`Deleted sorted sets: ${keysToDelete.join(', ')}`);
    }

    // 4. Check and handle regular_queue (old queue key)
    const regularQueueKey = 'event_buffer:regular_queue';
    const regularQueueExists = await redis.exists(regularQueueKey);
    if (regularQueueExists) {
      const queueLength = await redis.llen(regularQueueKey);
      stats.regularQueue = queueLength;

      if (queueLength > 0) {
        logger.info(`Found ${queueLength} events in old regular_queue`);
        logger.warn('WARNING: Old regular_queue has pending events!');
        logger.info('Moving events from old queue to new queue...');

        // Move events from old queue to new queue
        const newQueueKey = 'event_buffer:queue';
        let movedCount = 0;
        while (true) {
          const event = await redis.rpoplpush(regularQueueKey, newQueueKey);
          if (!event) break;
          movedCount++;
          if (movedCount % 1000 === 0) {
            logger.info(`Moved ${movedCount} events...`);
          }
        }
        logger.info(`Moved ${movedCount} events from old queue to new queue`);
        stats.totalEventsMigrated += movedCount;
      }

      // Delete the old queue key
      await redis.del(regularQueueKey);
      logger.info(`Deleted ${regularQueueKey}`);
      stats.totalKeysDeleted++;
    } else {
      logger.info(`${regularQueueKey} does not exist (already cleaned up)`);
    }

    // 5. Scan for any remaining event_buffer:session:* keys that might have been missed
    logger.info('Scanning for any remaining session keys...');
    let cursor = '0';
    let remainingSessionKeys = 0;

    do {
      const [newCursor, keys] = await redis.scan(
        cursor,
        'MATCH',
        'event_buffer:session:*',
        'COUNT',
        100,
      );
      cursor = newCursor;

      if (keys.length > 0) {
        const deleted = await redis.del(...keys);
        remainingSessionKeys += deleted;
        stats.totalKeysDeleted += deleted;
        logger.info(`Found and deleted ${deleted} remaining session keys`);
      }
    } while (cursor !== '0');

    if (remainingSessionKeys > 0) {
      logger.info(`Cleaned up ${remainingSessionKeys} remaining session keys`);
    } else {
      logger.info('No remaining session keys found');
    }

    logger.info('Cleanup completed successfully!', stats);
    return stats;
  } catch (error) {
    stats.errors++;
    logger.error('Error during cleanup:', error);
    throw error;
  }
}

async function main() {
  try {
    logger.info('Event Buffer Cleanup Script');
    logger.info('===========================');
    logger.info(
      'This script will remove old event buffer Redis keys from the previous architecture.',
    );
    logger.info('');

    // Check current state
    const sessionsSortedExists = await redis.exists(
      'event_buffer:sessions_sorted',
    );
    const readySessionsExists = await redis.exists(
      'event_buffer:ready_sessions',
    );
    const regularQueueExists = await redis.exists('event_buffer:regular_queue');

    if (!sessionsSortedExists && !readySessionsExists && !regularQueueExists) {
      logger.info(
        '✅ No old keys found. System appears to be already cleaned up!',
      );
      process.exit(0);
    }

    logger.info('Found old keys to clean up:');
    if (sessionsSortedExists) logger.info(' - event_buffer:sessions_sorted ✓');
    if (readySessionsExists) logger.info(' - event_buffer:ready_sessions ✓');
    if (regularQueueExists) logger.info(' - event_buffer:regular_queue ✓');
    logger.info('');

    // Perform cleanup
    const stats = await cleanupOldEventBufferKeys();

    // Summary
    logger.info('');
    logger.info('Cleanup Summary');
    logger.info('===============');
    logger.info(`Sessions sorted entries: ${stats.sessionsSorted}`);
    logger.info(`Ready sessions entries: ${stats.readySessions}`);
    logger.info(`Session list keys deleted: ${stats.sessionLists}`);
    logger.info(`Regular queue events: ${stats.regularQueue} (migrated)`);
    logger.info(`Total events migrated: ${stats.totalEventsMigrated}`);
    logger.info(`Total keys deleted: ${stats.totalKeysDeleted}`);
    logger.info(`Errors: ${stats.errors}`);
    logger.info('');

    if (stats.errors === 0) {
      logger.info('✅ Cleanup completed successfully!');
    } else {
      logger.warn(`⚠️ Cleanup completed with ${stats.errors} errors`);
    }

    // Close Redis connection
    await redis.quit();
    process.exit(0);
  } catch (error) {
    logger.error('Fatal error during cleanup:', error);
    await redis.quit();
    process.exit(1);
  }
}

// Run if executed directly
if (require.main === module) {
  main();
}

export { cleanupOldEventBufferKeys };

@@ -1,206 +0,0 @@
/**
 * Migration Script: Migrate Delayed Jobs to New Queue Names
 *
 * This script migrates delayed jobs from old queue names (e.g., "sessions")
 * to new queue names with hash tags (e.g., "{sessions}").
 *
 * Active/waiting jobs are ignored - only delayed jobs are migrated.
 *
 * Usage:
 *   npx tsx apps/worker/scripts/migrate-delayed-jobs.ts
 *
 * Options:
 *   --dry-run   Show what would be migrated without actually doing it
 *   --queue     Migrate specific queue only (sessions, cron, notification, misc)
 *
 * # Dry run (recommended first)
 * npx tsx apps/worker/scripts/migrate-delayed-jobs.ts --dry-run
 *
 * # Migrate all queues
 * npx tsx apps/worker/scripts/migrate-delayed-jobs.ts
 *
 * # Migrate specific queue only
 * npx tsx apps/worker/scripts/migrate-delayed-jobs.ts --queue=sessions
 * npx tsx apps/worker/scripts/migrate-delayed-jobs.ts --queue=misc
 *
 */
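// A quick way to compare queue depths before and after running this migration is
// BullMQ's job-count API (a sketch; queue names follow the old/new mapping above):
//
//   import { Queue } from 'bullmq';
//   const q = new Queue('sessions', { connection: getRedisQueue() });
//   console.log(await q.getJobCounts('delayed', 'waiting')); // e.g. { delayed: 12, waiting: 0 }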

import type {
  CronQueuePayload,
  MiscQueuePayload,
  NotificationQueuePayload,
  SessionsQueuePayload,
} from '@openpanel/queue';
import { getRedisQueue } from '@openpanel/redis';
import { Queue } from 'bullmq';

interface MigrationStats {
  queue: string;
  total: number;
  migrated: number;
  failed: number;
  skipped: number;
}

const isDryRun = process.argv.includes('--dry-run');
const specificQueue = process.argv
  .find((arg) => arg.startsWith('--queue='))
  ?.split('=')[1];

console.log('🚀 Starting delayed jobs migration');
console.log(
  `Mode: ${isDryRun ? 'DRY RUN (no changes will be made)' : 'LIVE MIGRATION'}`,
);
console.log(`Queue filter: ${specificQueue || 'all queues'}`);
console.log('---\n');

async function migrateDelayedJobs<T>(
  oldQueueName: string,
  newQueueName: string,
): Promise<MigrationStats> {
  const stats: MigrationStats = {
    queue: oldQueueName,
    total: 0,
    migrated: 0,
    failed: 0,
    skipped: 0,
  };

  const connection = getRedisQueue();
  const oldQueue = new Queue<T>(oldQueueName, { connection });
  const newQueue = new Queue<T>(newQueueName, { connection });

  try {
    console.log(`\n📦 Processing queue: ${oldQueueName} → ${newQueueName}`);

    // Get all delayed jobs from old queue
    const delayedJobs = await oldQueue.getDelayed();
    stats.total = delayedJobs.length;

    console.log(` Found ${stats.total} delayed jobs`);

    if (stats.total === 0) {
      console.log(' ✓ No delayed jobs to migrate');
      return stats;
    }

    for (const job of delayedJobs) {
      try {
        const delay = job.opts.delay || 0;
        const remainingDelay = Math.max(0, job.timestamp + delay - Date.now());

        console.log(
          ` - Job ${job.id}: ${job.name}, delay: ${Math.round(remainingDelay / 1000)}s remaining`,
        );

        if (!isDryRun) {
          // Add to new queue with remaining delay
          await newQueue.add(job.name || 'migrated-job', job.data, {
            ...job.opts,
            delay: remainingDelay,
            jobId: job.id, // Preserve job ID if possible
            attempts: job.opts.attempts,
            backoff: job.opts.backoff,
          });

          // Remove from old queue
          await job.remove();

          stats.migrated++;
          console.log(' ✓ Migrated');
        } else {
          stats.migrated++;
          console.log(' ✓ Would migrate (dry run)');
        }
      } catch (error) {
        stats.failed++;
        console.error(
          ` ✗ Failed to migrate job ${job.id}:`,
          error instanceof Error ? error.message : error,
        );
      }
    }

    console.log(`\n Summary for ${oldQueueName}:`);
    console.log(` - Total: ${stats.total}`);
    console.log(` - Migrated: ${stats.migrated}`);
    console.log(` - Failed: ${stats.failed}`);
    console.log(` - Skipped: ${stats.skipped}`);
  } catch (error) {
    console.error(` ✗ Error processing queue ${oldQueueName}:`, error);
  } finally {
    await oldQueue.close();
    await newQueue.close();
  }

  return stats;
}

async function main() {
  const queuesToMigrate: Array<{ old: string; new: string }> = [
    { old: 'sessions', new: '{sessions}' },
    { old: 'misc', new: '{misc}' },
  ];

  // Filter to specific queue if requested
  const filtered = specificQueue
    ? queuesToMigrate.filter((q) => q.old === specificQueue)
    : queuesToMigrate;

  if (filtered.length === 0) {
    console.error(
      `❌ Queue "${specificQueue}" not found. Valid queues: sessions, cron, notification, misc`,
    );
    process.exit(1);
  }

  const allStats: MigrationStats[] = [];

  for (const { old: oldName, new: newName } of filtered) {
    const stats = await migrateDelayedJobs(oldName, newName);
    allStats.push(stats);
  }

  // Print summary
  console.log(`\n${'='.repeat(50)}`);
  console.log('📊 MIGRATION SUMMARY');
  console.log(`${'='.repeat(50)}\n`);

  let totalJobs = 0;
  let totalMigrated = 0;
  let totalFailed = 0;

  for (const stats of allStats) {
    totalJobs += stats.total;
    totalMigrated += stats.migrated;
    totalFailed += stats.failed;
  }

  console.log(`Total jobs found: ${totalJobs}`);
  console.log(`Successfully migrated: ${totalMigrated}`);
  console.log(`Failed: ${totalFailed}`);
  console.log(
    `\n${isDryRun ? '⚠️ This was a DRY RUN - no changes were made' : '✅ Migration complete!'}`,
  );

  if (totalFailed > 0) {
    console.log(
      '\n⚠️ Some jobs failed to migrate. Check the logs above for details.',
    );
    process.exit(1);
  }

  if (isDryRun && totalMigrated > 0) {
    console.log('\n💡 Run without --dry-run to perform the actual migration');
  }
}

main()
  .then(() => {
    console.log('\n✨ Done!');
    process.exit(0);
  })
  .catch((error) => {
    console.error('\n❌ Migration failed:', error);
    process.exit(1);
  });

@@ -1,7 +1,6 @@
import type { CronQueueType } from '@openpanel/queue';
import { cronQueue } from '@openpanel/queue';

import { getLock } from '@openpanel/redis';
import { logger } from './utils/logger';

export async function bootCron() {
@@ -45,40 +44,30 @@ export async function bootCron() {
    });
  }

  const lock = await getLock('cron:lock', '1', 1000 * 60 * 5);
  logger.info('Updating cron jobs');

  if (lock) {
    logger.info('Cron lock acquired');
  } else {
    logger.info('Cron lock not acquired');
    const jobSchedulers = await cronQueue.getJobSchedulers();
    for (const jobScheduler of jobSchedulers) {
      await cronQueue.removeJobScheduler(jobScheduler.key);
    }

  if (lock) {
    logger.info('Updating cron jobs');
    // TODO: Switch to getJobSchedulers
    const repeatableJobs = await cronQueue.getRepeatableJobs();
    for (const repeatableJob of repeatableJobs) {
      await cronQueue.removeRepeatableByKey(repeatableJob.key);
    }

    // Add repeatable jobs
    for (const job of jobs) {
      await cronQueue.upsertJobScheduler(
        job.type,
        typeof job.pattern === 'number'
          ? {
              every: job.pattern,
            }
          : {
              pattern: job.pattern,
            },
        {
          data: {
            type: job.type,
            payload: undefined,
    // Add repeatable jobs
    for (const job of jobs) {
      await cronQueue.upsertJobScheduler(
        job.type,
        typeof job.pattern === 'number'
          ? {
              every: job.pattern,
            }
          : {
              pattern: job.pattern,
            },
        {
          data: {
            type: job.type,
            payload: undefined,
          },
        );
      }
      },
    );
  }
}

@@ -12,7 +12,7 @@ import {
  queueLogger,
  sessionsQueue,
} from '@openpanel/queue';
import { getLock, getRedisQueue } from '@openpanel/redis';
import { getRedisQueue } from '@openpanel/redis';

import { performance } from 'node:perf_hooks';
import { setTimeout as sleep } from 'node:timers/promises';

@@ -16,6 +16,9 @@ export const EVENTS_GROUP_QUEUES_SHARDS = Number.parseInt(
  10,
);

export const getQueueName = (name: string) =>
  process.env.QUEUE_CLUSTER ? `{${name}}` : name;
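// With this helper the Redis Cluster hash tags become opt-in: queue names stay plain on a
// single Redis instance and are wrapped in {...} only when QUEUE_CLUSTER is set, which keeps
// all BullMQ keys of a queue on one cluster slot. A sketch of the expected behaviour (the
// env value is illustrative; any truthy string enables it):
//
//   getQueueName('sessions');            // -> 'sessions'
//   process.env.QUEUE_CLUSTER = '1';
//   getQueueName('sessions');            // -> '{sessions}'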

function pickShard(projectId: string) {
  const h = createHash('sha1').update(projectId).digest(); // 20 bytes
  // take first 4 bytes as unsigned int
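  // (The hunk is cut off at this point; a typical completion, stated as an assumption
  // rather than the project's actual code, would be:
  //   return h.readUInt32BE(0) % EVENTS_GROUP_QUEUES_SHARDS;
  // )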
@@ -144,7 +147,9 @@ export const eventsGroupQueues = Array.from({
  (_, index) =>
    new GroupQueue<EventsQueuePayloadIncomingEvent['payload']>({
      logger: queueLogger,
      namespace: `{group_events_${index}}`,
      namespace: getQueueName(
        index === 0 ? 'group_events' : `group_events_${index}`,
      ),
      redis: getRedisGroupQueue(),
      keepCompleted: 1_000,
      keepFailed: 10_000,
@@ -168,24 +173,27 @@ export const getEventsGroupQueueShard = (groupId: string) => {
  return queue;
};

export const sessionsQueue = new Queue<SessionsQueuePayload>('{sessions}', {
  connection: getRedisQueue(),
  defaultJobOptions: {
    removeOnComplete: 10,
export const sessionsQueue = new Queue<SessionsQueuePayload>(
  getQueueName('sessions'),
  {
    connection: getRedisQueue(),
    defaultJobOptions: {
      removeOnComplete: 10,
    },
  },
});
export const sessionsQueueEvents = new QueueEvents('{sessions}', {
);
export const sessionsQueueEvents = new QueueEvents(getQueueName('sessions'), {
  connection: getRedisQueue(),
});

export const cronQueue = new Queue<CronQueuePayload>('{cron}', {
export const cronQueue = new Queue<CronQueuePayload>(getQueueName('cron'), {
  connection: getRedisQueue(),
  defaultJobOptions: {
    removeOnComplete: 10,
  },
});

export const miscQueue = new Queue<MiscQueuePayload>('{misc}', {
export const miscQueue = new Queue<MiscQueuePayload>(getQueueName('misc'), {
  connection: getRedisQueue(),
  defaultJobOptions: {
    removeOnComplete: 10,
@@ -200,7 +208,7 @@ export type NotificationQueuePayload = {
};

export const notificationQueue = new Queue<NotificationQueuePayload>(
  '{notification}',
  getQueueName('notification'),
  {
    connection: getRedisQueue(),
    defaultJobOptions: {
@@ -216,13 +224,16 @@ export type ImportQueuePayload = {
  };
};

export const importQueue = new Queue<ImportQueuePayload>('{import}', {
  connection: getRedisQueue(),
  defaultJobOptions: {
    removeOnComplete: 10,
    removeOnFail: 50,
export const importQueue = new Queue<ImportQueuePayload>(
  getQueueName('import'),
  {
    connection: getRedisQueue(),
    defaultJobOptions: {
      removeOnComplete: 10,
      removeOnFail: 50,
    },
  },
});
);

export function addTrialEndingSoonJob(organizationId: string, delay: number) {
  return miscQueue.add(