From db6291982594d0877cb4bf0d1a1b9d9f186205d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Mon, 26 Jan 2026 12:07:06 +0100 Subject: [PATCH] fix: salt issues --- apps/api/src/controllers/track.controller.ts | 2 +- apps/worker/src/boot-cron.ts | 130 ++++++++++++++++--- apps/worker/src/jobs/cron.salt.ts | 68 +++++++--- packages/db/src/services/salt.service.ts | 20 +-- 4 files changed, 165 insertions(+), 55 deletions(-) diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index 74a2da76..61887395 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -150,7 +150,7 @@ async function buildContext( ? validatedBody.payload.properties.__deviceId : undefined; - const [salts] = await Promise.all([getSalts()]); + const salts = await getSalts(); currentDeviceId = overrideDeviceId || (ua diff --git a/apps/worker/src/boot-cron.ts b/apps/worker/src/boot-cron.ts index 9eb558c6..177bbdd4 100644 --- a/apps/worker/src/boot-cron.ts +++ b/apps/worker/src/boot-cron.ts @@ -3,6 +3,30 @@ import { cronQueue } from '@openpanel/queue'; import { logger } from './utils/logger'; +async function removeConflictingJobs(schedulerKey: string) { + // Remove any existing jobs that might conflict with the scheduler + // BullMQ scheduler jobs have IDs like "repeat::" + const jobStates = ['delayed', 'waiting', 'completed', 'failed'] as const; + + for (const state of jobStates) { + try { + const jobs = await cronQueue.getJobs([state]); + for (const job of jobs) { + // Check if this job was created by the scheduler we're about to upsert + if (job.id?.startsWith(`repeat:${schedulerKey}:`)) { + await job.remove(); + logger.info('Removed conflicting scheduler job', { + jobId: job.id, + schedulerKey, + }); + } + } + } catch (error) { + // Ignore errors during cleanup + } + } +} + export async function bootCron() { const jobs: { name: string; @@ -56,28 +80,98 @@ export async function bootCron() { logger.info('Updating cron jobs'); - const jobSchedulers = await cronQueue.getJobSchedulers(); - for (const jobScheduler of jobSchedulers) { - await cronQueue.removeJobScheduler(jobScheduler.key); + const jobsToKeep = new Set(jobs.map((job) => job.type)); + + const currentJobSchedulers = await cronQueue + .getJobSchedulers() + .catch((error) => { + logger.error('Error getting job schedulers', { + error, + }); + return []; + }); + for (const jobScheduler of currentJobSchedulers) { + if (!jobsToKeep.has(jobScheduler.key as CronQueueType)) { + await cronQueue.removeJobScheduler(jobScheduler.key).catch((error) => { + logger.error('Error removing job scheduler', { + error, + jobScheduler: jobScheduler.key, + }); + }); + } } - // Add repeatable jobs for (const job of jobs) { - await cronQueue.upsertJobScheduler( - job.type, - typeof job.pattern === 'number' - ? { - every: job.pattern, - } - : { - pattern: job.pattern, + try { + await cronQueue.upsertJobScheduler( + job.type, + typeof job.pattern === 'number' + ? { + every: job.pattern, + } + : { + pattern: job.pattern, + }, + { + data: { + type: job.type, + payload: undefined, }, - { - data: { - type: job.type, - payload: undefined, }, - }, - ); + ); + } catch (error) { + // If upsert fails due to conflicting job, try to clean up and retry + const isConflictError = + error instanceof Error && + error.message.includes('job ID already exists'); + + if (isConflictError) { + logger.warn('Job scheduler conflict detected, attempting cleanup', { + job: job.type, + }); + + await removeConflictingJobs(job.type); + + // Also try removing the scheduler itself to start fresh + try { + await cronQueue.removeJobScheduler(job.type); + } catch { + // Ignore - scheduler might not exist + } + + // Retry the upsert + try { + await cronQueue.upsertJobScheduler( + job.type, + typeof job.pattern === 'number' + ? { + every: job.pattern, + } + : { + pattern: job.pattern, + }, + { + data: { + type: job.type, + payload: undefined, + }, + }, + ); + logger.info('Job scheduler created after cleanup', { + job: job.type, + }); + } catch (retryError) { + logger.error('Error upserting job scheduler after cleanup', { + error: retryError, + job: job.type, + }); + } + } else { + logger.error('Error upserting job scheduler', { + error, + job: job.type, + }); + } + } } } diff --git a/apps/worker/src/jobs/cron.salt.ts b/apps/worker/src/jobs/cron.salt.ts index f4159842..e7575f9f 100644 --- a/apps/worker/src/jobs/cron.salt.ts +++ b/apps/worker/src/jobs/cron.salt.ts @@ -1,25 +1,59 @@ import { generateSalt } from '@openpanel/common/server'; -import { db, getCurrentSalt } from '@openpanel/db'; -import { getRedisCache } from '@openpanel/redis'; +import { db, getSalts } from '@openpanel/db'; -export async function salt() { - const oldSalt = await getCurrentSalt().catch(() => null); - const newSalt = await db.salt.create({ - data: { - salt: generateSalt(), - }, - }); - - // Delete rest of the salts - await db.salt.deleteMany({ - where: { - salt: { - notIn: oldSalt ? [newSalt.salt, oldSalt] : [newSalt.salt], +async function generateNewSalt() { + const newSalt = await db.$transaction(async (tx) => { + const existingSalts = await tx.salt.findMany({ + orderBy: { + createdAt: 'desc', }, - }, + take: 2, + }); + + const created = await tx.salt.create({ + data: { + salt: generateSalt(), + }, + }); + + // Keep the new salt + the previous newest (if exists) + const previousNewest = existingSalts[0]; + const saltsToKeep = previousNewest + ? [created.salt, previousNewest.salt] + : [created.salt]; + + await tx.salt.deleteMany({ + where: { + salt: { + notIn: saltsToKeep, + }, + }, + }); + + return created; }); - await getRedisCache().del('op:salt'); + getSalts.clear(); return newSalt; } + +export async function salt() { + const ALLOWED_RETRIES = 5; + const BASE_DELAY = 1000; + const generateNewSaltWithRetry = async (retryCount = 0) => { + try { + return await generateNewSalt(); + } catch (error) { + if (retryCount < ALLOWED_RETRIES) { + await new Promise((resolve) => + setTimeout(resolve, BASE_DELAY * 2 ** retryCount), + ); + return generateNewSaltWithRetry(retryCount + 1); + } + throw error; + } + }; + + return await generateNewSaltWithRetry(); +} diff --git a/packages/db/src/services/salt.service.ts b/packages/db/src/services/salt.service.ts index e9da7083..ce3b9f59 100644 --- a/packages/db/src/services/salt.service.ts +++ b/packages/db/src/services/salt.service.ts @@ -3,20 +3,6 @@ import { generateSalt } from '@openpanel/common/server'; import { cacheableLru } from '@openpanel/redis'; import { db } from '../prisma-client'; -export async function getCurrentSalt() { - const salt = await db.salt.findFirst({ - orderBy: { - createdAt: 'desc', - }, - }); - - if (!salt) { - throw new Error('No salt found'); - } - - return salt.salt; -} - export const getSalts = cacheableLru( 'op:salt', async () => { @@ -31,13 +17,9 @@ export const getSalts = cacheableLru( throw new Error('No salt found'); } - if (!prev) { - throw new Error('No salt found'); - } - const salts = { current: curr.salt, - previous: prev.salt, + previous: prev?.salt ?? curr.salt, }; return salts;