fix: salt issues
This commit is contained in:
@@ -150,7 +150,7 @@ async function buildContext(
|
|||||||
? validatedBody.payload.properties.__deviceId
|
? validatedBody.payload.properties.__deviceId
|
||||||
: undefined;
|
: undefined;
|
||||||
|
|
||||||
const [salts] = await Promise.all([getSalts()]);
|
const salts = await getSalts();
|
||||||
currentDeviceId =
|
currentDeviceId =
|
||||||
overrideDeviceId ||
|
overrideDeviceId ||
|
||||||
(ua
|
(ua
|
||||||
|
|||||||
@@ -3,6 +3,30 @@ import { cronQueue } from '@openpanel/queue';
|
|||||||
|
|
||||||
import { logger } from './utils/logger';
|
import { logger } from './utils/logger';
|
||||||
|
|
||||||
|
async function removeConflictingJobs(schedulerKey: string) {
|
||||||
|
// Remove any existing jobs that might conflict with the scheduler
|
||||||
|
// BullMQ scheduler jobs have IDs like "repeat:<key>:<timestamp>"
|
||||||
|
const jobStates = ['delayed', 'waiting', 'completed', 'failed'] as const;
|
||||||
|
|
||||||
|
for (const state of jobStates) {
|
||||||
|
try {
|
||||||
|
const jobs = await cronQueue.getJobs([state]);
|
||||||
|
for (const job of jobs) {
|
||||||
|
// Check if this job was created by the scheduler we're about to upsert
|
||||||
|
if (job.id?.startsWith(`repeat:${schedulerKey}:`)) {
|
||||||
|
await job.remove();
|
||||||
|
logger.info('Removed conflicting scheduler job', {
|
||||||
|
jobId: job.id,
|
||||||
|
schedulerKey,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
// Ignore errors during cleanup
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function bootCron() {
|
export async function bootCron() {
|
||||||
const jobs: {
|
const jobs: {
|
||||||
name: string;
|
name: string;
|
||||||
@@ -56,28 +80,98 @@ export async function bootCron() {
|
|||||||
|
|
||||||
logger.info('Updating cron jobs');
|
logger.info('Updating cron jobs');
|
||||||
|
|
||||||
const jobSchedulers = await cronQueue.getJobSchedulers();
|
const jobsToKeep = new Set(jobs.map((job) => job.type));
|
||||||
for (const jobScheduler of jobSchedulers) {
|
|
||||||
await cronQueue.removeJobScheduler(jobScheduler.key);
|
const currentJobSchedulers = await cronQueue
|
||||||
|
.getJobSchedulers()
|
||||||
|
.catch((error) => {
|
||||||
|
logger.error('Error getting job schedulers', {
|
||||||
|
error,
|
||||||
|
});
|
||||||
|
return [];
|
||||||
|
});
|
||||||
|
for (const jobScheduler of currentJobSchedulers) {
|
||||||
|
if (!jobsToKeep.has(jobScheduler.key as CronQueueType)) {
|
||||||
|
await cronQueue.removeJobScheduler(jobScheduler.key).catch((error) => {
|
||||||
|
logger.error('Error removing job scheduler', {
|
||||||
|
error,
|
||||||
|
jobScheduler: jobScheduler.key,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add repeatable jobs
|
|
||||||
for (const job of jobs) {
|
for (const job of jobs) {
|
||||||
await cronQueue.upsertJobScheduler(
|
try {
|
||||||
job.type,
|
await cronQueue.upsertJobScheduler(
|
||||||
typeof job.pattern === 'number'
|
job.type,
|
||||||
? {
|
typeof job.pattern === 'number'
|
||||||
every: job.pattern,
|
? {
|
||||||
}
|
every: job.pattern,
|
||||||
: {
|
}
|
||||||
pattern: job.pattern,
|
: {
|
||||||
|
pattern: job.pattern,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
data: {
|
||||||
|
type: job.type,
|
||||||
|
payload: undefined,
|
||||||
},
|
},
|
||||||
{
|
|
||||||
data: {
|
|
||||||
type: job.type,
|
|
||||||
payload: undefined,
|
|
||||||
},
|
},
|
||||||
},
|
);
|
||||||
);
|
} catch (error) {
|
||||||
|
// If upsert fails due to conflicting job, try to clean up and retry
|
||||||
|
const isConflictError =
|
||||||
|
error instanceof Error &&
|
||||||
|
error.message.includes('job ID already exists');
|
||||||
|
|
||||||
|
if (isConflictError) {
|
||||||
|
logger.warn('Job scheduler conflict detected, attempting cleanup', {
|
||||||
|
job: job.type,
|
||||||
|
});
|
||||||
|
|
||||||
|
await removeConflictingJobs(job.type);
|
||||||
|
|
||||||
|
// Also try removing the scheduler itself to start fresh
|
||||||
|
try {
|
||||||
|
await cronQueue.removeJobScheduler(job.type);
|
||||||
|
} catch {
|
||||||
|
// Ignore - scheduler might not exist
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retry the upsert
|
||||||
|
try {
|
||||||
|
await cronQueue.upsertJobScheduler(
|
||||||
|
job.type,
|
||||||
|
typeof job.pattern === 'number'
|
||||||
|
? {
|
||||||
|
every: job.pattern,
|
||||||
|
}
|
||||||
|
: {
|
||||||
|
pattern: job.pattern,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
data: {
|
||||||
|
type: job.type,
|
||||||
|
payload: undefined,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
logger.info('Job scheduler created after cleanup', {
|
||||||
|
job: job.type,
|
||||||
|
});
|
||||||
|
} catch (retryError) {
|
||||||
|
logger.error('Error upserting job scheduler after cleanup', {
|
||||||
|
error: retryError,
|
||||||
|
job: job.type,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.error('Error upserting job scheduler', {
|
||||||
|
error,
|
||||||
|
job: job.type,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,25 +1,59 @@
|
|||||||
import { generateSalt } from '@openpanel/common/server';
|
import { generateSalt } from '@openpanel/common/server';
|
||||||
import { db, getCurrentSalt } from '@openpanel/db';
|
import { db, getSalts } from '@openpanel/db';
|
||||||
import { getRedisCache } from '@openpanel/redis';
|
|
||||||
|
|
||||||
export async function salt() {
|
async function generateNewSalt() {
|
||||||
const oldSalt = await getCurrentSalt().catch(() => null);
|
const newSalt = await db.$transaction(async (tx) => {
|
||||||
const newSalt = await db.salt.create({
|
const existingSalts = await tx.salt.findMany({
|
||||||
data: {
|
orderBy: {
|
||||||
salt: generateSalt(),
|
createdAt: 'desc',
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
// Delete rest of the salts
|
|
||||||
await db.salt.deleteMany({
|
|
||||||
where: {
|
|
||||||
salt: {
|
|
||||||
notIn: oldSalt ? [newSalt.salt, oldSalt] : [newSalt.salt],
|
|
||||||
},
|
},
|
||||||
},
|
take: 2,
|
||||||
|
});
|
||||||
|
|
||||||
|
const created = await tx.salt.create({
|
||||||
|
data: {
|
||||||
|
salt: generateSalt(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Keep the new salt + the previous newest (if exists)
|
||||||
|
const previousNewest = existingSalts[0];
|
||||||
|
const saltsToKeep = previousNewest
|
||||||
|
? [created.salt, previousNewest.salt]
|
||||||
|
: [created.salt];
|
||||||
|
|
||||||
|
await tx.salt.deleteMany({
|
||||||
|
where: {
|
||||||
|
salt: {
|
||||||
|
notIn: saltsToKeep,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
return created;
|
||||||
});
|
});
|
||||||
|
|
||||||
await getRedisCache().del('op:salt');
|
getSalts.clear();
|
||||||
|
|
||||||
return newSalt;
|
return newSalt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function salt() {
|
||||||
|
const ALLOWED_RETRIES = 5;
|
||||||
|
const BASE_DELAY = 1000;
|
||||||
|
const generateNewSaltWithRetry = async (retryCount = 0) => {
|
||||||
|
try {
|
||||||
|
return await generateNewSalt();
|
||||||
|
} catch (error) {
|
||||||
|
if (retryCount < ALLOWED_RETRIES) {
|
||||||
|
await new Promise((resolve) =>
|
||||||
|
setTimeout(resolve, BASE_DELAY * 2 ** retryCount),
|
||||||
|
);
|
||||||
|
return generateNewSaltWithRetry(retryCount + 1);
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return await generateNewSaltWithRetry();
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,20 +3,6 @@ import { generateSalt } from '@openpanel/common/server';
|
|||||||
import { cacheableLru } from '@openpanel/redis';
|
import { cacheableLru } from '@openpanel/redis';
|
||||||
import { db } from '../prisma-client';
|
import { db } from '../prisma-client';
|
||||||
|
|
||||||
export async function getCurrentSalt() {
|
|
||||||
const salt = await db.salt.findFirst({
|
|
||||||
orderBy: {
|
|
||||||
createdAt: 'desc',
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!salt) {
|
|
||||||
throw new Error('No salt found');
|
|
||||||
}
|
|
||||||
|
|
||||||
return salt.salt;
|
|
||||||
}
|
|
||||||
|
|
||||||
export const getSalts = cacheableLru(
|
export const getSalts = cacheableLru(
|
||||||
'op:salt',
|
'op:salt',
|
||||||
async () => {
|
async () => {
|
||||||
@@ -31,13 +17,9 @@ export const getSalts = cacheableLru(
|
|||||||
throw new Error('No salt found');
|
throw new Error('No salt found');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!prev) {
|
|
||||||
throw new Error('No salt found');
|
|
||||||
}
|
|
||||||
|
|
||||||
const salts = {
|
const salts = {
|
||||||
current: curr.salt,
|
current: curr.salt,
|
||||||
previous: prev.salt,
|
previous: prev?.salt ?? curr.salt,
|
||||||
};
|
};
|
||||||
|
|
||||||
return salts;
|
return salts;
|
||||||
|
|||||||
Reference in New Issue
Block a user