feat: insights
* fix: migration for newly created self-hosting instances * fix: build script * wip * wip * wip * fix: tailwind css
This commit is contained in:
committed by
GitHub
parent
1e4f02fb5e
commit
5f38560373
@@ -34,6 +34,11 @@ export async function bootCron() {
|
||||
type: 'flushSessions',
|
||||
pattern: 1000 * 10,
|
||||
},
|
||||
{
|
||||
name: 'insightsDaily',
|
||||
type: 'insightsDaily',
|
||||
pattern: '0 2 * * *',
|
||||
},
|
||||
];
|
||||
|
||||
if (process.env.SELF_HOSTED && process.env.NODE_ENV === 'production') {
|
||||
|
||||
@@ -7,6 +7,7 @@ import {
|
||||
cronQueue,
|
||||
eventsGroupQueues,
|
||||
importQueue,
|
||||
insightsQueue,
|
||||
miscQueue,
|
||||
notificationQueue,
|
||||
queueLogger,
|
||||
@@ -21,6 +22,7 @@ import { Worker as GroupWorker } from 'groupmq';
|
||||
import { cronJob } from './jobs/cron';
|
||||
import { incomingEvent } from './jobs/events.incoming-event';
|
||||
import { importJob } from './jobs/import';
|
||||
import { insightsProjectJob } from './jobs/insights';
|
||||
import { miscJob } from './jobs/misc';
|
||||
import { notificationJob } from './jobs/notification';
|
||||
import { sessionsJob } from './jobs/sessions';
|
||||
@@ -49,7 +51,15 @@ function getEnabledQueues(): QueueName[] {
|
||||
logger.info('No ENABLED_QUEUES specified, starting all queues', {
|
||||
totalEventShards: EVENTS_GROUP_QUEUES_SHARDS,
|
||||
});
|
||||
return ['events', 'sessions', 'cron', 'notification', 'misc', 'import'];
|
||||
return [
|
||||
'events',
|
||||
'sessions',
|
||||
'cron',
|
||||
'notification',
|
||||
'misc',
|
||||
'import',
|
||||
'insights',
|
||||
];
|
||||
}
|
||||
|
||||
const queues = enabledQueuesEnv
|
||||
@@ -187,6 +197,17 @@ export async function bootWorkers() {
|
||||
logger.info('Started worker for import', { concurrency });
|
||||
}
|
||||
|
||||
// Start insights worker
|
||||
if (enabledQueues.includes('insights')) {
|
||||
const concurrency = getConcurrencyFor('insights', 5);
|
||||
const insightsWorker = new Worker(insightsQueue.name, insightsProjectJob, {
|
||||
...workerOptions,
|
||||
concurrency,
|
||||
});
|
||||
workers.push(insightsWorker);
|
||||
logger.info('Started worker for insights', { concurrency });
|
||||
}
|
||||
|
||||
if (workers.length === 0) {
|
||||
logger.warn(
|
||||
'No workers started. Check ENABLED_QUEUES environment variable.',
|
||||
|
||||
@@ -6,6 +6,7 @@ import {
|
||||
cronQueue,
|
||||
eventsGroupQueues,
|
||||
importQueue,
|
||||
insightsQueue,
|
||||
miscQueue,
|
||||
notificationQueue,
|
||||
sessionsQueue,
|
||||
@@ -42,6 +43,7 @@ async function start() {
|
||||
new BullMQAdapter(notificationQueue),
|
||||
new BullMQAdapter(miscQueue),
|
||||
new BullMQAdapter(importQueue),
|
||||
new BullMQAdapter(insightsQueue),
|
||||
],
|
||||
serverAdapter: serverAdapter,
|
||||
});
|
||||
|
||||
@@ -6,6 +6,7 @@ import type { CronQueuePayload } from '@openpanel/queue';
|
||||
import { jobdeleteProjects } from './cron.delete-projects';
|
||||
import { ping } from './cron.ping';
|
||||
import { salt } from './cron.salt';
|
||||
import { insightsDailyJob } from './insights';
|
||||
|
||||
export async function cronJob(job: Job<CronQueuePayload>) {
|
||||
switch (job.data.type) {
|
||||
@@ -27,5 +28,8 @@ export async function cronJob(job: Job<CronQueuePayload>) {
|
||||
case 'deleteProjects': {
|
||||
return await jobdeleteProjects(job);
|
||||
}
|
||||
case 'insightsDaily': {
|
||||
return await insightsDailyJob(job);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
72
apps/worker/src/jobs/insights.ts
Normal file
72
apps/worker/src/jobs/insights.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import { ch } from '@openpanel/db/src/clickhouse/client';
|
||||
import {
|
||||
createEngine,
|
||||
devicesModule,
|
||||
entryPagesModule,
|
||||
geoModule,
|
||||
insightStore,
|
||||
pageTrendsModule,
|
||||
referrersModule,
|
||||
} from '@openpanel/db/src/services/insights';
|
||||
import type {
|
||||
CronQueuePayload,
|
||||
InsightsQueuePayloadProject,
|
||||
} from '@openpanel/queue';
|
||||
import { insightsQueue } from '@openpanel/queue';
|
||||
import type { Job } from 'bullmq';
|
||||
|
||||
const defaultEngineConfig = {
|
||||
keepTopNPerModuleWindow: 20,
|
||||
closeStaleAfterDays: 7,
|
||||
dimensionBatchSize: 50,
|
||||
globalThresholds: {
|
||||
minTotal: 200,
|
||||
minAbsDelta: 80,
|
||||
minPct: 0.15,
|
||||
},
|
||||
};
|
||||
|
||||
export async function insightsDailyJob(job: Job<CronQueuePayload>) {
|
||||
const projectIds = await insightStore.listProjectIdsForCadence('daily');
|
||||
const date = new Date().toISOString().slice(0, 10);
|
||||
|
||||
for (const projectId of projectIds) {
|
||||
await insightsQueue.add(
|
||||
'insightsProject',
|
||||
{
|
||||
type: 'insightsProject',
|
||||
payload: { projectId, date },
|
||||
},
|
||||
{
|
||||
jobId: `daily:${date}:${projectId}`, // idempotent
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export async function insightsProjectJob(
|
||||
job: Job<InsightsQueuePayloadProject>,
|
||||
) {
|
||||
const { projectId, date } = job.data.payload;
|
||||
const engine = createEngine({
|
||||
store: insightStore,
|
||||
modules: [
|
||||
referrersModule,
|
||||
entryPagesModule,
|
||||
pageTrendsModule,
|
||||
geoModule,
|
||||
devicesModule,
|
||||
],
|
||||
db: ch,
|
||||
config: defaultEngineConfig,
|
||||
});
|
||||
|
||||
const projectCreatedAt = await insightStore.getProjectCreatedAt(projectId);
|
||||
|
||||
await engine.runProject({
|
||||
projectId,
|
||||
cadence: 'daily',
|
||||
now: new Date(date),
|
||||
projectCreatedAt,
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user