diff --git a/apps/api/src/routes/misc.router.ts b/apps/api/src/routes/misc.router.ts index db5ffba6..dabe1322 100644 --- a/apps/api/src/routes/misc.router.ts +++ b/apps/api/src/routes/misc.router.ts @@ -1,4 +1,5 @@ import * as controller from '@/controllers/misc.controller'; +import { insightsQueue } from '@openpanel/queue'; import type { FastifyPluginCallback } from 'fastify'; const miscRouter: FastifyPluginCallback = async (fastify) => { @@ -43,6 +44,27 @@ const miscRouter: FastifyPluginCallback = async (fastify) => { url: '/geo', handler: controller.getGeo, }); + + fastify.route({ + method: 'GET', + url: '/insights/test', + handler: async (req, reply) => { + const projectId = req.query.projectId as string; + const job = await insightsQueue.add( + 'insightsProject', + { + type: 'insightsProject', + payload: { + projectId: projectId, + date: new Date().toISOString().slice(0, 10), + }, + }, + { jobId: `manual:${Date.now()}:${projectId}` }, + ); + + return { jobId: job.id }; + }, + }); }; export default miscRouter; diff --git a/apps/start/src/components/insights/insight-card.tsx b/apps/start/src/components/insights/insight-card.tsx new file mode 100644 index 00000000..80d5e7f5 --- /dev/null +++ b/apps/start/src/components/insights/insight-card.tsx @@ -0,0 +1,214 @@ +import { countries } from '@/translations/countries'; +import { cn } from '@/utils/cn'; +import { ArrowDown, ArrowUp } from 'lucide-react'; +import { SerieIcon } from '../report-chart/common/serie-icon'; +import { Badge } from '../ui/badge'; + +type InsightPayload = { + metric?: 'sessions' | 'pageviews' | 'share'; + primaryDimension?: { + type: string; + displayName: string; + }; + extra?: { + currentShare?: number; + compareShare?: number; + shareShiftPp?: number; + isNew?: boolean; + isGone?: boolean; + }; +}; + +type Insight = { + id: string; + title: string; + summary: string | null; + payload: unknown; + currentValue: number | null; + compareValue: number | null; + changePct: number | null; + direction: string | null; + moduleKey: string; + dimensionKey: string; + windowKind: string; + severityBand: string | null; + impactScore?: number | null; + firstDetectedAt?: string | Date; +}; + +function formatWindowKind(windowKind: string): string { + switch (windowKind) { + case 'yesterday': + return 'Yesterday'; + case 'rolling_7d': + return '7 Days'; + case 'rolling_30d': + return '30 Days'; + } + return windowKind; +} + +interface InsightCardProps { + insight: Insight; + className?: string; +} + +export function InsightCard({ insight, className }: InsightCardProps) { + const payload = insight.payload as InsightPayload | null; + const dimension = payload?.primaryDimension; + const metric = payload?.metric ?? 'sessions'; + const extra = payload?.extra; + + // Determine if this is a share-based insight (geo, devices) + const isShareBased = metric === 'share'; + + // Get the values to display based on metric type + const currentValue = isShareBased + ? (extra?.currentShare ?? null) + : (insight.currentValue ?? null); + const compareValue = isShareBased + ? (extra?.compareShare ?? null) + : (insight.compareValue ?? null); + + // Get direction and change + const direction = insight.direction ?? 'flat'; + const isIncrease = direction === 'up'; + const isDecrease = direction === 'down'; + + // Format the delta display + const deltaText = isShareBased + ? `${Math.abs(extra?.shareShiftPp ?? 0).toFixed(1)}pp` + : `${Math.abs((insight.changePct ?? 0) * 100).toFixed(1)}%`; + + // Format metric values + const formatValue = (value: number | null): string => { + if (value == null) return '-'; + if (isShareBased) return `${(value * 100).toFixed(1)}%`; + return Math.round(value).toLocaleString(); + }; + + // Get the metric label + const metricLabel = isShareBased + ? 'Share' + : metric === 'pageviews' + ? 'Pageviews' + : 'Sessions'; + + const renderTitle = () => { + const t = insight.title.replace(/↑.*$/, '').replace(/↓.*$/, '').trim(); + if ( + dimension && + (dimension.type === 'country' || + dimension.type === 'referrer' || + dimension.type === 'device') + ) { + return ( + + {' '} + {countries[dimension.displayName as keyof typeof countries] || t} + + ); + } + + return t; + }; + + return ( +
+
+ + {formatWindowKind(insight.windowKind)} + / + {dimension?.type ?? 'unknown'} + + {/* Severity: subtle dot instead of big pill */} + {insight.severityBand && ( +
+ + + {insight.severityBand} + +
+ )} +
+
+ {renderTitle()} +
+ + {/* Metric row */} +
+
+
+
+ {metricLabel} +
+ +
+
+ {formatValue(currentValue)} +
+ + {/* Inline compare, smaller */} + {compareValue != null && ( +
+ vs {formatValue(compareValue)} +
+ )} +
+
+ + {/* Delta chip */} + +
+
+
+ ); +} + +function DeltaChip({ + isIncrease, + isDecrease, + deltaText, +}: { + isIncrease: boolean; + isDecrease: boolean; + deltaText: string; +}) { + return ( +
+ {isIncrease ? ( + + ) : isDecrease ? ( + + ) : null} + {deltaText} +
+ ); +} diff --git a/apps/start/src/components/overview/overview-insights.tsx b/apps/start/src/components/overview/overview-insights.tsx new file mode 100644 index 00000000..7121a9be --- /dev/null +++ b/apps/start/src/components/overview/overview-insights.tsx @@ -0,0 +1,66 @@ +import { useTRPC } from '@/integrations/trpc/react'; +import { useQuery } from '@tanstack/react-query'; +import { InsightCard } from '../insights/insight-card'; +import { Skeleton } from '../skeleton'; +import { + Carousel, + CarouselContent, + CarouselItem, + CarouselNext, + CarouselPrevious, +} from '../ui/carousel'; + +interface OverviewInsightsProps { + projectId: string; +} + +export default function OverviewInsights({ projectId }: OverviewInsightsProps) { + const trpc = useTRPC(); + const { data: insights, isLoading } = useQuery( + trpc.insight.list.queryOptions({ + projectId, + limit: 20, + }), + ); + + if (isLoading) { + const keys = Array.from({ length: 4 }, (_, i) => `insight-skeleton-${i}`); + return ( +
+ + + {keys.map((key) => ( + + + + ))} + + +
+ ); + } + + if (!insights || insights.length === 0) return null; + + return ( +
+ + + {insights.map((insight) => ( + + + + ))} + + + + +
+ ); +} diff --git a/apps/start/src/components/sidebar-project-menu.tsx b/apps/start/src/components/sidebar-project-menu.tsx index 80c5c8dd..e1d480fb 100644 --- a/apps/start/src/components/sidebar-project-menu.tsx +++ b/apps/start/src/components/sidebar-project-menu.tsx @@ -17,6 +17,7 @@ import { LayoutPanelTopIcon, PlusIcon, SparklesIcon, + TrendingUpDownIcon, UndoDotIcon, UsersIcon, WallpaperIcon, @@ -39,13 +40,18 @@ export default function SidebarProjectMenu({ }: SidebarProjectMenuProps) { return ( <> -
Insights
+
Analytics
+ diff --git a/apps/start/src/routeTree.gen.ts b/apps/start/src/routeTree.gen.ts index 735a873a..a75fc4b0 100644 --- a/apps/start/src/routeTree.gen.ts +++ b/apps/start/src/routeTree.gen.ts @@ -38,6 +38,7 @@ import { Route as AppOrganizationIdProjectIdReportsRouteImport } from './routes/ import { Route as AppOrganizationIdProjectIdReferencesRouteImport } from './routes/_app.$organizationId.$projectId.references' import { Route as AppOrganizationIdProjectIdRealtimeRouteImport } from './routes/_app.$organizationId.$projectId.realtime' import { Route as AppOrganizationIdProjectIdPagesRouteImport } from './routes/_app.$organizationId.$projectId.pages' +import { Route as AppOrganizationIdProjectIdInsightsRouteImport } from './routes/_app.$organizationId.$projectId.insights' import { Route as AppOrganizationIdProjectIdDashboardsRouteImport } from './routes/_app.$organizationId.$projectId.dashboards' import { Route as AppOrganizationIdProjectIdChatRouteImport } from './routes/_app.$organizationId.$projectId.chat' import { Route as AppOrganizationIdMembersTabsIndexRouteImport } from './routes/_app.$organizationId.members._tabs.index' @@ -273,6 +274,12 @@ const AppOrganizationIdProjectIdPagesRoute = path: '/pages', getParentRoute: () => AppOrganizationIdProjectIdRoute, } as any) +const AppOrganizationIdProjectIdInsightsRoute = + AppOrganizationIdProjectIdInsightsRouteImport.update({ + id: '/insights', + path: '/insights', + getParentRoute: () => AppOrganizationIdProjectIdRoute, + } as any) const AppOrganizationIdProjectIdDashboardsRoute = AppOrganizationIdProjectIdDashboardsRouteImport.update({ id: '/dashboards', @@ -495,6 +502,7 @@ export interface FileRoutesByFullPath { '/$organizationId/': typeof AppOrganizationIdIndexRoute '/$organizationId/$projectId/chat': typeof AppOrganizationIdProjectIdChatRoute '/$organizationId/$projectId/dashboards': typeof AppOrganizationIdProjectIdDashboardsRoute + '/$organizationId/$projectId/insights': typeof AppOrganizationIdProjectIdInsightsRoute '/$organizationId/$projectId/pages': typeof AppOrganizationIdProjectIdPagesRoute '/$organizationId/$projectId/realtime': typeof AppOrganizationIdProjectIdRealtimeRoute '/$organizationId/$projectId/references': typeof AppOrganizationIdProjectIdReferencesRoute @@ -552,6 +560,7 @@ export interface FileRoutesByTo { '/$organizationId': typeof AppOrganizationIdIndexRoute '/$organizationId/$projectId/chat': typeof AppOrganizationIdProjectIdChatRoute '/$organizationId/$projectId/dashboards': typeof AppOrganizationIdProjectIdDashboardsRoute + '/$organizationId/$projectId/insights': typeof AppOrganizationIdProjectIdInsightsRoute '/$organizationId/$projectId/pages': typeof AppOrganizationIdProjectIdPagesRoute '/$organizationId/$projectId/realtime': typeof AppOrganizationIdProjectIdRealtimeRoute '/$organizationId/$projectId/references': typeof AppOrganizationIdProjectIdReferencesRoute @@ -609,6 +618,7 @@ export interface FileRoutesById { '/_app/$organizationId/': typeof AppOrganizationIdIndexRoute '/_app/$organizationId/$projectId/chat': typeof AppOrganizationIdProjectIdChatRoute '/_app/$organizationId/$projectId/dashboards': typeof AppOrganizationIdProjectIdDashboardsRoute + '/_app/$organizationId/$projectId/insights': typeof AppOrganizationIdProjectIdInsightsRoute '/_app/$organizationId/$projectId/pages': typeof AppOrganizationIdProjectIdPagesRoute '/_app/$organizationId/$projectId/realtime': typeof AppOrganizationIdProjectIdRealtimeRoute '/_app/$organizationId/$projectId/references': typeof AppOrganizationIdProjectIdReferencesRoute @@ -677,6 +687,7 @@ export interface FileRouteTypes { | '/$organizationId/' | '/$organizationId/$projectId/chat' | '/$organizationId/$projectId/dashboards' + | '/$organizationId/$projectId/insights' | '/$organizationId/$projectId/pages' | '/$organizationId/$projectId/realtime' | '/$organizationId/$projectId/references' @@ -734,6 +745,7 @@ export interface FileRouteTypes { | '/$organizationId' | '/$organizationId/$projectId/chat' | '/$organizationId/$projectId/dashboards' + | '/$organizationId/$projectId/insights' | '/$organizationId/$projectId/pages' | '/$organizationId/$projectId/realtime' | '/$organizationId/$projectId/references' @@ -790,6 +802,7 @@ export interface FileRouteTypes { | '/_app/$organizationId/' | '/_app/$organizationId/$projectId/chat' | '/_app/$organizationId/$projectId/dashboards' + | '/_app/$organizationId/$projectId/insights' | '/_app/$organizationId/$projectId/pages' | '/_app/$organizationId/$projectId/realtime' | '/_app/$organizationId/$projectId/references' @@ -1085,6 +1098,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof AppOrganizationIdProjectIdPagesRouteImport parentRoute: typeof AppOrganizationIdProjectIdRoute } + '/_app/$organizationId/$projectId/insights': { + id: '/_app/$organizationId/$projectId/insights' + path: '/insights' + fullPath: '/$organizationId/$projectId/insights' + preLoaderRoute: typeof AppOrganizationIdProjectIdInsightsRouteImport + parentRoute: typeof AppOrganizationIdProjectIdRoute + } '/_app/$organizationId/$projectId/dashboards': { id: '/_app/$organizationId/$projectId/dashboards' path: '/dashboards' @@ -1528,6 +1548,7 @@ const AppOrganizationIdProjectIdSettingsRouteWithChildren = interface AppOrganizationIdProjectIdRouteChildren { AppOrganizationIdProjectIdChatRoute: typeof AppOrganizationIdProjectIdChatRoute AppOrganizationIdProjectIdDashboardsRoute: typeof AppOrganizationIdProjectIdDashboardsRoute + AppOrganizationIdProjectIdInsightsRoute: typeof AppOrganizationIdProjectIdInsightsRoute AppOrganizationIdProjectIdPagesRoute: typeof AppOrganizationIdProjectIdPagesRoute AppOrganizationIdProjectIdRealtimeRoute: typeof AppOrganizationIdProjectIdRealtimeRoute AppOrganizationIdProjectIdReferencesRoute: typeof AppOrganizationIdProjectIdReferencesRoute @@ -1548,6 +1569,8 @@ const AppOrganizationIdProjectIdRouteChildren: AppOrganizationIdProjectIdRouteCh AppOrganizationIdProjectIdChatRoute: AppOrganizationIdProjectIdChatRoute, AppOrganizationIdProjectIdDashboardsRoute: AppOrganizationIdProjectIdDashboardsRoute, + AppOrganizationIdProjectIdInsightsRoute: + AppOrganizationIdProjectIdInsightsRoute, AppOrganizationIdProjectIdPagesRoute: AppOrganizationIdProjectIdPagesRoute, AppOrganizationIdProjectIdRealtimeRoute: AppOrganizationIdProjectIdRealtimeRoute, diff --git a/apps/start/src/routes/_app.$organizationId.$projectId.index.tsx b/apps/start/src/routes/_app.$organizationId.$projectId.index.tsx index 15a4c95a..ed241c53 100644 --- a/apps/start/src/routes/_app.$organizationId.$projectId.index.tsx +++ b/apps/start/src/routes/_app.$organizationId.$projectId.index.tsx @@ -3,6 +3,7 @@ import { OverviewFiltersButtons, } from '@/components/overview/filters/overview-filters-buttons'; import { LiveCounter } from '@/components/overview/live-counter'; +import OverviewInsights from '@/components/overview/overview-insights'; import { OverviewInterval } from '@/components/overview/overview-interval'; import OverviewMetrics from '@/components/overview/overview-metrics'; import { OverviewRange } from '@/components/overview/overview-range'; @@ -50,6 +51,7 @@ function ProjectDashboard() {
+ diff --git a/apps/start/src/routes/_app.$organizationId.$projectId.insights.tsx b/apps/start/src/routes/_app.$organizationId.$projectId.insights.tsx new file mode 100644 index 00000000..cf26d933 --- /dev/null +++ b/apps/start/src/routes/_app.$organizationId.$projectId.insights.tsx @@ -0,0 +1,278 @@ +import { FullPageEmptyState } from '@/components/full-page-empty-state'; +import { InsightCard } from '@/components/insights/insight-card'; +import { PageContainer } from '@/components/page-container'; +import { PageHeader } from '@/components/page-header'; +import { Skeleton } from '@/components/skeleton'; +import { Input } from '@/components/ui/input'; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from '@/components/ui/select'; +import { TableButtons } from '@/components/ui/table'; +import { useTRPC } from '@/integrations/trpc/react'; +import { PAGE_TITLES, createProjectTitle } from '@/utils/title'; +import { useQuery } from '@tanstack/react-query'; +import { createFileRoute } from '@tanstack/react-router'; +import { useMemo, useState } from 'react'; + +export const Route = createFileRoute( + '/_app/$organizationId/$projectId/insights', +)({ + component: Component, + head: () => { + return { + meta: [ + { + title: createProjectTitle('Insights'), + }, + ], + }; + }, +}); + +type SortOption = + | 'impact-desc' + | 'impact-asc' + | 'severity-desc' + | 'severity-asc' + | 'recent'; + +function Component() { + const { projectId } = Route.useParams(); + const trpc = useTRPC(); + const { data: insights, isLoading } = useQuery( + trpc.insight.listAll.queryOptions({ + projectId, + limit: 500, + }), + ); + + const [search, setSearch] = useState(''); + const [moduleFilter, setModuleFilter] = useState('all'); + const [windowKindFilter, setWindowKindFilter] = useState('all'); + const [severityFilter, setSeverityFilter] = useState('all'); + const [directionFilter, setDirectionFilter] = useState('all'); + const [sortBy, setSortBy] = useState('impact-desc'); + + const filteredAndSorted = useMemo(() => { + if (!insights) return []; + + const filtered = insights.filter((insight) => { + // Search filter + if (search) { + const searchLower = search.toLowerCase(); + const matchesTitle = insight.title.toLowerCase().includes(searchLower); + const matchesSummary = insight.summary + ?.toLowerCase() + .includes(searchLower); + const matchesDimension = insight.dimensionKey + .toLowerCase() + .includes(searchLower); + if (!matchesTitle && !matchesSummary && !matchesDimension) { + return false; + } + } + + // Module filter + if (moduleFilter !== 'all' && insight.moduleKey !== moduleFilter) { + return false; + } + + // Window kind filter + if ( + windowKindFilter !== 'all' && + insight.windowKind !== windowKindFilter + ) { + return false; + } + + // Severity filter + if (severityFilter !== 'all') { + if (severityFilter === 'none' && insight.severityBand) return false; + if ( + severityFilter !== 'none' && + insight.severityBand !== severityFilter + ) + return false; + } + + // Direction filter + if (directionFilter !== 'all' && insight.direction !== directionFilter) { + return false; + } + + return true; + }); + + // Sort (create new array to avoid mutation) + const sorted = [...filtered].sort((a, b) => { + switch (sortBy) { + case 'impact-desc': + return (b.impactScore ?? 0) - (a.impactScore ?? 0); + case 'impact-asc': + return (a.impactScore ?? 0) - (b.impactScore ?? 0); + case 'severity-desc': { + const severityOrder: Record = { + severe: 3, + moderate: 2, + low: 1, + }; + const aSev = severityOrder[a.severityBand ?? ''] ?? 0; + const bSev = severityOrder[b.severityBand ?? ''] ?? 0; + return bSev - aSev; + } + case 'severity-asc': { + const severityOrder: Record = { + severe: 3, + moderate: 2, + low: 1, + }; + const aSev = severityOrder[a.severityBand ?? ''] ?? 0; + const bSev = severityOrder[b.severityBand ?? ''] ?? 0; + return aSev - bSev; + } + case 'recent': + return ( + new Date(b.firstDetectedAt ?? 0).getTime() - + new Date(a.firstDetectedAt ?? 0).getTime() + ); + default: + return 0; + } + }); + + return sorted; + }, [ + insights, + search, + moduleFilter, + windowKindFilter, + severityFilter, + directionFilter, + sortBy, + ]); + + const uniqueModules = useMemo(() => { + if (!insights) return []; + return Array.from(new Set(insights.map((i) => i.moduleKey))).sort(); + }, [insights]); + + if (isLoading) { + return ( + + +
+ {Array.from({ length: 8 }, (_, i) => `skeleton-${i}`).map((key) => ( + + ))} +
+
+ ); + } + + return ( + + + + setSearch(e.target.value)} + className="max-w-xs" + /> + + + + + + + + {filteredAndSorted.length === 0 && !isLoading && ( + + )} + +
+ {filteredAndSorted.map((insight) => ( + + ))} +
+ + {filteredAndSorted.length > 0 && ( +
+ Showing {filteredAndSorted.length} of {insights?.length ?? 0} insights +
+ )} +
+ ); +} diff --git a/apps/worker/src/boot-cron.ts b/apps/worker/src/boot-cron.ts index 9650598e..8e5b30fd 100644 --- a/apps/worker/src/boot-cron.ts +++ b/apps/worker/src/boot-cron.ts @@ -44,6 +44,12 @@ export async function bootCron() { }); } + jobs.push({ + name: 'insightsDaily', + type: 'insightsDaily', + pattern: '0 2 * * *', // 2 AM daily + }); + logger.info('Updating cron jobs'); const jobSchedulers = await cronQueue.getJobSchedulers(); diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts index 55652de8..4b739afc 100644 --- a/apps/worker/src/boot-workers.ts +++ b/apps/worker/src/boot-workers.ts @@ -7,6 +7,7 @@ import { cronQueue, eventsGroupQueues, importQueue, + insightsQueue, miscQueue, notificationQueue, queueLogger, @@ -21,6 +22,7 @@ import { Worker as GroupWorker } from 'groupmq'; import { cronJob } from './jobs/cron'; import { incomingEvent } from './jobs/events.incoming-event'; import { importJob } from './jobs/import'; +import { insightsProjectJob } from './jobs/insights'; import { miscJob } from './jobs/misc'; import { notificationJob } from './jobs/notification'; import { sessionsJob } from './jobs/sessions'; @@ -49,7 +51,15 @@ function getEnabledQueues(): QueueName[] { logger.info('No ENABLED_QUEUES specified, starting all queues', { totalEventShards: EVENTS_GROUP_QUEUES_SHARDS, }); - return ['events', 'sessions', 'cron', 'notification', 'misc', 'import']; + return [ + 'events', + 'sessions', + 'cron', + 'notification', + 'misc', + 'import', + 'insights', + ]; } const queues = enabledQueuesEnv @@ -187,6 +197,17 @@ export async function bootWorkers() { logger.info('Started worker for import', { concurrency }); } + // Start insights worker + if (enabledQueues.includes('insights')) { + const concurrency = getConcurrencyFor('insights', 5); + const insightsWorker = new Worker(insightsQueue.name, insightsProjectJob, { + ...workerOptions, + concurrency, + }); + workers.push(insightsWorker); + logger.info('Started worker for insights', { concurrency }); + } + if (workers.length === 0) { logger.warn( 'No workers started. Check ENABLED_QUEUES environment variable.', diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index 7a4686ab..1109e6e0 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -6,6 +6,7 @@ import { cronQueue, eventsGroupQueues, importQueue, + insightsQueue, miscQueue, notificationQueue, sessionsQueue, @@ -13,10 +14,13 @@ import { import express from 'express'; import client from 'prom-client'; +import { getRedisQueue } from '@openpanel/redis'; +import { Worker } from 'bullmq'; import { BullBoardGroupMQAdapter } from 'groupmq'; import sourceMapSupport from 'source-map-support'; import { bootCron } from './boot-cron'; import { bootWorkers } from './boot-workers'; +import { insightsProjectJob } from './jobs/insights'; import { register } from './metrics'; import { logger } from './utils/logger'; @@ -42,6 +46,7 @@ async function start() { new BullMQAdapter(notificationQueue), new BullMQAdapter(miscQueue), new BullMQAdapter(importQueue), + new BullMQAdapter(insightsQueue), ], serverAdapter: serverAdapter, }); @@ -74,6 +79,11 @@ async function start() { await bootCron(); } else { logger.warn('Workers are disabled'); + + // Start insights worker + const insightsWorker = new Worker(insightsQueue.name, insightsProjectJob, { + connection: getRedisQueue(), + }); } await createInitialSalts(); diff --git a/apps/worker/src/jobs/cron.ts b/apps/worker/src/jobs/cron.ts index b50e3beb..eee51b16 100644 --- a/apps/worker/src/jobs/cron.ts +++ b/apps/worker/src/jobs/cron.ts @@ -6,6 +6,7 @@ import type { CronQueuePayload } from '@openpanel/queue'; import { jobdeleteProjects } from './cron.delete-projects'; import { ping } from './cron.ping'; import { salt } from './cron.salt'; +import { insightsDailyJob } from './insights'; export async function cronJob(job: Job) { switch (job.data.type) { @@ -27,5 +28,8 @@ export async function cronJob(job: Job) { case 'deleteProjects': { return await jobdeleteProjects(job); } + case 'insightsDaily': { + return await insightsDailyJob(job); + } } } diff --git a/apps/worker/src/jobs/insights.ts b/apps/worker/src/jobs/insights.ts new file mode 100644 index 00000000..b418dd69 --- /dev/null +++ b/apps/worker/src/jobs/insights.ts @@ -0,0 +1,71 @@ +import { ch } from '@openpanel/db/src/clickhouse/client'; +import { + createEngine, + devicesModule, + entryPagesModule, + geoModule, + insightStore, + pageTrendsModule, + referrersModule, +} from '@openpanel/db/src/services/insights'; +import type { + CronQueuePayload, + InsightsQueuePayloadProject, +} from '@openpanel/queue'; +import { insightsQueue } from '@openpanel/queue'; +import type { Job } from 'bullmq'; + +const defaultEngineConfig = { + keepTopNPerModuleWindow: 5, + closeStaleAfterDays: 7, + dimensionBatchSize: 50, + globalThresholds: { + minTotal: 200, + minAbsDelta: 80, + minPct: 0.15, + }, + enableExplain: false, + explainTopNPerProjectPerDay: 3, +}; + +export async function insightsDailyJob(job: Job) { + const projectIds = await insightStore.listProjectIdsForCadence('daily'); + const date = new Date().toISOString().slice(0, 10); + + for (const projectId of projectIds) { + await insightsQueue.add( + 'insightsProject', + { + type: 'insightsProject', + payload: { projectId, date }, + }, + { + jobId: `daily:${date}:${projectId}`, // idempotent + }, + ); + } +} + +export async function insightsProjectJob( + job: Job, +) { + const { projectId, date } = job.data.payload; + const engine = createEngine({ + store: insightStore, + modules: [ + referrersModule, + entryPagesModule, + pageTrendsModule, + geoModule, + devicesModule, + ], + db: ch, + config: defaultEngineConfig, + }); + + await engine.runProject({ + projectId, + cadence: 'daily', + now: new Date(date), + }); +} diff --git a/packages/db/index.ts b/packages/db/index.ts index 58042d3f..aaa9b5a7 100644 --- a/packages/db/index.ts +++ b/packages/db/index.ts @@ -28,4 +28,5 @@ export * from './src/types'; export * from './src/clickhouse/query-builder'; export * from './src/services/import.service'; export * from './src/services/overview.service'; +export * from './src/services/insights'; export * from './src/session-context'; diff --git a/packages/db/prisma/migrations/20251212192459_insights/migration.sql b/packages/db/prisma/migrations/20251212192459_insights/migration.sql new file mode 100644 index 00000000..859dbcd1 --- /dev/null +++ b/packages/db/prisma/migrations/20251212192459_insights/migration.sql @@ -0,0 +1,57 @@ +-- CreateEnum +CREATE TYPE "public"."InsightState" AS ENUM ('active', 'suppressed', 'closed'); + +-- CreateTable +CREATE TABLE "public"."project_insights" ( + "id" UUID NOT NULL DEFAULT gen_random_uuid(), + "projectId" TEXT NOT NULL, + "moduleKey" TEXT NOT NULL, + "dimensionKey" TEXT NOT NULL, + "windowKind" TEXT NOT NULL, + "state" "public"."InsightState" NOT NULL DEFAULT 'active', + "title" TEXT NOT NULL, + "summary" TEXT, + "payload" JSONB, + "currentValue" DOUBLE PRECISION, + "compareValue" DOUBLE PRECISION, + "changePct" DOUBLE PRECISION, + "direction" TEXT, + "impactScore" DOUBLE PRECISION NOT NULL DEFAULT 0, + "severityBand" TEXT, + "version" INTEGER NOT NULL DEFAULT 1, + "threadId" UUID NOT NULL DEFAULT gen_random_uuid(), + "windowStart" TIMESTAMP(3), + "windowEnd" TIMESTAMP(3), + "firstDetectedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "lastUpdatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "lastSeenAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "project_insights_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."insight_events" ( + "id" UUID NOT NULL DEFAULT gen_random_uuid(), + "insightId" UUID NOT NULL, + "eventKind" TEXT NOT NULL, + "changeFrom" JSONB, + "changeTo" JSONB, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "insight_events_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "project_insights_projectId_impactScore_idx" ON "public"."project_insights"("projectId", "impactScore" DESC); + +-- CreateIndex +CREATE INDEX "project_insights_projectId_moduleKey_windowKind_state_idx" ON "public"."project_insights"("projectId", "moduleKey", "windowKind", "state"); + +-- CreateIndex +CREATE UNIQUE INDEX "project_insights_projectId_moduleKey_dimensionKey_windowKin_key" ON "public"."project_insights"("projectId", "moduleKey", "dimensionKey", "windowKind", "state"); + +-- CreateIndex +CREATE INDEX "insight_events_insightId_createdAt_idx" ON "public"."insight_events"("insightId", "createdAt"); + +-- AddForeignKey +ALTER TABLE "public"."insight_events" ADD CONSTRAINT "insight_events_insightId_fkey" FOREIGN KEY ("insightId") REFERENCES "public"."project_insights"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index 38a29105..030cc84c 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -497,3 +497,59 @@ model Import { @@map("imports") } + +enum InsightState { + active + suppressed + closed +} + +model ProjectInsight { + id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid + projectId String + moduleKey String // e.g. "referrers", "entry-pages" + dimensionKey String // e.g. "referrer:instagram", "page:/pricing" + windowKind String // "yesterday" | "rolling_7d" | "rolling_30d" + state InsightState @default(active) + + title String + summary String? + payload Json? // RenderedCard blocks, extra data + + currentValue Float? + compareValue Float? + changePct Float? + direction String? // "up" | "down" | "flat" + impactScore Float @default(0) + severityBand String? // "low" | "moderate" | "severe" + + version Int @default(1) + threadId String @default(dbgenerated("gen_random_uuid()")) @db.Uuid + + windowStart DateTime? + windowEnd DateTime? + + firstDetectedAt DateTime @default(now()) + lastUpdatedAt DateTime @default(now()) @updatedAt + lastSeenAt DateTime @default(now()) + + events InsightEvent[] + + @@unique([projectId, moduleKey, dimensionKey, windowKind, state]) + @@index([projectId, impactScore(sort: Desc)]) + @@index([projectId, moduleKey, windowKind, state]) + @@map("project_insights") +} + +model InsightEvent { + id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid + insightId String @db.Uuid + insight ProjectInsight @relation(fields: [insightId], references: [id], onDelete: Cascade) + eventKind String // "created" | "updated" | "severity_up" | "direction_flip" | "closed" | etc + changeFrom Json? + changeTo Json? + createdAt DateTime @default(now()) + + @@index([insightId, createdAt]) + @@map("insight_events") +} diff --git a/packages/db/src/prisma-client.ts b/packages/db/src/prisma-client.ts index e5900dd6..befe3f74 100644 --- a/packages/db/src/prisma-client.ts +++ b/packages/db/src/prisma-client.ts @@ -42,11 +42,11 @@ const getPrismaClient = () => { operation === 'update' || operation === 'delete' ) { - logger.info('Prisma operation', { - operation, - args, - model, - }); + // logger.info('Prisma operation', { + // operation, + // args, + // model, + // }); } return query(args); }, diff --git a/packages/db/src/services/insights/engine.ts b/packages/db/src/services/insights/engine.ts new file mode 100644 index 00000000..7c0944ba --- /dev/null +++ b/packages/db/src/services/insights/engine.ts @@ -0,0 +1,346 @@ +import crypto from 'node:crypto'; +import { materialDecision } from './material'; +import { defaultImpactScore, severityBand } from './scoring'; +import type { + Cadence, + ComputeContext, + ComputeResult, + ExplainQueue, + InsightModule, + InsightStore, + WindowKind, +} from './types'; +import { resolveWindow } from './windows'; + +export interface EngineConfig { + keepTopNPerModuleWindow: number; // e.g. 5 + closeStaleAfterDays: number; // e.g. 7 + dimensionBatchSize: number; // e.g. 50 + globalThresholds: { + minTotal: number; // e.g. 200 + minAbsDelta: number; // e.g. 80 + minPct: number; // e.g. 0.15 + }; + enableExplain: boolean; + explainTopNPerProjectPerDay: number; // e.g. 3 +} + +/** Simple gating to cut noise; modules can override via thresholds. */ +function passesThresholds( + r: ComputeResult, + mod: InsightModule, + cfg: EngineConfig, +): boolean { + const t = mod.thresholds ?? {}; + const minTotal = t.minTotal ?? cfg.globalThresholds.minTotal; + const minAbsDelta = t.minAbsDelta ?? cfg.globalThresholds.minAbsDelta; + const minPct = t.minPct ?? cfg.globalThresholds.minPct; + const cur = r.currentValue ?? 0; + const cmp = r.compareValue ?? 0; + const total = cur + cmp; + const absDelta = Math.abs(cur - cmp); + const pct = Math.abs(r.changePct ?? 0); + if (total < minTotal) return false; + if (absDelta < minAbsDelta) return false; + if (pct < minPct) return false; + return true; +} + +function chunk(arr: T[], size: number): T[][] { + if (size <= 0) return [arr]; + const out: T[][] = []; + for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size)); + return out; +} + +function sha256(x: string) { + return crypto.createHash('sha256').update(x).digest('hex'); +} + +/** + * Engine entrypoint: runs all projects for a cadence. + * Recommended: call this from a per-project worker (fanout), but it can also run directly. + */ +export function createEngine(args: { + store: InsightStore; + modules: InsightModule[]; + db: any; + logger?: Pick; + explainQueue?: ExplainQueue; + config: EngineConfig; +}) { + const { store, modules, db, explainQueue, config } = args; + const logger = args.logger ?? console; + + async function runCadence(cadence: Cadence, now: Date): Promise { + const projectIds = await store.listProjectIdsForCadence(cadence); + for (const projectId of projectIds) { + await runProject({ projectId, cadence, now }); + } + } + + async function runProject(opts: { + projectId: string; + cadence: Cadence; + now: Date; + }): Promise { + const { projectId, cadence, now } = opts; + const projLogger = logger; + const eligible = modules.filter((m) => m.cadence.includes(cadence)); + + // Track top insights (by impact) for optional explain step across all modules/windows + const explainCandidates: Array<{ + insightId: string; + impact: number; + evidence: any; + evidenceHash: string; + }> = []; + + for (const mod of eligible) { + for (const windowKind of mod.windows) { + const window = resolveWindow(windowKind as WindowKind, now); + const ctx: ComputeContext = { + projectId, + window, + db, + now, + logger: projLogger, + }; + + // 1) enumerate dimensions + let dims = mod.enumerateDimensions + ? await mod.enumerateDimensions(ctx) + : []; + const maxDims = mod.thresholds?.maxDims ?? 25; + if (dims.length > maxDims) dims = dims.slice(0, maxDims); + + if (dims.length === 0) { + // Still do lifecycle close / suppression based on "nothing emitted" + await store.closeMissingActiveInsights({ + projectId, + moduleKey: mod.key, + windowKind, + seenDimensionKeys: [], + now, + staleDays: config.closeStaleAfterDays, + }); + + await store.applySuppression({ + projectId, + moduleKey: mod.key, + windowKind, + keepTopN: config.keepTopNPerModuleWindow, + now, + }); + + continue; + } + + // 2) compute in batches + const seen: string[] = []; + const dimBatches = chunk(dims, config.dimensionBatchSize); + for (const batch of dimBatches) { + let results: ComputeResult[] = []; + try { + results = await mod.computeMany(ctx, batch); + } catch (e) { + projLogger.error('[insights] module computeMany failed', { + projectId, + module: mod.key, + windowKind, + err: e, + }); + continue; + } + + for (const r of results) { + if (!r?.ok) continue; + if (!r.dimensionKey) continue; + + // 3) gate noise + if (!passesThresholds(r, mod, config)) continue; + + // 4) score + const impact = mod.score + ? mod.score(r, ctx) + : defaultImpactScore(r); + const sev = severityBand(r.changePct); + + // 5) dedupe/material change requires loading prev identity + const prev = await store.getActiveInsightByIdentity({ + projectId, + moduleKey: mod.key, + dimensionKey: r.dimensionKey, + windowKind, + }); + + const decision = materialDecision(prev, { + changePct: r.changePct, + direction: r.direction, + }); + + // 6) render + const card = mod.render(r, ctx); + + // 7) upsert + const persisted = await store.upsertInsight({ + projectId, + moduleKey: mod.key, + dimensionKey: r.dimensionKey, + window, + card, + metrics: { + currentValue: r.currentValue, + compareValue: r.compareValue, + changePct: r.changePct, + direction: r.direction, + impactScore: impact, + severityBand: sev, + }, + now, + decision, + prev, + }); + + seen.push(r.dimensionKey); + + // 8) events only when material + if (!prev) { + await store.insertEvent({ + projectId, + insightId: persisted.id, + moduleKey: mod.key, + dimensionKey: r.dimensionKey, + windowKind, + eventKind: 'created', + changeFrom: null, + changeTo: { + title: card.title, + changePct: r.changePct, + direction: r.direction, + impact, + severityBand: sev, + }, + now, + }); + } else if (decision.material) { + const eventKind = + decision.reason === 'direction_flip' + ? 'direction_flip' + : decision.reason === 'severity_change' + ? sev && prev.severityBand && sev > prev.severityBand + ? 'severity_up' + : 'severity_down' + : 'updated'; + + await store.insertEvent({ + projectId, + insightId: persisted.id, + moduleKey: mod.key, + dimensionKey: r.dimensionKey, + windowKind, + eventKind, + changeFrom: { + changePct: prev.changePct, + direction: prev.direction, + impactScore: prev.impactScore, + severityBand: prev.severityBand, + }, + changeTo: { + changePct: r.changePct, + direction: r.direction, + impactScore: impact, + severityBand: sev, + }, + now, + }); + } + + // 9) optional AI explain candidates (only for top-impact insights) + if (config.enableExplain && explainQueue && mod.drivers) { + // compute evidence deterministically (drivers) + try { + const drivers = await mod.drivers(r, ctx); + const evidence = { + insight: { + moduleKey: mod.key, + dimensionKey: r.dimensionKey, + windowKind, + currentValue: r.currentValue, + compareValue: r.compareValue, + changePct: r.changePct, + direction: r.direction, + }, + drivers, + window: { + start: window.start.toISOString().slice(0, 10), + end: window.end.toISOString().slice(0, 10), + baselineStart: window.baselineStart + .toISOString() + .slice(0, 10), + baselineEnd: window.baselineEnd.toISOString().slice(0, 10), + }, + }; + const evidenceHash = sha256(JSON.stringify(evidence)); + explainCandidates.push({ + insightId: persisted.id, + impact, + evidence, + evidenceHash, + }); + } catch (e) { + projLogger.warn('[insights] drivers() failed', { + projectId, + module: mod.key, + dimensionKey: r.dimensionKey, + err: e, + }); + } + } + } + } + + // 10) lifecycle: close missing insights for this module/window + await store.closeMissingActiveInsights({ + projectId, + moduleKey: mod.key, + windowKind, + seenDimensionKeys: seen, + now, + staleDays: config.closeStaleAfterDays, + }); + + // 11) suppression: keep top N + await store.applySuppression({ + projectId, + moduleKey: mod.key, + windowKind, + keepTopN: config.keepTopNPerModuleWindow, + now, + }); + } + } + + // 12) enqueue explains for top insights across the whole project run + if (config.enableExplain && explainQueue) { + explainCandidates.sort((a, b) => b.impact - a.impact); + const top = explainCandidates.slice( + 0, + config.explainTopNPerProjectPerDay, + ); + for (const c of top) { + await explainQueue.enqueueExplain({ + insightId: c.insightId, + projectId, + moduleKey: 'n/a', // optional; you can include it in evidence instead + dimensionKey: 'n/a', + windowKind: 'yesterday', + evidence: c.evidence, + evidenceHash: c.evidenceHash, + }); + } + } + } + + return { runCadence, runProject }; +} diff --git a/packages/db/src/services/insights/index.ts b/packages/db/src/services/insights/index.ts new file mode 100644 index 00000000..5e0740c1 --- /dev/null +++ b/packages/db/src/services/insights/index.ts @@ -0,0 +1,9 @@ +export * from './types'; +export * from './windows'; +export * from './scoring'; +export * from './material'; +export * from './engine'; +export * from './store'; +export * from './normalize'; +export * from './utils'; +export * from './modules'; diff --git a/packages/db/src/services/insights/material.ts b/packages/db/src/services/insights/material.ts new file mode 100644 index 00000000..13699b31 --- /dev/null +++ b/packages/db/src/services/insights/material.ts @@ -0,0 +1,43 @@ +import { severityBand as band } from './scoring'; +import type { MaterialDecision, PersistedInsight } from './types'; + +export function materialDecision( + prev: PersistedInsight | null, + next: { + changePct?: number; + direction?: 'up' | 'down' | 'flat'; + }, +): MaterialDecision { + const nextBand = band(next.changePct); + if (!prev) { + return { material: true, reason: 'created', newSeverityBand: nextBand }; + } + + // direction flip is always meaningful + const prevDir = (prev.direction ?? 'flat') as any; + const nextDir = next.direction ?? 'flat'; + if (prevDir !== nextDir && (nextDir === 'up' || nextDir === 'down')) { + return { + material: true, + reason: 'direction_flip', + newSeverityBand: nextBand, + }; + } + + // severity band change + const prevBand = (prev.severityBand ?? null) as any; + if (prevBand !== nextBand && nextBand !== null) { + return { + material: true, + reason: 'severity_change', + newSeverityBand: nextBand, + }; + } + + // Otherwise: treat as non-material (silent refresh). You can add deadband crossing here if you store prior changePct. + return { + material: false, + reason: 'none', + newSeverityBand: prevBand ?? nextBand, + }; +} diff --git a/packages/db/src/services/insights/modules/devices.module.ts b/packages/db/src/services/insights/modules/devices.module.ts new file mode 100644 index 00000000..1a3601fb --- /dev/null +++ b/packages/db/src/services/insights/modules/devices.module.ts @@ -0,0 +1,223 @@ +import { TABLE_NAMES } from '../../../clickhouse/client'; +import { clix } from '../../../clickhouse/query-builder'; +import type { ComputeResult, InsightModule, RenderedCard } from '../types'; +import { computeWeekdayMedians, getEndOfDay, getWeekday } from '../utils'; + +function normalizeDevice(device: string): string { + const d = (device || '').toLowerCase().trim(); + if (d.includes('mobile') || d === 'phone') return 'mobile'; + if (d.includes('tablet')) return 'tablet'; + if (d.includes('desktop')) return 'desktop'; + return d || 'unknown'; +} + +export const devicesModule: InsightModule = { + key: 'devices', + cadence: ['daily'], + windows: ['yesterday', 'rolling_7d', 'rolling_30d'], + thresholds: { minTotal: 100, minAbsDelta: 0, minPct: 0.08, maxDims: 5 }, + + async enumerateDimensions(ctx) { + // Query devices from current window (limited set, no need for baseline merge) + const results = await clix(ctx.db) + .select<{ device: string; cnt: number }>(['device', 'count(*) as cnt']) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .where('device', '!=', '') + .groupBy(['device']) + .orderBy('cnt', 'DESC') + .execute(); + + // Normalize and dedupe device types + const dims = new Set(); + for (const r of results) { + dims.add(`device:${normalizeDevice(r.device)}`); + } + + return Array.from(dims); + }, + + async computeMany(ctx, dimensionKeys): Promise { + // Single query for ALL current values + const currentResults = await clix(ctx.db) + .select<{ device: string; cnt: number }>(['device', 'count(*) as cnt']) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['device']) + .execute(); + + // Build current lookup map (normalized) and total + const currentMap = new Map(); + let totalCurrentValue = 0; + for (const r of currentResults) { + const key = normalizeDevice(r.device); + const cnt = Number(r.cnt ?? 0); + currentMap.set(key, (currentMap.get(key) ?? 0) + cnt); + totalCurrentValue += cnt; + } + + // Single query for baseline + let baselineMap: Map; + let totalBaselineValue = 0; + + if (ctx.window.kind === 'yesterday') { + const baselineResults = await clix(ctx.db) + .select<{ date: string; device: string; cnt: number }>([ + 'toDate(created_at) as date', + 'device', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['date', 'device']) + .execute(); + + const targetWeekday = getWeekday(ctx.window.start); + + // Group by normalized device type before computing medians + const normalizedResults = baselineResults.map((r) => ({ + date: r.date, + device: normalizeDevice(r.device), + cnt: r.cnt, + })); + + // Aggregate by date + normalized device first + const aggregated = new Map(); + for (const r of normalizedResults) { + const key = `${r.date}|${r.device}`; + if (!aggregated.has(r.device)) { + aggregated.set(r.device, []); + } + // Find existing entry for this date+device or add new + const entries = aggregated.get(r.device)!; + const existing = entries.find((e) => e.date === r.date); + if (existing) { + existing.cnt += Number(r.cnt ?? 0); + } else { + entries.push({ date: r.date, cnt: Number(r.cnt ?? 0) }); + } + } + + // Compute weekday medians per device type + baselineMap = new Map(); + for (const [deviceType, entries] of aggregated) { + const sameWeekdayValues = entries + .filter((e) => getWeekday(new Date(e.date)) === targetWeekday) + .map((e) => e.cnt) + .sort((a, b) => a - b); + + if (sameWeekdayValues.length > 0) { + const mid = Math.floor(sameWeekdayValues.length / 2); + const median = + sameWeekdayValues.length % 2 === 0 + ? ((sameWeekdayValues[mid - 1] ?? 0) + + (sameWeekdayValues[mid] ?? 0)) / + 2 + : (sameWeekdayValues[mid] ?? 0); + baselineMap.set(deviceType, median); + totalBaselineValue += median; + } + } + } else { + const baselineResults = await clix(ctx.db) + .select<{ device: string; cnt: number }>(['device', 'count(*) as cnt']) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['device']) + .execute(); + + baselineMap = new Map(); + for (const r of baselineResults) { + const key = normalizeDevice(r.device); + const cnt = Number(r.cnt ?? 0); + baselineMap.set(key, (baselineMap.get(key) ?? 0) + cnt); + totalBaselineValue += cnt; + } + } + + // Build results from maps + const results: ComputeResult[] = []; + + for (const dimKey of dimensionKeys) { + if (!dimKey.startsWith('device:')) continue; + const deviceType = dimKey.replace('device:', ''); + + const currentValue = currentMap.get(deviceType) ?? 0; + const compareValue = baselineMap.get(deviceType) ?? 0; + + const currentShare = + totalCurrentValue > 0 ? currentValue / totalCurrentValue : 0; + const compareShare = + totalBaselineValue > 0 ? compareValue / totalBaselineValue : 0; + + // Share shift in percentage points + const shareShiftPp = (currentShare - compareShare) * 100; + const changePct = + compareShare > 0 + ? (currentShare - compareShare) / compareShare + : currentShare > 0 + ? 1 + : 0; + + // Direction should match the sign of the pp shift (so title + delta agree) + const direction: 'up' | 'down' | 'flat' = + shareShiftPp > 0 ? 'up' : shareShiftPp < 0 ? 'down' : 'flat'; + + results.push({ + ok: true, + dimensionKey: dimKey, + currentValue, + compareValue, + changePct, + direction, + extra: { + shareShiftPp, + currentShare, + compareShare, + }, + }); + } + + return results; + }, + + render(result, ctx): RenderedCard { + const device = result.dimensionKey.replace('device:', ''); + const shareShiftPp = (result.extra?.shareShiftPp as number) ?? 0; + const isIncrease = shareShiftPp >= 0; + + return { + kind: 'insight_v1', + title: `${device} ${isIncrease ? '↑' : '↓'} ${Math.abs(shareShiftPp).toFixed(1)}pp`, + summary: `${ctx.window.label}. Device share shift.`, + primaryDimension: { type: 'device', key: device, displayName: device }, + tags: ['devices', ctx.window.kind, isIncrease ? 'increase' : 'decrease'], + metric: 'share', + extra: { + currentShare: result.extra?.currentShare, + compareShare: result.extra?.compareShare, + shareShiftPp: result.extra?.shareShiftPp, + }, + }; + }, +}; diff --git a/packages/db/src/services/insights/modules/entry-pages.module.ts b/packages/db/src/services/insights/modules/entry-pages.module.ts new file mode 100644 index 00000000..76742f76 --- /dev/null +++ b/packages/db/src/services/insights/modules/entry-pages.module.ts @@ -0,0 +1,193 @@ +import { TABLE_NAMES } from '../../../clickhouse/client'; +import { clix } from '../../../clickhouse/query-builder'; +import { normalizePath } from '../normalize'; +import type { ComputeResult, InsightModule, RenderedCard } from '../types'; +import { + computeChangePct, + computeDirection, + computeWeekdayMedians, + getEndOfDay, + getWeekday, +} from '../utils'; + +export const entryPagesModule: InsightModule = { + key: 'entry-pages', + cadence: ['daily'], + windows: ['yesterday', 'rolling_7d', 'rolling_30d'], + thresholds: { minTotal: 100, minAbsDelta: 30, minPct: 0.2, maxDims: 100 }, + + async enumerateDimensions(ctx) { + // Query top entry pages from BOTH current and baseline windows + const [currentResults, baselineResults] = await Promise.all([ + clix(ctx.db) + .select<{ entry_path: string; cnt: number }>([ + 'entry_path', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['entry_path']) + .orderBy('cnt', 'DESC') + .limit(this.thresholds?.maxDims ?? 100) + .execute(), + clix(ctx.db) + .select<{ entry_path: string; cnt: number }>([ + 'entry_path', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['entry_path']) + .orderBy('cnt', 'DESC') + .limit(this.thresholds?.maxDims ?? 100) + .execute(), + ]); + + // Merge both sets + const dims = new Set(); + for (const r of currentResults) { + dims.add(`entry:${normalizePath(r.entry_path || '/')}`); + } + for (const r of baselineResults) { + dims.add(`entry:${normalizePath(r.entry_path || '/')}`); + } + + return Array.from(dims); + }, + + async computeMany(ctx, dimensionKeys): Promise { + // Single query for ALL current values + const currentResults = await clix(ctx.db) + .select<{ entry_path: string; cnt: number }>([ + 'entry_path', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['entry_path']) + .execute(); + + // Build current lookup map + const currentMap = new Map(); + for (const r of currentResults) { + const key = normalizePath(r.entry_path || '/'); + currentMap.set(key, (currentMap.get(key) ?? 0) + Number(r.cnt ?? 0)); + } + + // Single query for baseline + let baselineMap: Map; + + if (ctx.window.kind === 'yesterday') { + const baselineResults = await clix(ctx.db) + .select<{ date: string; entry_path: string; cnt: number }>([ + 'toDate(created_at) as date', + 'entry_path', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['date', 'entry_path']) + .execute(); + + const targetWeekday = getWeekday(ctx.window.start); + baselineMap = computeWeekdayMedians(baselineResults, targetWeekday, (r) => + normalizePath(r.entry_path || '/'), + ); + } else { + const baselineResults = await clix(ctx.db) + .select<{ entry_path: string; cnt: number }>([ + 'entry_path', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['entry_path']) + .execute(); + + baselineMap = new Map(); + for (const r of baselineResults) { + const key = normalizePath(r.entry_path || '/'); + baselineMap.set(key, (baselineMap.get(key) ?? 0) + Number(r.cnt ?? 0)); + } + } + + // Build results from maps + const results: ComputeResult[] = []; + + for (const dimKey of dimensionKeys) { + if (!dimKey.startsWith('entry:')) continue; + const entryPath = dimKey.replace('entry:', ''); + + const currentValue = currentMap.get(entryPath) ?? 0; + const compareValue = baselineMap.get(entryPath) ?? 0; + const changePct = computeChangePct(currentValue, compareValue); + const direction = computeDirection(changePct); + + results.push({ + ok: true, + dimensionKey: dimKey, + currentValue, + compareValue, + changePct, + direction, + extra: { + isNew: compareValue === 0 && currentValue > 0, + }, + }); + } + + return results; + }, + + render(result, ctx): RenderedCard { + const path = result.dimensionKey.replace('entry:', ''); + const pct = ((result.changePct ?? 0) * 100).toFixed(1); + const isIncrease = (result.changePct ?? 0) >= 0; + const isNew = result.extra?.isNew as boolean | undefined; + + const title = isNew + ? `New entry page: ${path}` + : `Entry page ${path} ${isIncrease ? '↑' : '↓'} ${Math.abs(Number(pct))}%`; + + return { + kind: 'insight_v1', + title, + summary: `${ctx.window.label}. Sessions ${result.currentValue ?? 0} vs ${result.compareValue ?? 0}.`, + primaryDimension: { type: 'entry', key: path, displayName: path }, + tags: [ + 'entry-pages', + ctx.window.kind, + isNew ? 'new' : isIncrease ? 'increase' : 'decrease', + ], + metric: 'sessions', + extra: { + isNew: result.extra?.isNew, + }, + }; + }, +}; diff --git a/packages/db/src/services/insights/modules/geo.module.ts b/packages/db/src/services/insights/modules/geo.module.ts new file mode 100644 index 00000000..d0ba3664 --- /dev/null +++ b/packages/db/src/services/insights/modules/geo.module.ts @@ -0,0 +1,220 @@ +import { TABLE_NAMES } from '../../../clickhouse/client'; +import { clix } from '../../../clickhouse/query-builder'; +import type { ComputeResult, InsightModule, RenderedCard } from '../types'; +import { computeWeekdayMedians, getEndOfDay, getWeekday } from '../utils'; + +export const geoModule: InsightModule = { + key: 'geo', + cadence: ['daily'], + windows: ['yesterday', 'rolling_7d', 'rolling_30d'], + thresholds: { minTotal: 100, minAbsDelta: 0, minPct: 0.08, maxDims: 30 }, + + async enumerateDimensions(ctx) { + // Query top countries from BOTH current and baseline windows + const [currentResults, baselineResults] = await Promise.all([ + clix(ctx.db) + .select<{ country: string; cnt: number }>([ + 'country', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .where('country', '!=', '') + .groupBy(['country']) + .orderBy('cnt', 'DESC') + .limit(this.thresholds?.maxDims ?? 30) + .execute(), + clix(ctx.db) + .select<{ country: string; cnt: number }>([ + 'country', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .where('country', '!=', '') + .groupBy(['country']) + .orderBy('cnt', 'DESC') + .limit(this.thresholds?.maxDims ?? 30) + .execute(), + ]); + + // Merge both sets + const dims = new Set(); + for (const r of currentResults) { + dims.add(`country:${r.country || 'unknown'}`); + } + for (const r of baselineResults) { + dims.add(`country:${r.country || 'unknown'}`); + } + + return Array.from(dims); + }, + + async computeMany(ctx, dimensionKeys): Promise { + // Single query for ALL current values + total + const currentResults = await clix(ctx.db) + .select<{ country: string; cnt: number }>(['country', 'count(*) as cnt']) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['country']) + .execute(); + + // Build current lookup map and total + const currentMap = new Map(); + let totalCurrentValue = 0; + for (const r of currentResults) { + const key = r.country || 'unknown'; + const cnt = Number(r.cnt ?? 0); + currentMap.set(key, (currentMap.get(key) ?? 0) + cnt); + totalCurrentValue += cnt; + } + + // Single query for baseline + let baselineMap: Map; + let totalBaselineValue = 0; + + if (ctx.window.kind === 'yesterday') { + const baselineResults = await clix(ctx.db) + .select<{ date: string; country: string; cnt: number }>([ + 'toDate(created_at) as date', + 'country', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['date', 'country']) + .execute(); + + const targetWeekday = getWeekday(ctx.window.start); + baselineMap = computeWeekdayMedians( + baselineResults, + targetWeekday, + (r) => r.country || 'unknown', + ); + + // Compute total baseline from medians + for (const value of baselineMap.values()) { + totalBaselineValue += value; + } + } else { + const baselineResults = await clix(ctx.db) + .select<{ country: string; cnt: number }>([ + 'country', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['country']) + .execute(); + + baselineMap = new Map(); + for (const r of baselineResults) { + const key = r.country || 'unknown'; + const cnt = Number(r.cnt ?? 0); + baselineMap.set(key, (baselineMap.get(key) ?? 0) + cnt); + totalBaselineValue += cnt; + } + } + + // Build results from maps + const results: ComputeResult[] = []; + + for (const dimKey of dimensionKeys) { + if (!dimKey.startsWith('country:')) continue; + const country = dimKey.replace('country:', ''); + + const currentValue = currentMap.get(country) ?? 0; + const compareValue = baselineMap.get(country) ?? 0; + + const currentShare = + totalCurrentValue > 0 ? currentValue / totalCurrentValue : 0; + const compareShare = + totalBaselineValue > 0 ? compareValue / totalBaselineValue : 0; + + // Share shift in percentage points + const shareShiftPp = (currentShare - compareShare) * 100; + const changePct = + compareShare > 0 + ? (currentShare - compareShare) / compareShare + : currentShare > 0 + ? 1 + : 0; + + // Direction should match the sign of the pp shift (so title + delta agree) + const direction: 'up' | 'down' | 'flat' = + shareShiftPp > 0 ? 'up' : shareShiftPp < 0 ? 'down' : 'flat'; + + results.push({ + ok: true, + dimensionKey: dimKey, + currentValue, + compareValue, + changePct, + direction, + extra: { + shareShiftPp, + currentShare, + compareShare, + isNew: compareValue === 0 && currentValue > 0, + }, + }); + } + + return results; + }, + + render(result, ctx): RenderedCard { + const country = result.dimensionKey.replace('country:', ''); + const shareShiftPp = (result.extra?.shareShiftPp as number) ?? 0; + const isIncrease = shareShiftPp >= 0; + const isNew = result.extra?.isNew as boolean | undefined; + + const title = isNew + ? `New traffic from: ${country}` + : `${country} ${isIncrease ? '↑' : '↓'} ${Math.abs(shareShiftPp).toFixed(1)}pp`; + + return { + kind: 'insight_v1', + title, + summary: `${ctx.window.label}. Share shift from ${country}.`, + primaryDimension: { type: 'country', key: country, displayName: country }, + tags: [ + 'geo', + ctx.window.kind, + isNew ? 'new' : isIncrease ? 'increase' : 'decrease', + ], + metric: 'share', + extra: { + currentShare: result.extra?.currentShare, + compareShare: result.extra?.compareShare, + shareShiftPp: result.extra?.shareShiftPp, + isNew: result.extra?.isNew, + }, + }; + }, +}; diff --git a/packages/db/src/services/insights/modules/index.ts b/packages/db/src/services/insights/modules/index.ts new file mode 100644 index 00000000..72b45147 --- /dev/null +++ b/packages/db/src/services/insights/modules/index.ts @@ -0,0 +1,5 @@ +export { referrersModule } from './referrers.module'; +export { entryPagesModule } from './entry-pages.module'; +export { pageTrendsModule } from './page-trends.module'; +export { geoModule } from './geo.module'; +export { devicesModule } from './devices.module'; diff --git a/packages/db/src/services/insights/modules/page-trends.module.ts b/packages/db/src/services/insights/modules/page-trends.module.ts new file mode 100644 index 00000000..a9fde665 --- /dev/null +++ b/packages/db/src/services/insights/modules/page-trends.module.ts @@ -0,0 +1,181 @@ +import { TABLE_NAMES } from '../../../clickhouse/client'; +import { clix } from '../../../clickhouse/query-builder'; +import { normalizePath } from '../normalize'; +import type { ComputeResult, InsightModule, RenderedCard } from '../types'; +import { + computeChangePct, + computeDirection, + computeWeekdayMedians, + getEndOfDay, + getWeekday, +} from '../utils'; + +export const pageTrendsModule: InsightModule = { + key: 'page-trends', + cadence: ['daily'], + windows: ['yesterday', 'rolling_7d', 'rolling_30d'], + thresholds: { minTotal: 100, minAbsDelta: 30, minPct: 0.2, maxDims: 100 }, + + async enumerateDimensions(ctx) { + // Query top pages from BOTH current and baseline windows + const [currentResults, baselineResults] = await Promise.all([ + clix(ctx.db) + .select<{ path: string; cnt: number }>(['path', 'count(*) as cnt']) + .from(TABLE_NAMES.events) + .where('project_id', '=', ctx.projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['path']) + .orderBy('cnt', 'DESC') + .limit(this.thresholds?.maxDims ?? 100) + .execute(), + clix(ctx.db) + .select<{ path: string; cnt: number }>(['path', 'count(*) as cnt']) + .from(TABLE_NAMES.events) + .where('project_id', '=', ctx.projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['path']) + .orderBy('cnt', 'DESC') + .limit(this.thresholds?.maxDims ?? 100) + .execute(), + ]); + + // Merge both sets + const dims = new Set(); + for (const r of currentResults) { + dims.add(`page:${normalizePath(r.path || '/')}`); + } + for (const r of baselineResults) { + dims.add(`page:${normalizePath(r.path || '/')}`); + } + + return Array.from(dims); + }, + + async computeMany(ctx, dimensionKeys): Promise { + // Single query for ALL current values + const currentResults = await clix(ctx.db) + .select<{ path: string; cnt: number }>(['path', 'count(*) as cnt']) + .from(TABLE_NAMES.events) + .where('project_id', '=', ctx.projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['path']) + .execute(); + + // Build current lookup map + const currentMap = new Map(); + for (const r of currentResults) { + const key = normalizePath(r.path || '/'); + currentMap.set(key, (currentMap.get(key) ?? 0) + Number(r.cnt ?? 0)); + } + + // Single query for baseline + let baselineMap: Map; + + if (ctx.window.kind === 'yesterday') { + const baselineResults = await clix(ctx.db) + .select<{ date: string; path: string; cnt: number }>([ + 'toDate(created_at) as date', + 'path', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.events) + .where('project_id', '=', ctx.projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['date', 'path']) + .execute(); + + const targetWeekday = getWeekday(ctx.window.start); + baselineMap = computeWeekdayMedians(baselineResults, targetWeekday, (r) => + normalizePath(r.path || '/'), + ); + } else { + const baselineResults = await clix(ctx.db) + .select<{ path: string; cnt: number }>(['path', 'count(*) as cnt']) + .from(TABLE_NAMES.events) + .where('project_id', '=', ctx.projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['path']) + .execute(); + + baselineMap = new Map(); + for (const r of baselineResults) { + const key = normalizePath(r.path || '/'); + baselineMap.set(key, (baselineMap.get(key) ?? 0) + Number(r.cnt ?? 0)); + } + } + + // Build results from maps + const results: ComputeResult[] = []; + + for (const dimKey of dimensionKeys) { + if (!dimKey.startsWith('page:')) continue; + const pagePath = dimKey.replace('page:', ''); + + const currentValue = currentMap.get(pagePath) ?? 0; + const compareValue = baselineMap.get(pagePath) ?? 0; + const changePct = computeChangePct(currentValue, compareValue); + const direction = computeDirection(changePct); + + results.push({ + ok: true, + dimensionKey: dimKey, + currentValue, + compareValue, + changePct, + direction, + extra: { + isNew: compareValue === 0 && currentValue > 0, + }, + }); + } + + return results; + }, + + render(result, ctx): RenderedCard { + const path = result.dimensionKey.replace('page:', ''); + const pct = ((result.changePct ?? 0) * 100).toFixed(1); + const isIncrease = (result.changePct ?? 0) >= 0; + const isNew = result.extra?.isNew as boolean | undefined; + + const title = isNew + ? `New page getting views: ${path}` + : `Page ${path} ${isIncrease ? '↑' : '↓'} ${Math.abs(Number(pct))}%`; + + return { + kind: 'insight_v1', + title, + summary: `${ctx.window.label}. Pageviews ${result.currentValue ?? 0} vs ${result.compareValue ?? 0}.`, + primaryDimension: { type: 'page', key: path, displayName: path }, + tags: [ + 'page-trends', + ctx.window.kind, + isNew ? 'new' : isIncrease ? 'increase' : 'decrease', + ], + metric: 'pageviews', + extra: { + isNew: result.extra?.isNew, + }, + }; + }, +}; diff --git a/packages/db/src/services/insights/modules/referrers.module.ts b/packages/db/src/services/insights/modules/referrers.module.ts new file mode 100644 index 00000000..c22989dc --- /dev/null +++ b/packages/db/src/services/insights/modules/referrers.module.ts @@ -0,0 +1,202 @@ +import { TABLE_NAMES } from '../../../clickhouse/client'; +import { clix } from '../../../clickhouse/query-builder'; +import { normalizeReferrer } from '../normalize'; +import type { ComputeResult, InsightModule, RenderedCard } from '../types'; +import { + computeChangePct, + computeDirection, + computeWeekdayMedians, + getEndOfDay, + getWeekday, +} from '../utils'; + +export const referrersModule: InsightModule = { + key: 'referrers', + cadence: ['daily'], + windows: ['yesterday', 'rolling_7d', 'rolling_30d'], + thresholds: { minTotal: 100, minAbsDelta: 20, minPct: 0.15, maxDims: 50 }, + + async enumerateDimensions(ctx) { + // Query top referrers from BOTH current and baseline windows + // This allows detecting new sources that didn't exist in baseline + const [currentResults, baselineResults] = await Promise.all([ + clix(ctx.db) + .select<{ referrer_name: string; cnt: number }>([ + 'referrer_name', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['referrer_name']) + .orderBy('cnt', 'DESC') + .limit(this.thresholds?.maxDims ?? 50) + .execute(), + clix(ctx.db) + .select<{ referrer_name: string; cnt: number }>([ + 'referrer_name', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['referrer_name']) + .orderBy('cnt', 'DESC') + .limit(this.thresholds?.maxDims ?? 50) + .execute(), + ]); + + // Merge both sets to catch new/emerging sources + const dims = new Set(); + for (const r of currentResults) { + dims.add(`referrer:${normalizeReferrer(r.referrer_name || 'direct')}`); + } + for (const r of baselineResults) { + dims.add(`referrer:${normalizeReferrer(r.referrer_name || 'direct')}`); + } + + return Array.from(dims); + }, + + async computeMany(ctx, dimensionKeys): Promise { + // Single query for ALL current values (batched) + const currentResults = await clix(ctx.db) + .select<{ referrer_name: string; cnt: number }>([ + 'referrer_name', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['referrer_name']) + .execute(); + + // Build current lookup map + const currentMap = new Map(); + for (const r of currentResults) { + const key = normalizeReferrer(r.referrer_name || 'direct'); + currentMap.set(key, (currentMap.get(key) ?? 0) + Number(r.cnt ?? 0)); + } + + // Single query for baseline (with date breakdown for weekday median if needed) + let baselineMap: Map; + + if (ctx.window.kind === 'yesterday') { + // Need daily breakdown for weekday median calculation + const baselineResults = await clix(ctx.db) + .select<{ date: string; referrer_name: string; cnt: number }>([ + 'toDate(created_at) as date', + 'referrer_name', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['date', 'referrer_name']) + .execute(); + + const targetWeekday = getWeekday(ctx.window.start); + baselineMap = computeWeekdayMedians(baselineResults, targetWeekday, (r) => + normalizeReferrer(r.referrer_name || 'direct'), + ); + } else { + // Rolling windows: simple aggregate + const baselineResults = await clix(ctx.db) + .select<{ referrer_name: string; cnt: number }>([ + 'referrer_name', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['referrer_name']) + .execute(); + + baselineMap = new Map(); + for (const r of baselineResults) { + const key = normalizeReferrer(r.referrer_name || 'direct'); + baselineMap.set(key, (baselineMap.get(key) ?? 0) + Number(r.cnt ?? 0)); + } + } + + // Build results from maps (in memory, no more queries!) + const results: ComputeResult[] = []; + + for (const dimKey of dimensionKeys) { + if (!dimKey.startsWith('referrer:')) continue; + const referrerName = dimKey.replace('referrer:', ''); + + const currentValue = currentMap.get(referrerName) ?? 0; + const compareValue = baselineMap.get(referrerName) ?? 0; + const changePct = computeChangePct(currentValue, compareValue); + const direction = computeDirection(changePct); + + results.push({ + ok: true, + dimensionKey: dimKey, + currentValue, + compareValue, + changePct, + direction, + extra: { + isNew: compareValue === 0 && currentValue > 0, + isGone: currentValue === 0 && compareValue > 0, + }, + }); + } + + return results; + }, + + render(result, ctx): RenderedCard { + const referrer = result.dimensionKey.replace('referrer:', ''); + const pct = ((result.changePct ?? 0) * 100).toFixed(1); + const isIncrease = (result.changePct ?? 0) >= 0; + const isNew = result.extra?.isNew as boolean | undefined; + + const title = isNew + ? `New traffic source: ${referrer}` + : `Traffic from ${referrer} ${isIncrease ? '↑' : '↓'} ${Math.abs(Number(pct))}%`; + + return { + kind: 'insight_v1', + title, + summary: `${ctx.window.label}. Sessions ${result.currentValue ?? 0} vs ${result.compareValue ?? 0}.`, + primaryDimension: { + type: 'referrer', + key: referrer, + displayName: referrer, + }, + tags: [ + 'referrers', + ctx.window.kind, + isNew ? 'new' : isIncrease ? 'increase' : 'decrease', + ], + metric: 'sessions', + extra: { + isNew: result.extra?.isNew, + isGone: result.extra?.isGone, + }, + }; + }, +}; diff --git a/packages/db/src/services/insights/normalize.ts b/packages/db/src/services/insights/normalize.ts new file mode 100644 index 00000000..65451c21 --- /dev/null +++ b/packages/db/src/services/insights/normalize.ts @@ -0,0 +1,80 @@ +export function normalizeReferrer(name: string): string { + if (!name || name === '') return 'direct'; + + const normalized = name.toLowerCase().trim(); + + // Normalize common referrer variations + const map: Record = { + 'm.instagram.com': 'instagram', + 'l.instagram.com': 'instagram', + 'www.instagram.com': 'instagram', + 'instagram.com': 'instagram', + 't.co': 'twitter', + 'twitter.com': 'twitter', + 'x.com': 'twitter', + 'lm.facebook.com': 'facebook', + 'm.facebook.com': 'facebook', + 'facebook.com': 'facebook', + 'l.facebook.com': 'facebook', + 'linkedin.com': 'linkedin', + 'www.linkedin.com': 'linkedin', + 'youtube.com': 'youtube', + 'www.youtube.com': 'youtube', + 'm.youtube.com': 'youtube', + 'reddit.com': 'reddit', + 'www.reddit.com': 'reddit', + 'tiktok.com': 'tiktok', + 'www.tiktok.com': 'tiktok', + }; + + // Check exact match first + if (map[normalized]) { + return map[normalized]; + } + + // Check if it contains any of the mapped domains + for (const [key, value] of Object.entries(map)) { + if (normalized.includes(key)) { + return value; + } + } + + // Extract domain from URL if present + try { + const url = normalized.startsWith('http') + ? normalized + : `https://${normalized}`; + const hostname = new URL(url).hostname; + // Remove www. prefix + return hostname.replace(/^www\./, ''); + } catch { + // If not a valid URL, return as-is + return normalized || 'direct'; + } +} + +export function normalizePath(path: string): string { + if (!path || path === '') return '/'; + + try { + // If it's a full URL, extract pathname + const url = path.startsWith('http') + ? new URL(path) + : new URL(path, 'http://x'); + const pathname = url.pathname; + // Normalize trailing slash (remove unless it's root) + return pathname === '/' ? '/' : pathname.replace(/\/$/, ''); + } catch { + // If not a valid URL, treat as path + return path === '/' ? '/' : path.replace(/\/$/, '') || '/'; + } +} + +export function normalizeUtmCombo(source: string, medium: string): string { + const s = (source || '').toLowerCase().trim(); + const m = (medium || '').toLowerCase().trim(); + if (!s && !m) return 'none'; + if (!s) return `utm:${m}`; + if (!m) return `utm:${s}`; + return `utm:${s}/${m}`; +} diff --git a/packages/db/src/services/insights/scoring.ts b/packages/db/src/services/insights/scoring.ts new file mode 100644 index 00000000..a373bc41 --- /dev/null +++ b/packages/db/src/services/insights/scoring.ts @@ -0,0 +1,18 @@ +import type { ComputeResult } from './types'; + +export function defaultImpactScore(r: ComputeResult): number { + const vol = (r.currentValue ?? 0) + (r.compareValue ?? 0); + const pct = Math.abs(r.changePct ?? 0); + // stable-ish: bigger change + bigger volume => higher impact + return Math.log1p(vol) * (pct * 100); +} + +export function severityBand( + changePct?: number | null, +): 'low' | 'moderate' | 'severe' | null { + const p = Math.abs(changePct ?? 0); + if (p < 0.05) return null; + if (p < 0.15) return 'low'; + if (p < 0.3) return 'moderate'; + return 'severe'; +} diff --git a/packages/db/src/services/insights/store.ts b/packages/db/src/services/insights/store.ts new file mode 100644 index 00000000..95c08ec3 --- /dev/null +++ b/packages/db/src/services/insights/store.ts @@ -0,0 +1,276 @@ +import { Prisma, db } from '../../prisma-client'; +import type { + Cadence, + InsightStore, + PersistedInsight, + RenderedCard, + WindowKind, + WindowRange, +} from './types'; + +export const insightStore: InsightStore = { + async listProjectIdsForCadence(cadence: Cadence): Promise { + const projects = await db.project.findMany({ + where: { + deleteAt: null, + organization: { + subscriptionStatus: 'active', + }, + }, + select: { id: true }, + }); + return projects.map((p) => p.id); + }, + + async getActiveInsightByIdentity({ + projectId, + moduleKey, + dimensionKey, + windowKind, + }): Promise { + const insight = await db.projectInsight.findFirst({ + where: { + projectId, + moduleKey, + dimensionKey, + windowKind, + state: 'active', + }, + }); + + if (!insight) return null; + + return { + id: insight.id, + projectId: insight.projectId, + moduleKey: insight.moduleKey, + dimensionKey: insight.dimensionKey, + windowKind: insight.windowKind as WindowKind, + state: insight.state as 'active' | 'suppressed' | 'closed', + version: insight.version, + impactScore: insight.impactScore, + lastSeenAt: insight.lastSeenAt, + lastUpdatedAt: insight.lastUpdatedAt, + direction: insight.direction, + changePct: insight.changePct, + severityBand: insight.severityBand, + }; + }, + + async upsertInsight({ + projectId, + moduleKey, + dimensionKey, + window, + card, + metrics, + now, + decision, + prev, + }): Promise { + const payloadData = (card.payload ?? card) as Prisma.InputJsonValue; + + const baseData = { + projectId, + moduleKey, + dimensionKey, + windowKind: window.kind, + state: prev?.state === 'closed' ? 'active' : (prev?.state ?? 'active'), + title: card.title, + summary: card.summary ?? null, + payload: payloadData as Prisma.InputJsonValue, + currentValue: metrics.currentValue ?? null, + compareValue: metrics.compareValue ?? null, + changePct: metrics.changePct ?? null, + direction: metrics.direction ?? null, + impactScore: metrics.impactScore, + severityBand: metrics.severityBand ?? null, + version: prev ? (decision.material ? prev.version + 1 : prev.version) : 1, + windowStart: window.start, + windowEnd: window.end, + lastSeenAt: now, + lastUpdatedAt: now, + }; + + // Try to find existing insight first + const existing = prev + ? await db.projectInsight.findFirst({ + where: { + projectId, + moduleKey, + dimensionKey, + windowKind: window.kind, + state: prev.state, + }, + }) + : null; + + let insight: any; + if (existing) { + // Update existing + insight = await db.projectInsight.update({ + where: { id: existing.id }, + data: { + ...baseData, + threadId: existing.threadId, // Preserve threadId + }, + }); + } else { + // Create new - need to check if there's a closed/suppressed one to reopen + const closed = await db.projectInsight.findFirst({ + where: { + projectId, + moduleKey, + dimensionKey, + windowKind: window.kind, + state: { in: ['closed', 'suppressed'] }, + }, + orderBy: { lastUpdatedAt: 'desc' }, + }); + + if (closed) { + // Reopen and update + insight = await db.projectInsight.update({ + where: { id: closed.id }, + data: { + ...baseData, + state: 'active', + threadId: closed.threadId, // Preserve threadId + }, + }); + } else { + // Create new + insight = await db.projectInsight.create({ + data: { + ...baseData, + firstDetectedAt: now, + }, + }); + } + } + + return { + id: insight.id, + projectId: insight.projectId, + moduleKey: insight.moduleKey, + dimensionKey: insight.dimensionKey, + windowKind: insight.windowKind as WindowKind, + state: insight.state as 'active' | 'suppressed' | 'closed', + version: insight.version, + impactScore: insight.impactScore, + lastSeenAt: insight.lastSeenAt, + lastUpdatedAt: insight.lastUpdatedAt, + direction: insight.direction, + changePct: insight.changePct, + severityBand: insight.severityBand, + }; + }, + + async insertEvent({ + projectId, + insightId, + moduleKey, + dimensionKey, + windowKind, + eventKind, + changeFrom, + changeTo, + now, + }): Promise { + await db.insightEvent.create({ + data: { + insightId, + eventKind, + changeFrom: changeFrom + ? (changeFrom as Prisma.InputJsonValue) + : Prisma.DbNull, + changeTo: changeTo + ? (changeTo as Prisma.InputJsonValue) + : Prisma.DbNull, + createdAt: now, + }, + }); + }, + + async closeMissingActiveInsights({ + projectId, + moduleKey, + windowKind, + seenDimensionKeys, + now, + staleDays, + }): Promise { + const staleDate = new Date(now); + staleDate.setDate(staleDate.getDate() - staleDays); + + const result = await db.projectInsight.updateMany({ + where: { + projectId, + moduleKey, + windowKind, + state: 'active', + lastSeenAt: { lt: staleDate }, + dimensionKey: { notIn: seenDimensionKeys }, + }, + data: { + state: 'closed', + lastUpdatedAt: now, + }, + }); + + return result.count; + }, + + async applySuppression({ + projectId, + moduleKey, + windowKind, + keepTopN, + now, + }): Promise<{ suppressed: number; unsuppressed: number }> { + // Get all active insights for this module/window, ordered by impactScore desc + const insights = await db.projectInsight.findMany({ + where: { + projectId, + moduleKey, + windowKind, + state: { in: ['active', 'suppressed'] }, + }, + orderBy: { impactScore: 'desc' }, + }); + + if (insights.length === 0) { + return { suppressed: 0, unsuppressed: 0 }; + } + + const topN = insights.slice(0, keepTopN); + const belowN = insights.slice(keepTopN); + + // Suppress those below top N + let suppressed = 0; + let unsuppressed = 0; + + for (const insight of belowN) { + if (insight.state === 'active') { + await db.projectInsight.update({ + where: { id: insight.id }, + data: { state: 'suppressed', lastUpdatedAt: now }, + }); + suppressed++; + } + } + + // Unsuppress those in top N + for (const insight of topN) { + if (insight.state === 'suppressed') { + await db.projectInsight.update({ + where: { id: insight.id }, + data: { state: 'active', lastUpdatedAt: now }, + }); + unsuppressed++; + } + } + + return { suppressed, unsuppressed }; + }, +}; diff --git a/packages/db/src/services/insights/types.ts b/packages/db/src/services/insights/types.ts new file mode 100644 index 00000000..19215457 --- /dev/null +++ b/packages/db/src/services/insights/types.ts @@ -0,0 +1,200 @@ +export type Cadence = 'hourly' | 'daily' | 'weekly'; + +export type WindowKind = 'yesterday' | 'rolling_7d' | 'rolling_30d'; + +export interface WindowRange { + kind: WindowKind; + start: Date; // inclusive + end: Date; // inclusive (or exclusive, but be consistent) + baselineStart: Date; + baselineEnd: Date; + label: string; // e.g. "Yesterday" / "Last 7 days" +} + +export interface ComputeContext { + projectId: string; + window: WindowRange; + db: any; // your DB client + now: Date; + logger: Pick; +} + +export interface ComputeResult { + ok: boolean; + dimensionKey: string; // e.g. "referrer:instagram" / "page:/pricing" + currentValue?: number; + compareValue?: number; + changePct?: number; // -0.15 = -15% + direction?: 'up' | 'down' | 'flat'; + extra?: Record; // share delta pp, rank, sparkline, etc. +} + +/** + * Render should be deterministic and safe to call multiple times. + * Raw computed values (currentValue, compareValue, changePct, direction) + * are stored in top-level DB fields. The payload only contains display + * metadata and module-specific extra data for frontend flexibility. + */ +export interface RenderedCard { + kind?: 'insight_v1'; + title: string; + summary?: string; + tags?: string[]; + primaryDimension?: { type: string; key: string; displayName?: string }; + + /** + * What metric this insight measures - frontend uses this to format values. + * 'sessions' | 'pageviews' for absolute counts + * 'share' for percentage-based (geo, devices) + */ + metric?: 'sessions' | 'pageviews' | 'share'; + + /** + * Module-specific extra data (e.g., share values for geo/devices). + * Frontend can use this based on moduleKey. + */ + extra?: Record; +} + +/** Optional per-module thresholds (the engine can still apply global defaults) */ +export interface ModuleThresholds { + minTotal?: number; // min current+baseline + minAbsDelta?: number; // min abs(current-compare) + minPct?: number; // min abs(changePct) + maxDims?: number; // cap enumerateDimensions +} + +export interface InsightModule { + key: string; + cadence: Cadence[]; + windows: WindowKind[]; + thresholds?: ModuleThresholds; + enumerateDimensions?(ctx: ComputeContext): Promise; + /** Preferred path: batch compute many dimensions in one go. */ + computeMany( + ctx: ComputeContext, + dimensionKeys: string[], + ): Promise; + /** Must not do DB reads; just format output. */ + render(result: ComputeResult, ctx: ComputeContext): RenderedCard; + /** Score decides what to show (top-N). */ + score?(result: ComputeResult, ctx: ComputeContext): number; + /** Optional: compute "drivers" for AI explain step */ + drivers?( + result: ComputeResult, + ctx: ComputeContext, + ): Promise>; +} + +/** Insight row shape returned from persistence (minimal fields engine needs). */ +export interface PersistedInsight { + id: string; + projectId: string; + moduleKey: string; + dimensionKey: string; + windowKind: WindowKind; + state: 'active' | 'suppressed' | 'closed'; + version: number; + impactScore: number; + lastSeenAt: Date; + lastUpdatedAt: Date; + direction?: string | null; + changePct?: number | null; + severityBand?: string | null; +} + +/** Material change decision used for events/notifications. */ +export type MaterialReason = + | 'created' + | 'direction_flip' + | 'severity_change' + | 'cross_deadband' + | 'reopened' + | 'none'; + +export interface MaterialDecision { + material: boolean; + reason: MaterialReason; + newSeverityBand?: 'low' | 'moderate' | 'severe' | null; +} + +/** + * Persistence interface: implement with Postgres. + * Keep engine independent of query builder choice. + */ +export interface InsightStore { + listProjectIdsForCadence(cadence: Cadence): Promise; + getActiveInsightByIdentity(args: { + projectId: string; + moduleKey: string; + dimensionKey: string; + windowKind: WindowKind; + }): Promise; + upsertInsight(args: { + projectId: string; + moduleKey: string; + dimensionKey: string; + window: WindowRange; + card: RenderedCard; + metrics: { + currentValue?: number; + compareValue?: number; + changePct?: number; + direction?: 'up' | 'down' | 'flat'; + impactScore: number; + severityBand?: string | null; + }; + now: Date; + decision: MaterialDecision; + prev: PersistedInsight | null; + }): Promise; + insertEvent(args: { + projectId: string; + insightId: string; + moduleKey: string; + dimensionKey: string; + windowKind: WindowKind; + eventKind: + | 'created' + | 'updated' + | 'severity_up' + | 'severity_down' + | 'direction_flip' + | 'closed' + | 'reopened' + | 'suppressed' + | 'unsuppressed'; + changeFrom?: Record | null; + changeTo?: Record | null; + now: Date; + }): Promise; + /** Mark insights as not seen this run if you prefer lifecycle via closeMissing() */ + closeMissingActiveInsights(args: { + projectId: string; + moduleKey: string; + windowKind: WindowKind; + seenDimensionKeys: string[]; + now: Date; + staleDays: number; // close if not seen for X days + }): Promise; // count closed + /** Enforce top-N display by suppressing below-threshold insights. */ + applySuppression(args: { + projectId: string; + moduleKey: string; + windowKind: WindowKind; + keepTopN: number; + now: Date; + }): Promise<{ suppressed: number; unsuppressed: number }>; +} + +export interface ExplainQueue { + enqueueExplain(job: { + insightId: string; + projectId: string; + moduleKey: string; + dimensionKey: string; + windowKind: WindowKind; + evidence: Record; + evidenceHash: string; + }): Promise; +} diff --git a/packages/db/src/services/insights/utils.ts b/packages/db/src/services/insights/utils.ts new file mode 100644 index 00000000..35807ca4 --- /dev/null +++ b/packages/db/src/services/insights/utils.ts @@ -0,0 +1,111 @@ +/** + * Shared utilities for insight modules + */ + +/** + * Get UTC weekday (0 = Sunday, 6 = Saturday) + */ +export function getWeekday(date: Date): number { + return date.getUTCDay(); +} + +/** + * Compute median of a sorted array of numbers + */ +export function computeMedian(sortedValues: number[]): number { + if (sortedValues.length === 0) return 0; + const mid = Math.floor(sortedValues.length / 2); + return sortedValues.length % 2 === 0 + ? ((sortedValues[mid - 1] ?? 0) + (sortedValues[mid] ?? 0)) / 2 + : (sortedValues[mid] ?? 0); +} + +/** + * Compute weekday medians from daily breakdown data. + * Groups by dimension, filters to matching weekday, computes median per dimension. + * + * @param data - Array of { date, dimension, cnt } rows + * @param targetWeekday - Weekday to filter to (0-6) + * @param getDimension - Function to extract normalized dimension from row + * @returns Map of dimension -> median value + */ +export function computeWeekdayMedians< + T extends { date: string; cnt: number | string }, +>( + data: T[], + targetWeekday: number, + getDimension: (row: T) => string, +): Map { + // Group by dimension, filtered to target weekday + const byDimension = new Map(); + + for (const row of data) { + const rowWeekday = getWeekday(new Date(row.date)); + if (rowWeekday !== targetWeekday) continue; + + const dim = getDimension(row); + const values = byDimension.get(dim) ?? []; + values.push(Number(row.cnt ?? 0)); + byDimension.set(dim, values); + } + + // Compute median per dimension + const result = new Map(); + for (const [dim, values] of byDimension) { + values.sort((a, b) => a - b); + result.set(dim, computeMedian(values)); + } + + return result; +} + +/** + * Compute change percentage between current and compare values + */ +export function computeChangePct( + currentValue: number, + compareValue: number, +): number { + return compareValue > 0 + ? (currentValue - compareValue) / compareValue + : currentValue > 0 + ? 1 + : 0; +} + +/** + * Determine direction based on change percentage + */ +export function computeDirection( + changePct: number, + threshold = 0.05, +): 'up' | 'down' | 'flat' { + return changePct > threshold + ? 'up' + : changePct < -threshold + ? 'down' + : 'flat'; +} + +/** + * Merge dimension sets from current and baseline to detect new/gone dimensions + */ +export function mergeDimensionSets( + currentDims: Set, + baselineDims: Set, +): string[] { + const merged = new Set(); + for (const dim of currentDims) merged.add(dim); + for (const dim of baselineDims) merged.add(dim); + return Array.from(merged); +} + +/** + * Get end of day timestamp (23:59:59.999) for a given date. + * Used to ensure BETWEEN queries include the full day. + */ +export function getEndOfDay(date: Date): Date { + const end = new Date(date); + end.setUTCHours(23, 59, 59, 999); + return end; +} diff --git a/packages/db/src/services/insights/windows.ts b/packages/db/src/services/insights/windows.ts new file mode 100644 index 00000000..d2d8870b --- /dev/null +++ b/packages/db/src/services/insights/windows.ts @@ -0,0 +1,59 @@ +import type { WindowKind, WindowRange } from './types'; + +function atUtcMidnight(d: Date) { + const x = new Date(d); + x.setUTCHours(0, 0, 0, 0); + return x; +} + +function addDays(d: Date, days: number) { + const x = new Date(d); + x.setUTCDate(x.getUTCDate() + days); + return x; +} + +/** + * Convention: end is inclusive (end of day). If you prefer exclusive, adapt consistently. + */ +export function resolveWindow(kind: WindowKind, now: Date): WindowRange { + const today0 = atUtcMidnight(now); + const yesterday0 = addDays(today0, -1); + if (kind === 'yesterday') { + const start = yesterday0; + const end = yesterday0; + // Baseline: median of last 4 same weekdays -> engine/module implements the median. + // Here we just define the candidate range; module queries last 28 days and filters weekday. + const baselineStart = addDays(yesterday0, -28); + const baselineEnd = addDays(yesterday0, -1); + return { kind, start, end, baselineStart, baselineEnd, label: 'Yesterday' }; + } + if (kind === 'rolling_7d') { + const end = yesterday0; + const start = addDays(end, -6); // 7 days inclusive + const baselineEnd = addDays(start, -1); + const baselineStart = addDays(baselineEnd, -6); + return { + kind, + start, + end, + baselineStart, + baselineEnd, + label: 'Last 7 days', + }; + } + // rolling_30d + { + const end = yesterday0; + const start = addDays(end, -29); + const baselineEnd = addDays(start, -1); + const baselineStart = addDays(baselineEnd, -29); + return { + kind, + start, + end, + baselineStart, + baselineEnd, + label: 'Last 30 days', + }; + } +} diff --git a/packages/db/src/session-consistency.ts b/packages/db/src/session-consistency.ts index b75129c6..fdab0db0 100644 --- a/packages/db/src/session-consistency.ts +++ b/packages/db/src/session-consistency.ts @@ -180,11 +180,11 @@ export function sessionConsistency() { // For write operations with session: cache WAL LSN after write if (isWriteOperation(operation)) { - logger.info('Prisma operation', { - operation, - args, - model, - }); + // logger.info('Prisma operation', { + // operation, + // args, + // model, + // }); const result = await query(args); diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index d21bf4e7..11526826 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -111,13 +111,18 @@ export type CronQueuePayloadProject = { type: 'deleteProjects'; payload: undefined; }; +export type CronQueuePayloadInsightsDaily = { + type: 'insightsDaily'; + payload: undefined; +}; export type CronQueuePayload = | CronQueuePayloadSalt | CronQueuePayloadFlushEvents | CronQueuePayloadFlushSessions | CronQueuePayloadFlushProfiles | CronQueuePayloadPing - | CronQueuePayloadProject; + | CronQueuePayloadProject + | CronQueuePayloadInsightsDaily; export type MiscQueuePayloadTrialEndingSoon = { type: 'trialEndingSoon'; @@ -235,6 +240,21 @@ export const importQueue = new Queue( }, ); +export type InsightsQueuePayloadProject = { + type: 'insightsProject'; + payload: { projectId: string; date: string }; +}; + +export const insightsQueue = new Queue( + getQueueName('insights'), + { + connection: getRedisQueue(), + defaultJobOptions: { + removeOnComplete: 100, + }, + }, +); + export function addTrialEndingSoonJob(organizationId: string, delay: number) { return miscQueue.add( 'misc', diff --git a/packages/trpc/src/root.ts b/packages/trpc/src/root.ts index d852c9c1..517db51b 100644 --- a/packages/trpc/src/root.ts +++ b/packages/trpc/src/root.ts @@ -5,6 +5,7 @@ import { clientRouter } from './routers/client'; import { dashboardRouter } from './routers/dashboard'; import { eventRouter } from './routers/event'; import { importRouter } from './routers/import'; +import { insightRouter } from './routers/insight'; import { integrationRouter } from './routers/integration'; import { notificationRouter } from './routers/notification'; import { onboardingRouter } from './routers/onboarding'; @@ -47,6 +48,7 @@ export const appRouter = createTRPCRouter({ overview: overviewRouter, realtime: realtimeRouter, chat: chatRouter, + insight: insightRouter, }); // export type definition of API diff --git a/packages/trpc/src/routers/insight.ts b/packages/trpc/src/routers/insight.ts new file mode 100644 index 00000000..6928d6c8 --- /dev/null +++ b/packages/trpc/src/routers/insight.ts @@ -0,0 +1,134 @@ +import { db } from '@openpanel/db'; +import { z } from 'zod'; +import { getProjectAccess } from '../access'; +import { TRPCAccessError } from '../errors'; +import { createTRPCRouter, protectedProcedure } from '../trpc'; + +export const insightRouter = createTRPCRouter({ + list: protectedProcedure + .input( + z.object({ + projectId: z.string(), + limit: z.number().min(1).max(100).optional().default(50), + }), + ) + .query(async ({ input: { projectId, limit }, ctx }) => { + const access = await getProjectAccess({ + userId: ctx.session.userId, + projectId, + }); + + if (!access) { + throw TRPCAccessError('You do not have access to this project'); + } + + // Fetch more insights than needed to account for deduplication + const allInsights = await db.projectInsight.findMany({ + where: { + projectId, + state: 'active', + moduleKey: { + notIn: ['page-trends', 'entry-pages'], + }, + }, + orderBy: { + impactScore: 'desc', + }, + take: limit * 3, // Fetch 3x to account for deduplication + select: { + id: true, + title: true, + summary: true, + payload: true, + currentValue: true, + compareValue: true, + changePct: true, + direction: true, + moduleKey: true, + dimensionKey: true, + windowKind: true, + severityBand: true, + firstDetectedAt: true, + impactScore: true, + }, + }); + + // WindowKind priority: yesterday (1) > rolling_7d (2) > rolling_30d (3) + const windowKindPriority: Record = { + yesterday: 1, + rolling_7d: 2, + rolling_30d: 3, + }; + + // Group by moduleKey + dimensionKey, keep only highest priority windowKind + const deduplicated = new Map(); + for (const insight of allInsights) { + const key = `${insight.moduleKey}:${insight.dimensionKey}`; + const existing = deduplicated.get(key); + const currentPriority = windowKindPriority[insight.windowKind] ?? 999; + const existingPriority = existing + ? (windowKindPriority[existing.windowKind] ?? 999) + : 999; + + // Keep if no existing, or if current has higher priority (lower number) + if (!existing || currentPriority < existingPriority) { + deduplicated.set(key, insight); + } + } + + // Convert back to array, sort by impactScore, and limit + const insights = Array.from(deduplicated.values()) + .sort((a, b) => (b.impactScore ?? 0) - (a.impactScore ?? 0)) + .slice(0, limit) + .map(({ impactScore, ...rest }) => rest); // Remove impactScore from response + + return insights; + }), + + listAll: protectedProcedure + .input( + z.object({ + projectId: z.string(), + limit: z.number().min(1).max(500).optional().default(200), + }), + ) + .query(async ({ input: { projectId, limit }, ctx }) => { + const access = await getProjectAccess({ + userId: ctx.session.userId, + projectId, + }); + + if (!access) { + throw TRPCAccessError('You do not have access to this project'); + } + + const insights = await db.projectInsight.findMany({ + where: { + projectId, + state: 'active', + }, + orderBy: { + impactScore: 'desc', + }, + take: limit, + select: { + id: true, + title: true, + summary: true, + payload: true, + currentValue: true, + compareValue: true, + changePct: true, + direction: true, + moduleKey: true, + dimensionKey: true, + windowKind: true, + severityBand: true, + firstDetectedAt: true, + impactScore: true, + }, + }); + + return insights; + }), +});