fix: funnel
This commit is contained in:
@@ -1,9 +1,5 @@
|
||||
import { ifNaN } from '@openpanel/common';
|
||||
import type {
|
||||
IChartEvent,
|
||||
IChartEventItem,
|
||||
IReportInput,
|
||||
} from '@openpanel/validation';
|
||||
import type { IChartEvent, IReportInput } from '@openpanel/validation';
|
||||
import { last, reverse, uniq } from 'ramda';
|
||||
import sqlstring from 'sqlstring';
|
||||
import { ch } from '../clickhouse/client';
|
||||
@@ -19,10 +15,14 @@ import { onlyReportEvents } from './reports.service';
|
||||
export class FunnelService {
|
||||
constructor(private client: typeof ch) {}
|
||||
|
||||
getFunnelGroup(group?: string): [string, string] {
|
||||
return group === 'profile_id'
|
||||
? [`COALESCE(nullIf(s.pid, ''), profile_id)`, 'profile_id']
|
||||
: ['session_id', 'session_id'];
|
||||
/**
|
||||
* Returns the grouping strategy for the funnel.
|
||||
* Note: windowFunnel is ALWAYS computed per session_id first to handle
|
||||
* identity changes mid-session (anonymous → logged-in).
|
||||
* The returned group is used for the final aggregation step.
|
||||
*/
|
||||
getFunnelGroup(group?: string): 'profile_id' | 'session_id' {
|
||||
return group === 'profile_id' ? 'profile_id' : 'session_id';
|
||||
}
|
||||
|
||||
getFunnelConditions(events: IChartEvent[] = []): string[] {
|
||||
@@ -34,13 +34,18 @@ export class FunnelService {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the session-level funnel CTE.
|
||||
* IMPORTANT: windowFunnel is ALWAYS computed per session_id first.
|
||||
* This ensures identity changes mid-session (anonymous → logged-in) don't break the funnel.
|
||||
* The profile_id is extracted from the last event in the session using argMax.
|
||||
*/
|
||||
buildFunnelCte({
|
||||
projectId,
|
||||
startDate,
|
||||
endDate,
|
||||
eventSeries,
|
||||
funnelWindowMilliseconds,
|
||||
group,
|
||||
timezone,
|
||||
additionalSelects = [],
|
||||
additionalGroupBy = [],
|
||||
@@ -50,7 +55,6 @@ export class FunnelService {
|
||||
endDate: string;
|
||||
eventSeries: IChartEvent[];
|
||||
funnelWindowMilliseconds: number;
|
||||
group: [string, string];
|
||||
timezone: string;
|
||||
additionalSelects?: string[];
|
||||
additionalGroupBy?: string[];
|
||||
@@ -59,9 +63,10 @@ export class FunnelService {
|
||||
|
||||
return clix(this.client, timezone)
|
||||
.select([
|
||||
`${group[0]} AS ${group[1]}`,
|
||||
...additionalSelects,
|
||||
'session_id',
|
||||
`windowFunnel(${funnelWindowMilliseconds}, 'strict_increase')(toUInt64(toUnixTimestamp64Milli(created_at)), ${funnels.join(', ')}) AS level`,
|
||||
'argMax(profile_id, created_at) AS profile_id',
|
||||
...additionalSelects,
|
||||
])
|
||||
.from(TABLE_NAMES.events, false)
|
||||
.where('project_id', '=', projectId)
|
||||
@@ -74,7 +79,7 @@ export class FunnelService {
|
||||
'IN',
|
||||
eventSeries.map((e) => e.name),
|
||||
)
|
||||
.groupBy([group[1], ...additionalGroupBy]);
|
||||
.groupBy(['session_id', ...additionalGroupBy]);
|
||||
}
|
||||
|
||||
buildSessionsCte({
|
||||
@@ -213,7 +218,7 @@ export class FunnelService {
|
||||
b.name.startsWith('profile.'),
|
||||
);
|
||||
|
||||
// Create the funnel CTE
|
||||
// Create the funnel CTE (session-level)
|
||||
const breakdownSelects = breakdowns.map(
|
||||
(b, index) => `${getSelectPropertyKey(b.name)} as b_${index}`,
|
||||
);
|
||||
@@ -225,7 +230,6 @@ export class FunnelService {
|
||||
endDate,
|
||||
eventSeries,
|
||||
funnelWindowMilliseconds,
|
||||
group,
|
||||
timezone,
|
||||
additionalSelects: breakdownSelects,
|
||||
additionalGroupBy: breakdownGroupBy,
|
||||
@@ -239,27 +243,27 @@ export class FunnelService {
|
||||
);
|
||||
}
|
||||
|
||||
// Create the sessions CTE if needed
|
||||
const sessionsCte =
|
||||
group[0] !== 'session_id'
|
||||
? this.buildSessionsCte({
|
||||
projectId,
|
||||
startDate,
|
||||
endDate,
|
||||
timezone,
|
||||
})
|
||||
: null;
|
||||
|
||||
// Base funnel query with CTEs
|
||||
const funnelQuery = clix(this.client, timezone);
|
||||
funnelQuery.with('session_funnel', funnelCte);
|
||||
|
||||
if (sessionsCte) {
|
||||
funnelCte.leftJoin('sessions s', 's.sid = events.session_id');
|
||||
funnelQuery.with('sessions', sessionsCte);
|
||||
if (group === 'profile_id') {
|
||||
// For profile grouping: re-aggregate by profile_id, taking MAX level per profile.
|
||||
// This ensures a user who completed the funnel across multiple sessions
|
||||
// (or with an identity change) is counted correctly.
|
||||
const breakdownAggregates =
|
||||
breakdowns.length > 0
|
||||
? `, ${breakdowns.map((_, index) => `any(b_${index}) AS b_${index}`).join(', ')}`
|
||||
: '';
|
||||
funnelQuery.with(
|
||||
'funnel',
|
||||
`SELECT profile_id, max(level) AS level${breakdownAggregates} FROM session_funnel WHERE level != 0 GROUP BY profile_id`,
|
||||
);
|
||||
} else {
|
||||
// For session grouping: use session_funnel directly
|
||||
funnelQuery.with('funnel', 'SELECT * FROM session_funnel');
|
||||
}
|
||||
|
||||
funnelQuery.with('funnel', funnelCte);
|
||||
|
||||
funnelQuery
|
||||
.select<{
|
||||
level: number;
|
||||
|
||||
@@ -992,7 +992,6 @@ export const chartRouter = createTRPCRouter({
|
||||
showDropoffs = false,
|
||||
funnelWindow,
|
||||
funnelGroup,
|
||||
breakdowns = [],
|
||||
} = input;
|
||||
|
||||
const { startDate, endDate } = getChartStartEndDate(input, timezone);
|
||||
@@ -1009,31 +1008,21 @@ export const chartRouter = createTRPCRouter({
|
||||
const funnelWindowSeconds = (funnelWindow || 24) * 3600;
|
||||
const funnelWindowMilliseconds = funnelWindowSeconds * 1000;
|
||||
|
||||
// Use funnel service methods
|
||||
// Get the grouping strategy (profile_id or session_id)
|
||||
const group = funnelService.getFunnelGroup(funnelGroup);
|
||||
|
||||
// Create sessions CTE if needed
|
||||
const sessionsCte =
|
||||
group[0] !== 'session_id'
|
||||
? funnelService.buildSessionsCte({
|
||||
projectId,
|
||||
startDate,
|
||||
endDate,
|
||||
timezone,
|
||||
})
|
||||
: null;
|
||||
|
||||
// Create funnel CTE using funnel service
|
||||
// Note: buildFunnelCte always computes windowFunnel per session_id and extracts
|
||||
// profile_id via argMax to handle identity changes mid-session correctly.
|
||||
const funnelCte = funnelService.buildFunnelCte({
|
||||
projectId,
|
||||
startDate,
|
||||
endDate,
|
||||
eventSeries: eventSeries as IChartEvent[],
|
||||
funnelWindowMilliseconds,
|
||||
group,
|
||||
timezone,
|
||||
additionalSelects: ['profile_id'],
|
||||
additionalGroupBy: ['profile_id'],
|
||||
// No need to add profile_id to additionalSelects/additionalGroupBy
|
||||
// since buildFunnelCte already extracts it via argMax(profile_id, created_at)
|
||||
});
|
||||
|
||||
// Check for profile filters and add profile join if needed
|
||||
@@ -1052,14 +1041,20 @@ export const chartRouter = createTRPCRouter({
|
||||
|
||||
// Build main query
|
||||
const query = clix(ch, timezone);
|
||||
query.with('session_funnel', funnelCte);
|
||||
|
||||
if (sessionsCte) {
|
||||
funnelCte.leftJoin('sessions s', 's.sid = events.session_id');
|
||||
query.with('sessions', sessionsCte);
|
||||
if (group === 'profile_id') {
|
||||
// For profile grouping: re-aggregate by profile_id, taking MAX level per profile.
|
||||
// This ensures a user who completed the funnel with an identity change is counted correctly.
|
||||
query.with(
|
||||
'funnel',
|
||||
'SELECT profile_id, max(level) AS level FROM session_funnel WHERE level != 0 GROUP BY profile_id',
|
||||
);
|
||||
} else {
|
||||
// For session grouping: use session_funnel directly
|
||||
query.with('funnel', 'SELECT * FROM session_funnel');
|
||||
}
|
||||
|
||||
query.with('funnel', funnelCte);
|
||||
|
||||
// Get distinct profile IDs
|
||||
query
|
||||
.select(['DISTINCT profile_id'])
|
||||
|
||||
Reference in New Issue
Block a user