fix: comments
@@ -45,7 +45,7 @@ export async function bootCron() {
     });
   }
 
-  const lock = await getLock('cron:lock', '1', 1000 * 60 * 60 * 5);
+  const lock = await getLock('cron:lock', '1', 1000 * 60 * 5);
 
   if (lock) {
     logger.info('Cron lock acquired');
@@ -26,7 +26,6 @@ import { notificationJob } from './jobs/notification';
 import { sessionsJob } from './jobs/sessions';
 import { eventsGroupJobDuration } from './metrics';
 import { logger } from './utils/logger';
-import { requireSingleton } from './utils/singleton-lock';
 
 const workerOptions: WorkerOptions = {
   connection: getRedisQueue(),
@@ -85,15 +84,6 @@ function getConcurrencyFor(queueName: string, defaultValue = 1): number {
 
 export async function bootWorkers() {
   const enabledQueues = getEnabledQueues();
-  const enforceSingleton = process.env.ENFORCE_SINGLETON === '1';
-  let singletonCleanup: (() => void) | null = null;
-
-  // Enforce singleton lock if requested
-  if (enforceSingleton) {
-    const lockKey = enabledQueues.join(',');
-    logger.info('Enforcing singleton mode', { lockKey });
-    singletonCleanup = await requireSingleton(lockKey);
-  }
 
   const workers: (Worker | GroupWorker<any>)[] = [];
 
@@ -132,22 +122,7 @@ export async function bootWorkers() {
           process.env.EVENT_BLOCKING_TIMEOUT_SEC || '1',
         ),
         handler: async (job) => {
-          if (await getLock(job.id, '1', 10000)) {
-            logger.info('worker handler', {
-              jobId: job.id,
-              groupId: job.groupId,
-              timestamp: job.data.event.timestamp,
-              data: job.data,
-            });
-          } else {
-            logger.info('event already processed', {
-              jobId: job.id,
-              groupId: job.groupId,
-              timestamp: job.data.event.timestamp,
-              data: job.data,
-            });
-          }
-          await incomingEventPure(job.data);
+          return await incomingEventPure(job.data);
         },
       });
 
@@ -294,11 +269,6 @@ export async function bootWorkers() {
 
   await Promise.all(workers.map((worker) => worker.close()));
 
-  // Release singleton lock if acquired
-  if (singletonCleanup) {
-    singletonCleanup();
-  }
-
   logger.info('workers closed successfully', {
     elapsed: performance.now() - time,
   });
@@ -4,7 +4,7 @@ import { ExpressAdapter } from '@bull-board/express';
 import { createInitialSalts } from '@openpanel/db';
 import {
   cronQueue,
-  eventsGroupQueue,
+  eventsGroupQueues,
   importQueue,
   miscQueue,
   notificationQueue,
@@ -20,7 +20,6 @@ export const eventsGroupJobDuration = new client.Histogram({
   help: 'Duration of job processing in eventsGroupQueues (in ms)',
   labelNames: ['queue_shard', 'status'],
   buckets: [10, 25, 50, 100, 250, 500, 750, 1000, 2000, 5000, 10000, 30000], // 10ms to 30s
-  registers: [register],
 });
 
 register.registerMetric(eventsGroupJobDuration);
@@ -1,69 +0,0 @@
-import { getLock, getRedisCache } from '@openpanel/redis';
-import { logger } from './logger';
-
-/**
- * Acquires a distributed lock to ensure only one instance of a worker group runs.
- * If the lock cannot be acquired, the process exits.
- *
- * @param key - The lock key (e.g., 'utility-queues')
- * @param ttlMs - Time to live for the lock in milliseconds (default: 60000)
- * @returns A cleanup function that releases the lock
- */
-export async function requireSingleton(
-  key: string,
-  ttlMs = 60_000,
-): Promise<() => void> {
-  const lockKey = `lock:singleton:${key}`;
-  const lockValue = `${process.pid}-${Date.now()}`;
-
-  // Try to acquire the lock
-  const acquired = await getLock(lockKey, lockValue, ttlMs);
-
-  if (!acquired) {
-    logger.error(
-      `Another instance holds singleton lock for "${key}". Exiting.`,
-      { key },
-    );
-    process.exit(0);
-  }
-
-  logger.debug('Acquired singleton lock', { key, ttlMs, lockValue });
-
-  // Set up automatic extension to keep the lock alive
-  const extensionInterval = setInterval(async () => {
-    try {
-      // Extend the lock by setting it again with the same value
-      const redis = getRedisCache();
-      const result = await redis.set(lockKey, lockValue, 'PX', ttlMs, 'XX');
-
-      if (result === 'OK') {
-        logger.debug('Extended singleton lock', { key });
-      } else {
-        // Lock was lost (someone else acquired it or it expired)
-        logger.error('Lost singleton lock - exiting', { key });
-        clearInterval(extensionInterval);
-        process.exit(1);
-      }
-    } catch (error: unknown) {
-      logger.error('Failed to extend singleton lock - exiting', {
-        key,
-        error,
-      });
-      clearInterval(extensionInterval);
-      process.exit(1);
-    }
-  }, ttlMs / 2);
-
-  // Return cleanup function
-  return () => {
-    clearInterval(extensionInterval);
-    getRedisCache()
-      .del(lockKey)
-      .then(() => {
-        logger.debug('Released singleton lock', { key });
-      })
-      .catch((error: unknown) => {
-        logger.error('Failed to release singleton lock', { key, error });
-      });
-  };
-}
@@ -49,7 +49,7 @@ export class ProfileBuffer extends BaseBuffer {
       profileId: profile.id,
       projectId: profile.project_id,
     });
-    return (await getRedisCache().exists(cacheKey)) === 1;
+    return (await this.redis.exists(cacheKey)) === 1;
   }
 
   async add(profile: IClickhouseProfile, isFromEvent = false) {
@@ -153,7 +153,7 @@ export class ProfileBuffer extends BaseBuffer {
       profileId,
       projectId,
     });
-    const existingProfile = await getRedisCache().get(cacheKey);
+    const existingProfile = await this.redis.get(cacheKey);
     if (!existingProfile) {
       return null;
     }
@@ -212,7 +212,7 @@ export class SessionBuffer extends BaseBuffer {
         };
       });
 
-      for (const chunk of this.chunks(sessions, 1000)) {
+      for (const chunk of this.chunks(sessions, this.chunkSize)) {
        // Insert to ClickHouse
        await ch.insert({
          table: TABLE_NAMES.sessions,
pnpm-lock.yaml: 762 lines changed (generated lockfile; diff suppressed because it is too large).