Files
stats/packages/db/src/services/funnel.service.ts

416 lines
13 KiB
TypeScript

import { ifNaN } from '@openpanel/common';
import type { IChartEvent, IReportInput } from '@openpanel/validation';
import { last, reverse } from 'ramda';
import sqlstring from 'sqlstring';
import { ch, TABLE_NAMES } from '../clickhouse/client';
import { clix } from '../clickhouse/query-builder';
import { createSqlBuilder } from '../sql-builder';
import {
getEventFiltersWhereClause,
getSelectPropertyKey,
} from './chart.service';
import { onlyReportEvents } from './reports.service';
/**
 * Display label substituted for breakdown values that are null, undefined,
 * empty, or whitespace-only (e.g. the property was not set on the event).
 */
export const EMPTY_BREAKDOWN_LABEL = 'Not set';
/**
 * Converts a raw breakdown value into the string shown to the user.
 *
 * The value is stringified and trimmed; anything that ends up empty
 * (null, undefined, '' or whitespace-only) is replaced by
 * EMPTY_BREAKDOWN_LABEL.
 */
function normalizeBreakdownValue(value: unknown): string {
  // `== null` deliberately matches both null and undefined.
  const text = value == null ? '' : String(value).trim();
  return text.length > 0 ? text : EMPTY_BREAKDOWN_LABEL;
}
/**
 * Builds and executes funnel reports using ClickHouse's `windowFunnel`
 * aggregate, and shapes the raw per-level rows into per-step statistics
 * (counts, percentages, dropoffs), optionally split by breakdown values.
 */
export class FunnelService {
  constructor(private client: typeof ch) {}

  /**
   * Returns the grouping strategy for the funnel.
   * Determines whether windowFunnel is computed per session_id or profile_id.
   * Any value other than 'profile_id' (including undefined) maps to 'session_id'.
   */
  getFunnelGroup(group?: string): 'profile_id' | 'session_id' {
    return group === 'profile_id' ? 'profile_id' : 'session_id';
  }

  /**
   * Builds one SQL boolean condition per funnel step: the step's filter
   * clauses plus an escaped `events.name = …` match. The leading `WHERE `
   * produced by the SQL builder is stripped so each condition can be passed
   * as an argument to `windowFunnel(...)`.
   */
  getFunnelConditions(
    events: IChartEvent[] = [],
    projectId?: string
  ): string[] {
    return events.map((event) => {
      const { sb, getWhere } = createSqlBuilder();
      sb.where = getEventFiltersWhereClause(event.filters, projectId);
      sb.where.name = `events.name = ${sqlstring.escape(event.name)}`;
      return getWhere().replace('WHERE ', '');
    });
  }

  /**
   * Builds the funnel CTE.
   * - When group === 'session_id': windowFunnel is computed per session_id.
   *   profile_id is resolved via argMax to handle identity changes mid-session.
   * - When group === 'profile_id': windowFunnel is computed directly per profile_id.
   *   This correctly handles cross-session funnel completions.
   *
   * `additionalSelects` / `additionalGroupBy` are appended verbatim (used for
   * breakdown columns).
   */
  buildFunnelCte({
    projectId,
    startDate,
    endDate,
    eventSeries,
    funnelWindowMilliseconds,
    timezone,
    additionalSelects = [],
    additionalGroupBy = [],
    group = 'session_id',
  }: {
    projectId: string;
    startDate: string;
    endDate: string;
    eventSeries: IChartEvent[];
    funnelWindowMilliseconds: number;
    timezone: string;
    additionalSelects?: string[];
    additionalGroupBy?: string[];
    group?: 'session_id' | 'profile_id';
  }) {
    const funnels = this.getFunnelConditions(eventSeries, projectId);
    // Resolve the key through the same helper getFunnel uses so the GROUP BY
    // key and the argMax(profile_id) select below can never disagree.
    const primaryKey = this.getFunnelGroup(group);

    return clix(this.client, timezone)
      .select([
        primaryKey,
        `windowFunnel(${funnelWindowMilliseconds}, 'strict_increase')(toUInt64(toUnixTimestamp64Milli(created_at)), ${funnels.join(', ')}) AS level`,
        ...(primaryKey === 'session_id'
          ? ['argMax(profile_id, created_at) AS profile_id']
          : []),
        ...additionalSelects,
      ])
      .from(TABLE_NAMES.events, false)
      .where('project_id', '=', projectId)
      .where('created_at', 'BETWEEN', [
        clix.datetime(startDate, 'toDateTime'),
        clix.datetime(endDate, 'toDateTime'),
      ])
      .where(
        'events.name',
        'IN',
        eventSeries.map((e) => e.name)
      )
      .groupBy([primaryKey, ...additionalGroupBy]);
  }

  /**
   * Builds a query selecting (profile_id as pid, id as sid) from the sessions
   * table for a project within the given date range.
   */
  buildSessionsCte({
    projectId,
    startDate,
    endDate,
    timezone,
  }: {
    projectId: string;
    startDate: string;
    endDate: string;
    timezone: string;
  }) {
    return clix(this.client, timezone)
      .select(['profile_id as pid', 'id as sid'])
      .from(TABLE_NAMES.sessions)
      .where('project_id', '=', projectId)
      .where('created_at', 'BETWEEN', [
        clix.datetime(startDate, 'toDateTime'),
        clix.datetime(endDate, 'toDateTime'),
      ]);
  }

  /**
   * Expands sparse "furthest level reached" rows into one entry per level
   * (1..steps) and makes the counts cumulative: whoever reached level N also
   * counts for every level below N.
   *
   * @returns entries ordered from the highest level down to level 1.
   */
  private fillFunnel(
    funnel: { level: number; count: number }[],
    steps: number
  ) {
    const filled = Array.from({ length: steps }, (_, index) => {
      const level = index + 1;
      const matchingResult = funnel.find((res) => res.level === level);
      return {
        level,
        count: matchingResult ? matchingResult.count : 0,
      };
    });

    // Walk from the deepest level towards the first one, adding the count of
    // the level right after it to each level.
    for (let i = filled.length - 1; i >= 0; i--) {
      const step = filled[i];
      const deeperStep = filled[i + 1];
      if (step && deeperStep) {
        step.count += deeperStep.count;
      }
    }

    return filled.reverse();
  }

  /**
   * Splits flat funnel rows into one series per distinct breakdown combination.
   *
   * - Without breakdowns, a single series with id 'none' is returned.
   * - With breakdowns, rows are grouped by their normalized `b_<index>`
   *   values (empty/null become "Not set"), joined with '|' to form the id.
   * - `limit` caps the number of distinct series; rows that belong to a
   *   series that was already accepted are always kept, so a limited series
   *   still contains all of its levels. A falsy limit means "no limit".
   *
   * Series are returned in the order their key first appears in `funnel`.
   */
  toSeries(
    funnel: { level: number; count: number; [key: string]: any }[],
    breakdowns: { name: string }[] = [],
    limit: number | undefined = undefined
  ) {
    if (!breakdowns.length) {
      return [
        funnel.map((f) => ({
          level: f.level,
          count: f.count,
          id: 'none',
          breakdowns: [],
        })),
      ];
    }

    // A Map (not a plain object) so breakdown values such as "constructor"
    // or "__proto__" cannot collide with Object.prototype members.
    const series = new Map<
      string,
      {
        id: string;
        breakdowns: string[];
        level: number;
        count: number;
      }[]
    >();

    for (const row of funnel) {
      const values = breakdowns.map((_, index) =>
        normalizeBreakdownValue(row[`b_${index}`])
      );
      const key = values.join('|');

      let rows = series.get(key);
      if (!rows) {
        // The limit only blocks NEW series; existing ones keep collecting rows.
        if (limit && series.size >= limit) {
          continue;
        }
        rows = [];
        series.set(key, rows);
      }

      rows.push({
        id: key,
        breakdowns: values,
        level: row.level,
        count: row.count,
      });
    }

    return Array.from(series.values());
  }

  /**
   * Collects the names of all `profile.*` filters used by the given events,
   * with the `profile.` prefix removed. Events without filters contribute
   * nothing.
   */
  getProfileFilters(events: IChartEvent[]) {
    return events.flatMap(
      (e) =>
        e.filters
          ?.filter((f) => f.name.startsWith('profile.'))
          .map((f) => f.name.replace('profile.', '')) ?? []
    );
  }

  /**
   * Runs the funnel report and returns one result per breakdown series,
   * sorted by the sum of step counts (descending).
   *
   * Each result contains the per-step statistics (count, percent of the first
   * step, dropoff to the next step, neighbours' counts, and whether the step
   * has the highest dropoff), plus `totalSessions` (count of the first step).
   *
   * @throws Error when startDate/endDate are missing or no events are provided.
   */
  async getFunnel({
    projectId,
    startDate,
    endDate,
    series,
    options,
    breakdowns = [],
    limit,
    timezone = 'UTC',
  }: IReportInput & { timezone: string; events?: IChartEvent[] }) {
    if (!(startDate && endDate)) {
      throw new Error('startDate and endDate are required');
    }

    const funnelOptions = options?.type === 'funnel' ? options : undefined;
    // Funnel window is configured in hours; defaults to 24h.
    const funnelWindow = funnelOptions?.funnelWindow ?? 24;
    const funnelGroup = funnelOptions?.funnelGroup;

    const eventSeries = onlyReportEvents(series);
    if (eventSeries.length === 0) {
      throw new Error('events are required');
    }

    // windowFunnel takes an integer window; rounding guards against
    // fractional hour settings producing a non-integer literal in the SQL.
    const funnelWindowMilliseconds = Math.round(funnelWindow * 3600 * 1000);
    const group = this.getFunnelGroup(funnelGroup);

    const profileFilters = this.getProfileFilters(eventSeries);
    const anyFilterOnProfile = profileFilters.length > 0;
    const anyBreakdownOnProfile = breakdowns.some((b) =>
      b.name.startsWith('profile.')
    );
    const anyFilterOnGroup = eventSeries.some((e) =>
      e.filters?.some((f) => f.name.startsWith('group.'))
    );
    const anyBreakdownOnGroup = breakdowns.some((b) =>
      b.name.startsWith('group.')
    );
    const needsGroupArrayJoin =
      anyFilterOnGroup || anyBreakdownOnGroup || funnelGroup === 'group';

    // Breakdown columns are exposed as b_0, b_1, … and grouped on.
    const breakdownSelects = breakdowns.map(
      (b, index) => `${getSelectPropertyKey(b.name, projectId)} as b_${index}`
    );
    const breakdownGroupBy = breakdowns.map((_, index) => `b_${index}`);

    // Create the funnel CTE (one row per session_id or profile_id)
    const funnelCte = this.buildFunnelCte({
      projectId,
      startDate,
      endDate,
      eventSeries,
      funnelWindowMilliseconds,
      timezone,
      additionalSelects: breakdownSelects,
      additionalGroupBy: breakdownGroupBy,
      group,
    });

    if (anyFilterOnProfile || anyBreakdownOnProfile) {
      // Collect profile columns needed for filters and breakdowns (same as conversion.service)
      const profileFields = new Set<string>(['id']);
      for (const f of profileFilters) {
        profileFields.add(f.split('.')[0]!);
      }
      for (const b of breakdowns.filter((x) => x.name.startsWith('profile.'))) {
        const fieldName = b.name.replace('profile.', '').split('.')[0];
        if (fieldName === 'properties') {
          profileFields.add('properties');
        } else if (['email', 'first_name', 'last_name'].includes(fieldName!)) {
          profileFields.add(fieldName!);
        }
      }
      const profileSelectColumns = Array.from(profileFields).join(', ');
      funnelCte.leftJoin(
        `(SELECT ${profileSelectColumns} FROM ${TABLE_NAMES.profiles} FINAL\nWHERE project_id = ${sqlstring.escape(projectId)}) as profile`,
        'profile.id = events.profile_id'
      );
    }

    if (needsGroupArrayJoin) {
      funnelCte.rawJoin('ARRAY JOIN groups AS _group_id');
      funnelCte.rawJoin('LEFT ANY JOIN _g ON _g.id = _group_id');
    }

    // Base funnel query with CTEs
    const funnelQuery = clix(this.client, timezone);
    if (needsGroupArrayJoin) {
      funnelQuery.with(
        '_g',
        `SELECT id, name, type, properties FROM ${TABLE_NAMES.groups} FINAL WHERE project_id = ${sqlstring.escape(projectId)}`
      );
    }
    funnelQuery.with('session_funnel', funnelCte);
    // windowFunnel is computed per the primary key (profile_id or session_id),
    // so we just filter out level=0 rows — no re-aggregation needed.
    funnelQuery.with('funnel', 'SELECT * FROM session_funnel WHERE level != 0');
    funnelQuery
      .select<{
        level: number;
        count: number;
        [key: string]: any;
      }>(['level', ...breakdownGroupBy, 'count() as count'])
      .from('funnel')
      .groupBy(['level', ...breakdownGroupBy])
      .orderBy('level', 'DESC');

    const funnelData = await funnelQuery.execute();
    const funnelSeries = this.toSeries(funnelData, breakdowns, limit);
    const maxLevel = eventSeries.length;

    return funnelSeries
      .map((data) => {
        // Highest level first; the last entry is level 1 (everyone who entered).
        const filledFunnelRes = this.fillFunnel(
          data.map((d) => ({ level: d.level, count: d.count })),
          maxLevel
        );
        const totalSessions = last(filledFunnelRes)?.count ?? 0;

        // Level 1 first, so index - 1 / index + 1 are the neighbouring steps.
        const ordered = reverse(filledFunnelRes);
        const rawSteps = ordered.map(
          (
            item,
            index
          ): {
            event: IChartEvent & { displayName: string };
            count: number;
            percent: number;
            dropoffCount: number | null;
            dropoffPercent: number | null;
            previousCount: number;
            nextCount: number | null;
          } => {
            const prev = ordered[index - 1] ?? { count: totalSessions };
            const next = ordered[index + 1];
            const event = eventSeries[item.level - 1]!;
            return {
              event: {
                ...event,
                displayName: event.displayName || event.name,
              },
              count: item.count,
              percent: (item.count / totalSessions) * 100,
              dropoffCount: next ? item.count - next.count : null,
              dropoffPercent: next
                ? ((item.count - next.count) / item.count) * 100
                : null,
              previousCount: prev.count,
              nextCount: next?.count ?? null,
            };
          }
        );

        // The highest dropoff is the first step holding the largest positive
        // dropoff count; computed once instead of once per step.
        const maxDropoff = Math.max(
          0,
          ...rawSteps.map((s) => s.dropoffCount || 0)
        );
        const highestDropoffIndex =
          maxDropoff > 0
            ? rawSteps.findIndex((s) => s.dropoffCount === maxDropoff)
            : -1;

        const steps = rawSteps.map((step, index) => ({
          ...step,
          // Divisions by zero above yield NaN; report those as 0.
          percent: ifNaN(step.percent, 0),
          dropoffPercent: ifNaN(step.dropoffPercent, 0),
          isHighestDropoff: index === highestDropoffIndex,
        }));

        return {
          id: data[0]?.id ?? 'none',
          breakdowns: data[0]?.breakdowns ?? [],
          steps,
          totalSessions,
          lastStep: last(steps)!,
          // NOTE: undefined at runtime when no step has a positive dropoff.
          mostDropoffsStep: steps.find((step) => step.isHighestDropoff)!,
        };
      })
      .sort((a, b) => {
        const aTotal = a.steps.reduce((acc, step) => acc + step.count, 0);
        const bTotal = b.steps.reduce((acc, step) => acc + step.count, 0);
        return bTotal - aTotal;
      });
  }
}
/** Module-level FunnelService instance bound to the `ch` client imported from '../clickhouse/client'. */
export const funnelService = new FunnelService(ch);