feat: revenue tracking

* wip

* wip

* wip

* wip

* show revenue better on overview

* align realtime and overview counters

* update revenue docs

* always return device id

* add project settings, improve projects charts,

* fix: comments

* fixes

* fix migration

* ignore sql files

* fix comments
This commit is contained in:
Carl-Gerhard Lindesvärd
2025-11-19 14:27:34 +01:00
committed by GitHub
parent d61cbf6f2c
commit 790801b728
58 changed files with 2191 additions and 23691 deletions

View File

@@ -28,8 +28,24 @@ export class SessionBuffer extends BaseBuffer {
this.redis = getRedisCache();
}
public async getExistingSession(sessionId: string) {
const hit = await this.redis.get(`session:${sessionId}`);
public async getExistingSession(
options:
| {
sessionId: string;
}
| {
projectId: string;
profileId: string;
},
) {
let hit: string | null = null;
if ('sessionId' in options) {
hit = await this.redis.get(`session:${options.sessionId}`);
} else {
hit = await this.redis.get(
`session:${options.projectId}:${options.profileId}`,
);
}
if (hit) {
return getSafeJson<IClickhouseSession>(hit);
@@ -41,7 +57,9 @@ export class SessionBuffer extends BaseBuffer {
async getSession(
event: IClickhouseEvent,
): Promise<[IClickhouseSession] | [IClickhouseSession, IClickhouseSession]> {
const existingSession = await this.getExistingSession(event.session_id);
const existingSession = await this.getExistingSession({
sessionId: event.session_id,
});
if (existingSession) {
const oldSession = assocPath(['sign'], -1, clone(existingSession));
@@ -77,7 +95,9 @@ export class SessionBuffer extends BaseBuffer {
...(event.properties || {}),
...(newSession.properties || {}),
});
// newSession.revenue += event.properties?.__revenue ?? 0;
const addedRevenue = event.name === 'revenue' ? (event.revenue ?? 0) : 0;
newSession.revenue = (newSession.revenue ?? 0) + addedRevenue;
if (event.name === 'screen_view' && event.path) {
newSession.screen_views.push(event.path);
@@ -114,7 +134,7 @@ export class SessionBuffer extends BaseBuffer {
entry_origin: event.origin,
exit_path: event.path,
exit_origin: event.origin,
revenue: 0,
revenue: event.name === 'revenue' ? (event.revenue ?? 0) : 0,
referrer: event.referrer,
referrer_name: event.referrer_name,
referrer_type: event.referrer_type,
@@ -174,6 +194,14 @@ export class SessionBuffer extends BaseBuffer {
'EX',
60 * 60,
);
if (newSession.profile_id) {
multi.set(
`session:${newSession.project_id}:${newSession.profile_id}`,
JSON.stringify(newSession),
'EX',
60 * 60,
);
}
for (const session of sessions) {
multi.rpush(this.redisKey, JSON.stringify(session));
}

View File

@@ -140,10 +140,10 @@ export function addColumns(
isClustered: boolean,
): string[] {
if (isClustered) {
return columns.map(
(col) =>
`ALTER TABLE ${replicated(tableName)} ON CLUSTER '{cluster}' ADD COLUMN IF NOT EXISTS ${col}`,
);
return columns.flatMap((col) => [
`ALTER TABLE ${replicated(tableName)} ON CLUSTER '{cluster}' ADD COLUMN IF NOT EXISTS ${col}`,
`ALTER TABLE ${tableName} ON CLUSTER '{cluster}' ADD COLUMN IF NOT EXISTS ${col}`,
]);
}
return columns.map(
@@ -160,10 +160,10 @@ export function dropColumns(
isClustered: boolean,
): string[] {
if (isClustered) {
return columnNames.map(
(colName) =>
`ALTER TABLE ${replicated(tableName)} ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS ${colName}`,
);
return columnNames.flatMap((colName) => [
`ALTER TABLE ${replicated(tableName)} ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS ${colName}`,
`ALTER TABLE ${tableName} ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS ${colName}`,
]);
}
return columnNames.map(

View File

@@ -174,23 +174,43 @@ export function getChartSql({
}
if (event.segment === 'property_sum' && event.property) {
sb.select.count = `sum(toFloat64(${getSelectPropertyKey(event.property)})) as count`;
sb.where.property = `${getSelectPropertyKey(event.property)} IS NOT NULL AND notEmpty(${getSelectPropertyKey(event.property)})`;
if (event.property === 'revenue') {
sb.select.count = `sum(revenue) as count`;
sb.where.property = `revenue > 0`;
} else {
sb.select.count = `sum(toFloat64(${getSelectPropertyKey(event.property)})) as count`;
sb.where.property = `${getSelectPropertyKey(event.property)} IS NOT NULL AND notEmpty(${getSelectPropertyKey(event.property)})`;
}
}
if (event.segment === 'property_average' && event.property) {
sb.select.count = `avg(toFloat64(${getSelectPropertyKey(event.property)})) as count`;
sb.where.property = `${getSelectPropertyKey(event.property)} IS NOT NULL AND notEmpty(${getSelectPropertyKey(event.property)})`;
if (event.property === 'revenue') {
sb.select.count = `avg(revenue) as count`;
sb.where.property = `revenue > 0`;
} else {
sb.select.count = `avg(toFloat64(${getSelectPropertyKey(event.property)})) as count`;
sb.where.property = `${getSelectPropertyKey(event.property)} IS NOT NULL AND notEmpty(${getSelectPropertyKey(event.property)})`;
}
}
if (event.segment === 'property_max' && event.property) {
sb.select.count = `max(toFloat64(${getSelectPropertyKey(event.property)})) as count`;
sb.where.property = `${getSelectPropertyKey(event.property)} IS NOT NULL AND notEmpty(${getSelectPropertyKey(event.property)})`;
if (event.property === 'revenue') {
sb.select.count = `max(revenue) as count`;
sb.where.property = `revenue > 0`;
} else {
sb.select.count = `max(toFloat64(${getSelectPropertyKey(event.property)})) as count`;
sb.where.property = `${getSelectPropertyKey(event.property)} IS NOT NULL AND notEmpty(${getSelectPropertyKey(event.property)})`;
}
}
if (event.segment === 'property_min' && event.property) {
sb.select.count = `min(toFloat64(${getSelectPropertyKey(event.property)})) as count`;
sb.where.property = `${getSelectPropertyKey(event.property)} IS NOT NULL AND notEmpty(${getSelectPropertyKey(event.property)})`;
if (event.property === 'revenue') {
sb.select.count = `min(revenue) as count`;
sb.where.property = `revenue > 0`;
} else {
sb.select.count = `min(toFloat64(${getSelectPropertyKey(event.property)})) as count`;
sb.where.property = `${getSelectPropertyKey(event.property)} IS NOT NULL AND notEmpty(${getSelectPropertyKey(event.property)})`;
}
}
if (event.segment === 'one_event_per_user') {

View File

@@ -25,6 +25,7 @@ import {
getProfilesCached,
upsertProfile,
} from './profile.service';
import type { IClickhouseSession } from './session.service';
export type IImportedEvent = Omit<
IClickhouseEvent,
@@ -92,12 +93,62 @@ export interface IClickhouseEvent {
imported_at: string | null;
sdk_name: string;
sdk_version: string;
revenue?: number;
// They do not exist here. Just make ts happy for now
profile?: IServiceProfile;
meta?: EventMeta;
}
export function transformSessionToEvent(
session: IClickhouseSession,
): IServiceEvent {
return {
id: '', // Not used
name: 'screen_view',
sessionId: session.id,
profileId: session.profile_id,
path: session.exit_path,
origin: session.exit_origin,
createdAt: convertClickhouseDateToJs(session.ended_at),
referrer: session.referrer,
referrerName: session.referrer_name,
referrerType: session.referrer_type,
os: session.os,
osVersion: session.os_version,
browser: session.browser,
browserVersion: session.browser_version,
device: session.device,
brand: session.brand,
model: session.model,
country: session.country,
region: session.region,
city: session.city,
longitude: session.longitude,
latitude: session.latitude,
projectId: session.project_id,
deviceId: session.device_id,
duration: 0,
revenue: session.revenue,
properties: {
...session.properties,
is_bounce: session.is_bounce,
__query: {
utm_medium: session.utm_medium,
utm_source: session.utm_source,
utm_campaign: session.utm_campaign,
utm_content: session.utm_content,
utm_term: session.utm_term,
},
},
profile: undefined,
meta: undefined,
importedAt: undefined,
sdkName: undefined,
sdkVersion: undefined,
};
}
export function transformEvent(event: IClickhouseEvent): IServiceEvent {
return {
id: event.id,
@@ -131,6 +182,7 @@ export function transformEvent(event: IClickhouseEvent): IServiceEvent {
sdkName: event.sdk_name,
sdkVersion: event.sdk_version,
profile: event.profile,
revenue: event.revenue,
};
}
@@ -178,6 +230,7 @@ export interface IServiceEvent {
meta: EventMeta | undefined;
sdkName: string | undefined;
sdkVersion: string | undefined;
revenue?: number;
}
type SelectHelper<T> = {
@@ -336,6 +389,7 @@ export async function createEvent(payload: IServiceCreateEventPayload) {
imported_at: null,
sdk_name: payload.sdkName ?? '',
sdk_version: payload.sdkVersion ?? '',
revenue: payload.revenue,
};
const promises = [sessionBuffer.add(event), eventBuffer.add(event)];

View File

@@ -104,6 +104,7 @@ export class OverviewService {
avg_session_duration: number;
total_screen_views: number;
views_per_session: number;
total_revenue: number;
};
series: {
date: string;
@@ -113,6 +114,7 @@ export class OverviewService {
avg_session_duration: number;
total_screen_views: number;
views_per_session: number;
total_revenue: number;
}[];
}> {
const where = this.getRawWhereClause('sessions', filters);
@@ -122,6 +124,7 @@ export class OverviewService {
.select([
`${clix.toStartOf('created_at', interval, timezone)} AS date`,
'round((countIf(is_bounce = 1 AND sign = 1) * 100.) / countIf(sign = 1), 2) AS bounce_rate',
'sum(revenue * sign) AS total_revenue',
])
.from(TABLE_NAMES.sessions, true)
.where('sign', '=', 1)
@@ -165,10 +168,17 @@ export class OverviewService {
.from('session_agg')
.where('date', '=', rollupDate),
)
.with(
'overall_total_revenue',
clix(this.client, timezone)
.select(['total_revenue'])
.from('session_agg')
.where('date', '=', rollupDate),
)
.with(
'daily_stats',
clix(this.client, timezone)
.select(['date', 'bounce_rate'])
.select(['date', 'bounce_rate', 'total_revenue'])
.from('session_agg')
.where('date', '!=', rollupDate),
)
@@ -181,9 +191,11 @@ export class OverviewService {
avg_session_duration: number;
total_screen_views: number;
views_per_session: number;
total_revenue: number;
overall_unique_visitors: number;
overall_total_sessions: number;
overall_bounce_rate: number;
overall_total_revenue: number;
}>([
`${clix.toStartOf('e.created_at', interval)} AS date`,
'ds.bounce_rate as bounce_rate',
@@ -193,9 +205,11 @@ export class OverviewService {
'if(isNaN(_avg_session_duration), 0, _avg_session_duration) AS avg_session_duration',
'count(*) AS total_screen_views',
'round((count(*) * 1.) / uniq(e.session_id), 2) AS views_per_session',
'ds.total_revenue AS total_revenue',
'(SELECT unique_visitors FROM overall_unique_visitors) AS overall_unique_visitors',
'(SELECT total_sessions FROM overall_unique_visitors) AS overall_total_sessions',
'(SELECT bounce_rate FROM overall_bounce_rate) AS overall_bounce_rate',
'(SELECT total_revenue FROM overall_total_revenue) AS overall_total_revenue',
])
.from(`${TABLE_NAMES.events} AS e`)
.leftJoin(
@@ -209,7 +223,7 @@ export class OverviewService {
clix.datetime(endDate, 'toDateTime'),
])
.rawWhere(this.getRawWhereClause('events', filters))
.groupBy(['date', 'ds.bounce_rate'])
.groupBy(['date', 'ds.bounce_rate', 'ds.total_revenue'])
.orderBy('date', 'ASC')
.fill(
clix.toStartOf(
@@ -234,7 +248,8 @@ export class OverviewService {
(item) =>
item.overall_bounce_rate !== null ||
item.overall_total_sessions !== null ||
item.overall_unique_visitors !== null,
item.overall_unique_visitors !== null ||
item.overall_total_revenue !== null,
);
return {
metrics: {
@@ -250,12 +265,14 @@ export class OverviewService {
views_per_session: average(
res.map((item) => item.views_per_session),
),
total_revenue: anyRowWithData?.overall_total_revenue ?? 0,
},
series: res.map(
omit([
'overall_bounce_rate',
'overall_unique_visitors',
'overall_total_sessions',
'overall_total_revenue',
]),
),
};
@@ -271,6 +288,7 @@ export class OverviewService {
avg_session_duration: number;
total_screen_views: number;
views_per_session: number;
total_revenue: number;
}>([
`${clix.toStartOf('created_at', interval, timezone)} AS date`,
'round(sum(sign * is_bounce) * 100.0 / sum(sign), 2) as bounce_rate',
@@ -280,6 +298,7 @@ export class OverviewService {
'if(isNaN(_avg_session_duration), 0, _avg_session_duration) AS avg_session_duration',
'sum(sign * screen_view_count) AS total_screen_views',
'round(sum(sign * screen_view_count) * 1.0 / sum(sign), 2) AS views_per_session',
'sum(revenue * sign) AS total_revenue',
])
.from('sessions')
.where('created_at', 'BETWEEN', [
@@ -320,6 +339,7 @@ export class OverviewService {
avg_session_duration: res[0]?.avg_session_duration ?? 0,
total_screen_views: res[0]?.total_screen_views ?? 0,
views_per_session: res[0]?.views_per_session ?? 0,
total_revenue: res[0]?.total_revenue ?? 0,
},
series: res
.slice(1)
@@ -394,6 +414,7 @@ export class OverviewService {
'entry_path',
'entry_origin',
'coalesce(round(countIf(is_bounce = 1 AND sign = 1) * 100.0 / countIf(sign = 1), 2), 0) as bounce_rate',
'sum(revenue * sign) as revenue',
])
.from(TABLE_NAMES.sessions, true)
.where('sign', '=', 1)
@@ -417,6 +438,7 @@ export class OverviewService {
avg_duration: number;
bounce_rate: number;
sessions: number;
revenue: number;
}>([
'p.title',
'p.origin',
@@ -424,6 +446,7 @@ export class OverviewService {
'p.avg_duration',
'p.count as sessions',
'b.bounce_rate',
'coalesce(b.revenue, 0) as revenue',
])
.from('page_stats p', false)
.leftJoin(
@@ -465,12 +488,14 @@ export class OverviewService {
avg_duration: number;
bounce_rate: number;
sessions: number;
revenue: number;
}>([
`${mode}_origin AS origin`,
`${mode}_path AS path`,
'round(avg(duration * sign)/1000, 2) as avg_duration',
'round(sum(sign * is_bounce) * 100.0 / sum(sign), 2) as bounce_rate',
'sum(sign) as sessions',
'sum(revenue * sign) as revenue',
])
.from(TABLE_NAMES.sessions, true)
.where('project_id', '=', projectId)
@@ -566,12 +591,14 @@ export class OverviewService {
sessions: number;
bounce_rate: number;
avg_session_duration: number;
revenue: number;
}>([
prefixColumn && `${prefixColumn} as prefix`,
`nullIf(${column}, '') as name`,
'sum(sign) as sessions',
'round(sum(sign * is_bounce) * 100.0 / sum(sign), 2) AS bounce_rate',
'round(avgIf(duration, duration > 0 AND sign > 0), 2)/1000 AS avg_session_duration',
'sum(revenue * sign) as revenue',
])
.from(TABLE_NAMES.sessions, true)
.where('project_id', '=', projectId)

View File

@@ -28,6 +28,7 @@ export type IProfileMetrics = {
avgEventsPerSession: number;
conversionEvents: number;
avgTimeBetweenSessions: number;
revenue: number;
};
export function getProfileMetrics(profileId: string, projectId: string) {
return chQuery<
@@ -76,6 +77,9 @@ export function getProfileMetrics(profileId: string, projectId: string) {
WHEN (SELECT sessions FROM sessions) <= 1 THEN 0
ELSE round(dateDiff('second', (SELECT firstSeen FROM firstSeen), (SELECT lastSeen FROM lastSeen)) / nullIf((SELECT sessions FROM sessions) - 1, 0), 1)
END as avgTimeBetweenSessions
),
revenue AS (
SELECT sum(revenue) as revenue FROM ${TABLE_NAMES.events} WHERE name = 'revenue' AND profile_id = ${sqlstring.escape(profileId)} AND project_id = ${sqlstring.escape(projectId)}
)
SELECT
(SELECT lastSeen FROM lastSeen) as lastSeen,
@@ -89,7 +93,8 @@ export function getProfileMetrics(profileId: string, projectId: string) {
(SELECT bounceRate FROM bounceRate) as bounceRate,
(SELECT avgEventsPerSession FROM avgEventsPerSession) as avgEventsPerSession,
(SELECT conversionEvents FROM conversionEvents) as conversionEvents,
(SELECT avgTimeBetweenSessions FROM avgTimeBetweenSessions) as avgTimeBetweenSessions
(SELECT avgTimeBetweenSessions FROM avgTimeBetweenSessions) as avgTimeBetweenSessions,
(SELECT revenue FROM revenue) as revenue
`)
.then((data) => data[0]!)
.then((data) => {

View File

@@ -209,12 +209,27 @@ export function sessionConsistency() {
// Since the check probably goes to the primary anyways it will always be true,
// Not sure how to check LSN on the actual replica that will be used for the read.
if (
model !== 'Session' &&
isReadOperation(operation) &&
sessionId &&
(await getCachedWalLsn(sessionId))
) {
// This will force readReplicas extension to use primary
__internalParams.transaction = true;
const MAX_RETRIES = 3;
const INITIAL_RETRY_DELAY_MS = 50;
for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
const result = await query(args);
if (result !== null) {
return result;
}
// If not the last attempt, wait with exponential backoff before retrying
if (attempt < MAX_RETRIES - 1) {
const delayMs = INITIAL_RETRY_DELAY_MS * 2 ** attempt;
await sleep(delayMs);
}
}
}
return query(args);