This commit is contained in:
Carl-Gerhard Lindesvärd
2025-12-14 11:01:26 +01:00
parent dad9baa581
commit bc84404235
35 changed files with 3172 additions and 13 deletions

View File

@@ -44,6 +44,12 @@ export async function bootCron() {
});
}
jobs.push({
name: 'insightsDaily',
type: 'insightsDaily',
pattern: '0 2 * * *', // 2 AM daily
});
logger.info('Updating cron jobs');
const jobSchedulers = await cronQueue.getJobSchedulers();

View File

@@ -7,6 +7,7 @@ import {
cronQueue,
eventsGroupQueues,
importQueue,
insightsQueue,
miscQueue,
notificationQueue,
queueLogger,
@@ -21,6 +22,7 @@ import { Worker as GroupWorker } from 'groupmq';
import { cronJob } from './jobs/cron';
import { incomingEvent } from './jobs/events.incoming-event';
import { importJob } from './jobs/import';
import { insightsProjectJob } from './jobs/insights';
import { miscJob } from './jobs/misc';
import { notificationJob } from './jobs/notification';
import { sessionsJob } from './jobs/sessions';
@@ -49,7 +51,15 @@ function getEnabledQueues(): QueueName[] {
logger.info('No ENABLED_QUEUES specified, starting all queues', {
totalEventShards: EVENTS_GROUP_QUEUES_SHARDS,
});
return ['events', 'sessions', 'cron', 'notification', 'misc', 'import'];
return [
'events',
'sessions',
'cron',
'notification',
'misc',
'import',
'insights',
];
}
const queues = enabledQueuesEnv
@@ -187,6 +197,17 @@ export async function bootWorkers() {
logger.info('Started worker for import', { concurrency });
}
// Start insights worker
if (enabledQueues.includes('insights')) {
const concurrency = getConcurrencyFor('insights', 5);
const insightsWorker = new Worker(insightsQueue.name, insightsProjectJob, {
...workerOptions,
concurrency,
});
workers.push(insightsWorker);
logger.info('Started worker for insights', { concurrency });
}
if (workers.length === 0) {
logger.warn(
'No workers started. Check ENABLED_QUEUES environment variable.',

View File

@@ -6,6 +6,7 @@ import {
cronQueue,
eventsGroupQueues,
importQueue,
insightsQueue,
miscQueue,
notificationQueue,
sessionsQueue,
@@ -13,10 +14,13 @@ import {
import express from 'express';
import client from 'prom-client';
import { getRedisQueue } from '@openpanel/redis';
import { Worker } from 'bullmq';
import { BullBoardGroupMQAdapter } from 'groupmq';
import sourceMapSupport from 'source-map-support';
import { bootCron } from './boot-cron';
import { bootWorkers } from './boot-workers';
import { insightsProjectJob } from './jobs/insights';
import { register } from './metrics';
import { logger } from './utils/logger';
@@ -42,6 +46,7 @@ async function start() {
new BullMQAdapter(notificationQueue),
new BullMQAdapter(miscQueue),
new BullMQAdapter(importQueue),
new BullMQAdapter(insightsQueue),
],
serverAdapter: serverAdapter,
});
@@ -74,6 +79,11 @@ async function start() {
await bootCron();
} else {
logger.warn('Workers are disabled');
// Start insights worker
const insightsWorker = new Worker(insightsQueue.name, insightsProjectJob, {
connection: getRedisQueue(),
});
}
await createInitialSalts();

View File

@@ -6,6 +6,7 @@ import type { CronQueuePayload } from '@openpanel/queue';
import { jobdeleteProjects } from './cron.delete-projects';
import { ping } from './cron.ping';
import { salt } from './cron.salt';
import { insightsDailyJob } from './insights';
export async function cronJob(job: Job<CronQueuePayload>) {
switch (job.data.type) {
@@ -27,5 +28,8 @@ export async function cronJob(job: Job<CronQueuePayload>) {
case 'deleteProjects': {
return await jobdeleteProjects(job);
}
case 'insightsDaily': {
return await insightsDailyJob(job);
}
}
}

View File

@@ -0,0 +1,71 @@
import { ch } from '@openpanel/db/src/clickhouse/client';
import {
createEngine,
devicesModule,
entryPagesModule,
geoModule,
insightStore,
pageTrendsModule,
referrersModule,
} from '@openpanel/db/src/services/insights';
import type {
CronQueuePayload,
InsightsQueuePayloadProject,
} from '@openpanel/queue';
import { insightsQueue } from '@openpanel/queue';
import type { Job } from 'bullmq';
const defaultEngineConfig = {
keepTopNPerModuleWindow: 5,
closeStaleAfterDays: 7,
dimensionBatchSize: 50,
globalThresholds: {
minTotal: 200,
minAbsDelta: 80,
minPct: 0.15,
},
enableExplain: false,
explainTopNPerProjectPerDay: 3,
};
export async function insightsDailyJob(job: Job<CronQueuePayload>) {
const projectIds = await insightStore.listProjectIdsForCadence('daily');
const date = new Date().toISOString().slice(0, 10);
for (const projectId of projectIds) {
await insightsQueue.add(
'insightsProject',
{
type: 'insightsProject',
payload: { projectId, date },
},
{
jobId: `daily:${date}:${projectId}`, // idempotent
},
);
}
}
export async function insightsProjectJob(
job: Job<InsightsQueuePayloadProject>,
) {
const { projectId, date } = job.data.payload;
const engine = createEngine({
store: insightStore,
modules: [
referrersModule,
entryPagesModule,
pageTrendsModule,
geoModule,
devicesModule,
],
db: ch,
config: defaultEngineConfig,
});
await engine.runProject({
projectId,
cadence: 'daily',
now: new Date(date),
});
}