wip
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
import * as controller from '@/controllers/misc.controller';
|
||||
import { insightsQueue } from '@openpanel/queue';
|
||||
import type { FastifyPluginCallback } from 'fastify';
|
||||
|
||||
const miscRouter: FastifyPluginCallback = async (fastify) => {
|
||||
|
||||
@@ -14,13 +14,10 @@ import {
|
||||
import express from 'express';
|
||||
import client from 'prom-client';
|
||||
|
||||
import { getRedisQueue } from '@openpanel/redis';
|
||||
import { Worker } from 'bullmq';
|
||||
import { BullBoardGroupMQAdapter } from 'groupmq';
|
||||
import sourceMapSupport from 'source-map-support';
|
||||
import { bootCron } from './boot-cron';
|
||||
import { bootWorkers } from './boot-workers';
|
||||
import { insightsProjectJob } from './jobs/insights';
|
||||
import { register } from './metrics';
|
||||
import { logger } from './utils/logger';
|
||||
|
||||
|
||||
@@ -164,7 +164,11 @@ async function fetchPageTrendAggregates(ctx: ComputeContext): Promise<{
|
||||
export const pageTrendsModule: InsightModule = {
|
||||
key: 'page-trends',
|
||||
cadence: ['daily'],
|
||||
thresholds: { minTotal: 100, minAbsDelta: 30, minPct: 0.2, maxDims: 100 },
|
||||
// Share-based thresholds (values in basis points: 100 = 1%)
|
||||
// minTotal: require at least 0.5% combined share (current + baseline)
|
||||
// minAbsDelta: require at least 0.5 percentage point shift
|
||||
// minPct: require at least 25% relative change in share
|
||||
thresholds: { minTotal: 50, minAbsDelta: 50, minPct: 0.25, maxDims: 100 },
|
||||
|
||||
async enumerateDimensions(ctx) {
|
||||
const { currentMap, baselineMap } = await fetchPageTrendAggregates(ctx);
|
||||
@@ -185,28 +189,40 @@ export const pageTrendsModule: InsightModule = {
|
||||
if (!dimKey.startsWith('page:')) continue;
|
||||
const originPath = dimKey.replace('page:', '');
|
||||
|
||||
const currentValue = currentMap.get(originPath) ?? 0;
|
||||
const compareValue = baselineMap.get(originPath) ?? 0;
|
||||
const pageviewsCurrent = currentMap.get(originPath) ?? 0;
|
||||
const pageviewsCompare = baselineMap.get(originPath) ?? 0;
|
||||
|
||||
const currentShare = totalCurrent > 0 ? currentValue / totalCurrent : 0;
|
||||
const compareShare = totalBaseline > 0 ? compareValue / totalBaseline : 0;
|
||||
const currentShare =
|
||||
totalCurrent > 0 ? pageviewsCurrent / totalCurrent : 0;
|
||||
const compareShare =
|
||||
totalBaseline > 0 ? pageviewsCompare / totalBaseline : 0;
|
||||
|
||||
// Use share values in basis points (100 = 1%) for thresholding
|
||||
// This makes thresholds intuitive: minAbsDelta=50 means 0.5pp shift
|
||||
const currentShareBp = currentShare * 10000;
|
||||
const compareShareBp = compareShare * 10000;
|
||||
|
||||
const shareShiftPp = (currentShare - compareShare) * 100;
|
||||
const changePct = computeChangePct(currentValue, compareValue);
|
||||
const direction = computeDirection(changePct);
|
||||
// changePct is relative change in share, not absolute pageviews
|
||||
const shareChangePct = computeChangePct(currentShare, compareShare);
|
||||
const direction = computeDirection(shareChangePct);
|
||||
|
||||
results.push({
|
||||
ok: true,
|
||||
dimensionKey: dimKey,
|
||||
currentValue,
|
||||
compareValue,
|
||||
changePct,
|
||||
// Use share in basis points for threshold checks
|
||||
currentValue: currentShareBp,
|
||||
compareValue: compareShareBp,
|
||||
changePct: shareChangePct,
|
||||
direction,
|
||||
extra: {
|
||||
// Keep absolute values for display
|
||||
pageviewsCurrent,
|
||||
pageviewsCompare,
|
||||
shareShiftPp,
|
||||
currentShare,
|
||||
compareShare,
|
||||
isNew: compareValue === 0 && currentValue > 0,
|
||||
isNew: pageviewsCompare === 0 && pageviewsCurrent > 0,
|
||||
},
|
||||
});
|
||||
}
|
||||
@@ -218,22 +234,26 @@ export const pageTrendsModule: InsightModule = {
|
||||
const originPath = result.dimensionKey.replace('page:', '');
|
||||
const [origin, path] = originPath.split(DELIMITER);
|
||||
const displayValue = origin ? `${origin}${path}` : path || '/';
|
||||
const pct = ((result.changePct ?? 0) * 100).toFixed(1);
|
||||
const isIncrease = (result.changePct ?? 0) >= 0;
|
||||
|
||||
// Get absolute pageviews from extra (currentValue/compareValue are now share-based)
|
||||
const pageviewsCurrent = Number(result.extra?.pageviewsCurrent ?? 0);
|
||||
const pageviewsCompare = Number(result.extra?.pageviewsCompare ?? 0);
|
||||
const shareCurrent = Number(result.extra?.currentShare ?? 0);
|
||||
const shareCompare = Number(result.extra?.compareShare ?? 0);
|
||||
const shareShiftPp = Number(result.extra?.shareShiftPp ?? 0);
|
||||
const isNew = result.extra?.isNew as boolean | undefined;
|
||||
|
||||
// Display share shift in percentage points
|
||||
const isIncrease = shareShiftPp >= 0;
|
||||
const shareShiftDisplay = Math.abs(shareShiftPp).toFixed(1);
|
||||
|
||||
const title = isNew
|
||||
? `New page getting views: ${displayValue}`
|
||||
: `Page ${displayValue} ${isIncrease ? '↑' : '↓'} ${Math.abs(Number(pct))}%`;
|
||||
|
||||
const pageviewsCurrent = result.currentValue ?? 0;
|
||||
const pageviewsCompare = result.compareValue ?? 0;
|
||||
const shareCurrent = Number(result.extra?.currentShare ?? 0);
|
||||
const shareCompare = Number(result.extra?.compareShare ?? 0);
|
||||
: `Page ${displayValue} share ${isIncrease ? '↑' : '↓'} ${shareShiftDisplay}pp`;
|
||||
|
||||
return {
|
||||
title,
|
||||
summary: `${ctx.window.label}. Pageviews ${pageviewsCurrent} vs ${pageviewsCompare}.`,
|
||||
summary: `${ctx.window.label}. Share ${(shareCurrent * 100).toFixed(1)}% vs ${(shareCompare * 100).toFixed(1)}%.`,
|
||||
displayName: displayValue,
|
||||
payload: {
|
||||
kind: 'insight_v1',
|
||||
@@ -241,35 +261,36 @@ export const pageTrendsModule: InsightModule = {
|
||||
{ key: 'origin', value: origin ?? '', displayName: origin ?? '' },
|
||||
{ key: 'path', value: path ?? '', displayName: path ?? '' },
|
||||
],
|
||||
primaryMetric: 'pageviews',
|
||||
primaryMetric: 'share',
|
||||
metrics: {
|
||||
pageviews: {
|
||||
current: pageviewsCurrent,
|
||||
compare: pageviewsCompare,
|
||||
delta: pageviewsCurrent - pageviewsCompare,
|
||||
changePct: pageviewsCompare > 0 ? (result.changePct ?? 0) : null,
|
||||
direction: result.direction ?? 'flat',
|
||||
changePct:
|
||||
pageviewsCompare > 0
|
||||
? (pageviewsCurrent - pageviewsCompare) / pageviewsCompare
|
||||
: null,
|
||||
direction:
|
||||
pageviewsCurrent > pageviewsCompare
|
||||
? 'up'
|
||||
: pageviewsCurrent < pageviewsCompare
|
||||
? 'down'
|
||||
: 'flat',
|
||||
unit: 'count',
|
||||
},
|
||||
share: {
|
||||
current: shareCurrent,
|
||||
compare: shareCompare,
|
||||
delta: shareCurrent - shareCompare,
|
||||
changePct:
|
||||
shareCompare > 0
|
||||
? (shareCurrent - shareCompare) / shareCompare
|
||||
: null,
|
||||
direction:
|
||||
shareCurrent - shareCompare > 0.0005
|
||||
? 'up'
|
||||
: shareCurrent - shareCompare < -0.0005
|
||||
? 'down'
|
||||
: 'flat',
|
||||
changePct: result.changePct ?? null, // This is now share-based
|
||||
direction: result.direction ?? 'flat',
|
||||
unit: 'ratio',
|
||||
},
|
||||
},
|
||||
extra: {
|
||||
isNew: result.extra?.isNew,
|
||||
shareShiftPp,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
@@ -247,13 +247,76 @@ export const insightStore: InsightStore = {
|
||||
return { suppressed: 0, unsuppressed: 0 };
|
||||
}
|
||||
|
||||
let suppressed = 0;
|
||||
let unsuppressed = 0;
|
||||
|
||||
// For "yesterday" insights, suppress any that are stale (windowEnd is not actually yesterday)
|
||||
// This prevents showing confusing insights like "Yesterday traffic dropped" when it's from 2+ days ago
|
||||
if (windowKind === 'yesterday') {
|
||||
const yesterday = new Date(now);
|
||||
yesterday.setUTCHours(0, 0, 0, 0);
|
||||
yesterday.setUTCDate(yesterday.getUTCDate() - 1);
|
||||
const yesterdayTime = yesterday.getTime();
|
||||
|
||||
for (const insight of insights) {
|
||||
// If windowEnd is null, consider it stale
|
||||
const isStale = insight.windowEnd
|
||||
? new Date(insight.windowEnd).setUTCHours(0, 0, 0, 0) !==
|
||||
yesterdayTime
|
||||
: true;
|
||||
|
||||
if (isStale && insight.state === 'active') {
|
||||
await db.projectInsight.update({
|
||||
where: { id: insight.id },
|
||||
data: { state: 'suppressed', lastUpdatedAt: now },
|
||||
});
|
||||
suppressed++;
|
||||
}
|
||||
}
|
||||
|
||||
// Filter to only non-stale insights for top-N logic
|
||||
const freshInsights = insights.filter((insight) => {
|
||||
if (!insight.windowEnd) return false;
|
||||
const windowEndTime = new Date(insight.windowEnd).setUTCHours(
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
);
|
||||
return windowEndTime === yesterdayTime;
|
||||
});
|
||||
|
||||
const topN = freshInsights.slice(0, keepTopN);
|
||||
const belowN = freshInsights.slice(keepTopN);
|
||||
|
||||
for (const insight of belowN) {
|
||||
if (insight.state === 'active') {
|
||||
await db.projectInsight.update({
|
||||
where: { id: insight.id },
|
||||
data: { state: 'suppressed', lastUpdatedAt: now },
|
||||
});
|
||||
suppressed++;
|
||||
}
|
||||
}
|
||||
|
||||
for (const insight of topN) {
|
||||
if (insight.state === 'suppressed') {
|
||||
await db.projectInsight.update({
|
||||
where: { id: insight.id },
|
||||
data: { state: 'active', lastUpdatedAt: now },
|
||||
});
|
||||
unsuppressed++;
|
||||
}
|
||||
}
|
||||
|
||||
return { suppressed, unsuppressed };
|
||||
}
|
||||
|
||||
// For non-yesterday windows, apply standard top-N suppression
|
||||
const topN = insights.slice(0, keepTopN);
|
||||
const belowN = insights.slice(keepTopN);
|
||||
|
||||
// Suppress those below top N
|
||||
let suppressed = 0;
|
||||
let unsuppressed = 0;
|
||||
|
||||
for (const insight of belowN) {
|
||||
if (insight.state === 'active') {
|
||||
await db.projectInsight.update({
|
||||
|
||||
Reference in New Issue
Block a user