From 2c7edec2743804e9fb570a9720e5c21c58224e63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Thu, 29 Jan 2026 13:30:59 +0100 Subject: [PATCH] fix: funnel --- packages/db/src/services/funnel.service.ts | 68 ++++++++++++---------- packages/trpc/src/routers/chart.ts | 37 +++++------- 2 files changed, 52 insertions(+), 53 deletions(-) diff --git a/packages/db/src/services/funnel.service.ts b/packages/db/src/services/funnel.service.ts index 74211c73..4d0f79aa 100644 --- a/packages/db/src/services/funnel.service.ts +++ b/packages/db/src/services/funnel.service.ts @@ -1,9 +1,5 @@ import { ifNaN } from '@openpanel/common'; -import type { - IChartEvent, - IChartEventItem, - IReportInput, -} from '@openpanel/validation'; +import type { IChartEvent, IReportInput } from '@openpanel/validation'; import { last, reverse, uniq } from 'ramda'; import sqlstring from 'sqlstring'; import { ch } from '../clickhouse/client'; @@ -19,10 +15,14 @@ import { onlyReportEvents } from './reports.service'; export class FunnelService { constructor(private client: typeof ch) {} - getFunnelGroup(group?: string): [string, string] { - return group === 'profile_id' - ? [`COALESCE(nullIf(s.pid, ''), profile_id)`, 'profile_id'] - : ['session_id', 'session_id']; + /** + * Returns the grouping strategy for the funnel. + * Note: windowFunnel is ALWAYS computed per session_id first to handle + * identity changes mid-session (anonymous → logged-in). + * The returned group is used for the final aggregation step. + */ + getFunnelGroup(group?: string): 'profile_id' | 'session_id' { + return group === 'profile_id' ? 'profile_id' : 'session_id'; } getFunnelConditions(events: IChartEvent[] = []): string[] { @@ -34,13 +34,18 @@ export class FunnelService { }); } + /** + * Builds the session-level funnel CTE. + * IMPORTANT: windowFunnel is ALWAYS computed per session_id first. + * This ensures identity changes mid-session (anonymous → logged-in) don't break the funnel. + * The profile_id is extracted from the last event in the session using argMax. + */ buildFunnelCte({ projectId, startDate, endDate, eventSeries, funnelWindowMilliseconds, - group, timezone, additionalSelects = [], additionalGroupBy = [], @@ -50,7 +55,6 @@ export class FunnelService { endDate: string; eventSeries: IChartEvent[]; funnelWindowMilliseconds: number; - group: [string, string]; timezone: string; additionalSelects?: string[]; additionalGroupBy?: string[]; @@ -59,9 +63,10 @@ export class FunnelService { return clix(this.client, timezone) .select([ - `${group[0]} AS ${group[1]}`, - ...additionalSelects, + 'session_id', `windowFunnel(${funnelWindowMilliseconds}, 'strict_increase')(toUInt64(toUnixTimestamp64Milli(created_at)), ${funnels.join(', ')}) AS level`, + 'argMax(profile_id, created_at) AS profile_id', + ...additionalSelects, ]) .from(TABLE_NAMES.events, false) .where('project_id', '=', projectId) @@ -74,7 +79,7 @@ export class FunnelService { 'IN', eventSeries.map((e) => e.name), ) - .groupBy([group[1], ...additionalGroupBy]); + .groupBy(['session_id', ...additionalGroupBy]); } buildSessionsCte({ @@ -213,7 +218,7 @@ export class FunnelService { b.name.startsWith('profile.'), ); - // Create the funnel CTE + // Create the funnel CTE (session-level) const breakdownSelects = breakdowns.map( (b, index) => `${getSelectPropertyKey(b.name)} as b_${index}`, ); @@ -225,7 +230,6 @@ export class FunnelService { endDate, eventSeries, funnelWindowMilliseconds, - group, timezone, additionalSelects: breakdownSelects, additionalGroupBy: breakdownGroupBy, @@ -239,27 +243,27 @@ export class FunnelService { ); } - // Create the sessions CTE if needed - const sessionsCte = - group[0] !== 'session_id' - ? this.buildSessionsCte({ - projectId, - startDate, - endDate, - timezone, - }) - : null; - // Base funnel query with CTEs const funnelQuery = clix(this.client, timezone); + funnelQuery.with('session_funnel', funnelCte); - if (sessionsCte) { - funnelCte.leftJoin('sessions s', 's.sid = events.session_id'); - funnelQuery.with('sessions', sessionsCte); + if (group === 'profile_id') { + // For profile grouping: re-aggregate by profile_id, taking MAX level per profile. + // This ensures a user who completed the funnel across multiple sessions + // (or with identity change) is counted correctly. + const breakdownAggregates = + breakdowns.length > 0 + ? `, ${breakdowns.map((_, index) => `any(b_${index}) AS b_${index}`).join(', ')}` + : ''; + funnelQuery.with( + 'funnel', + `SELECT profile_id, max(level) AS level${breakdownAggregates} FROM session_funnel WHERE level != 0 GROUP BY profile_id`, + ); + } else { + // For session grouping: use session_funnel directly + funnelQuery.with('funnel', 'SELECT * FROM session_funnel'); } - funnelQuery.with('funnel', funnelCte); - funnelQuery .select<{ level: number; diff --git a/packages/trpc/src/routers/chart.ts b/packages/trpc/src/routers/chart.ts index d7636569..aadb2874 100644 --- a/packages/trpc/src/routers/chart.ts +++ b/packages/trpc/src/routers/chart.ts @@ -992,7 +992,6 @@ export const chartRouter = createTRPCRouter({ showDropoffs = false, funnelWindow, funnelGroup, - breakdowns = [], } = input; const { startDate, endDate } = getChartStartEndDate(input, timezone); @@ -1009,31 +1008,21 @@ export const chartRouter = createTRPCRouter({ const funnelWindowSeconds = (funnelWindow || 24) * 3600; const funnelWindowMilliseconds = funnelWindowSeconds * 1000; - // Use funnel service methods + // Get the grouping strategy (profile_id or session_id) const group = funnelService.getFunnelGroup(funnelGroup); - // Create sessions CTE if needed - const sessionsCte = - group[0] !== 'session_id' - ? funnelService.buildSessionsCte({ - projectId, - startDate, - endDate, - timezone, - }) - : null; - // Create funnel CTE using funnel service + // Note: buildFunnelCte always computes windowFunnel per session_id and extracts + // profile_id via argMax to handle identity changes mid-session correctly. const funnelCte = funnelService.buildFunnelCte({ projectId, startDate, endDate, eventSeries: eventSeries as IChartEvent[], funnelWindowMilliseconds, - group, timezone, - additionalSelects: ['profile_id'], - additionalGroupBy: ['profile_id'], + // No need to add profile_id to additionalSelects/additionalGroupBy + // since buildFunnelCte already extracts it via argMax(profile_id, created_at) }); // Check for profile filters and add profile join if needed @@ -1052,14 +1041,20 @@ export const chartRouter = createTRPCRouter({ // Build main query const query = clix(ch, timezone); + query.with('session_funnel', funnelCte); - if (sessionsCte) { - funnelCte.leftJoin('sessions s', 's.sid = events.session_id'); - query.with('sessions', sessionsCte); + if (group === 'profile_id') { + // For profile grouping: re-aggregate by profile_id, taking MAX level per profile. + // This ensures a user who completed the funnel with identity change is counted correctly. + query.with( + 'funnel', + 'SELECT profile_id, max(level) AS level FROM session_funnel WHERE level != 0 GROUP BY profile_id', + ); + } else { + // For session grouping: use session_funnel directly + query.with('funnel', 'SELECT * FROM session_funnel'); } - query.with('funnel', funnelCte); - // Get distinct profile IDs query .select(['DISTINCT profile_id'])