This commit is contained in:
Carl-Gerhard Lindesvärd
2026-03-06 09:00:10 +01:00
parent 765e4aa107
commit 90881e5ffb
68 changed files with 4092 additions and 1694 deletions

View File

@@ -1,11 +1,7 @@
import { getSafeJson } from '@openpanel/json';
import {
type Redis,
getRedisCache,
publishEvent,
} from '@openpanel/redis';
import { getRedisCache, publishEvent, type Redis } from '@openpanel/redis';
import { ch } from '../clickhouse/client';
import { type IClickhouseEvent } from '../services/event.service';
import type { IClickhouseEvent } from '../services/event.service';
import { BaseBuffer } from './base-buffer';
export class EventBuffer extends BaseBuffer {
@@ -95,7 +91,7 @@ export class EventBuffer extends BaseBuffer {
this.incrementActiveVisitorCount(
multi,
event.project_id,
event.profile_id,
event.profile_id
);
}
}
@@ -116,7 +112,7 @@ export class EventBuffer extends BaseBuffer {
error,
eventCount: eventsToFlush.length,
flushRetryCount: this.flushRetryCount,
},
}
);
} finally {
this.isFlushing = false;
@@ -137,7 +133,7 @@ export class EventBuffer extends BaseBuffer {
const queueEvents = await redis.lrange(
this.queueKey,
0,
this.batchSize - 1,
this.batchSize - 1
);
if (queueEvents.length === 0) {
@@ -149,6 +145,9 @@ export class EventBuffer extends BaseBuffer {
for (const eventStr of queueEvents) {
const event = getSafeJson<IClickhouseEvent>(eventStr);
if (event) {
if (!Array.isArray(event.groups)) {
event.groups = [];
}
eventsToClickhouse.push(event);
}
}
@@ -161,7 +160,7 @@ export class EventBuffer extends BaseBuffer {
eventsToClickhouse.sort(
(a, b) =>
new Date(a.created_at || 0).getTime() -
new Date(b.created_at || 0).getTime(),
new Date(b.created_at || 0).getTime()
);
this.logger.info('Inserting events into ClickHouse', {
@@ -181,7 +180,7 @@ export class EventBuffer extends BaseBuffer {
for (const event of eventsToClickhouse) {
countByProject.set(
event.project_id,
(countByProject.get(event.project_id) ?? 0) + 1,
(countByProject.get(event.project_id) ?? 0) + 1
);
}
for (const [projectId, count] of countByProject) {
@@ -222,7 +221,7 @@ export class EventBuffer extends BaseBuffer {
private incrementActiveVisitorCount(
multi: ReturnType<Redis['multi']>,
projectId: string,
profileId: string,
profileId: string
) {
const key = `${projectId}:${profileId}`;
const now = Date.now();

View File

@@ -0,0 +1,151 @@
import { getRedisCache } from '@openpanel/redis';
import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest';
import { getSafeJson } from '@openpanel/json';
import type { IClickhouseProfile } from '../services/profile.service';
// Mock chQuery to avoid hitting real ClickHouse
vi.mock('../clickhouse/client', () => ({
ch: {
insert: vi.fn().mockResolvedValue(undefined),
},
chQuery: vi.fn().mockResolvedValue([]),
TABLE_NAMES: {
profiles: 'profiles',
},
}));
import { ProfileBuffer } from './profile-buffer';
import { chQuery } from '../clickhouse/client';
const redis = getRedisCache();
function makeProfile(overrides: Partial<IClickhouseProfile>): IClickhouseProfile {
return {
id: 'profile-1',
project_id: 'project-1',
first_name: '',
last_name: '',
email: '',
avatar: '',
properties: {},
is_external: true,
created_at: new Date().toISOString(),
groups: [],
...overrides,
};
}
beforeEach(async () => {
await redis.flushdb();
vi.mocked(chQuery).mockResolvedValue([]);
});
afterAll(async () => {
try {
await redis.quit();
} catch {}
});
describe('ProfileBuffer', () => {
let profileBuffer: ProfileBuffer;
beforeEach(() => {
profileBuffer = new ProfileBuffer();
});
it('adds a profile to the buffer', async () => {
const profile = makeProfile({ first_name: 'John', email: 'john@example.com' });
const sizeBefore = await profileBuffer.getBufferSize();
await profileBuffer.add(profile);
const sizeAfter = await profileBuffer.getBufferSize();
expect(sizeAfter).toBe(sizeBefore + 1);
});
it('merges subsequent updates via cache (sequential calls)', async () => {
const identifyProfile = makeProfile({
first_name: 'John',
email: 'john@example.com',
groups: [],
});
const groupProfile = makeProfile({
first_name: '',
email: '',
groups: ['group-abc'],
});
// Sequential: identify first, then group
await profileBuffer.add(identifyProfile);
await profileBuffer.add(groupProfile);
// Second add should read the cached identify profile and merge groups in
const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1');
expect(cached?.first_name).toBe('John');
expect(cached?.email).toBe('john@example.com');
expect(cached?.groups).toContain('group-abc');
});
it('race condition: concurrent identify + group calls preserve all data', async () => {
const identifyProfile = makeProfile({
first_name: 'John',
email: 'john@example.com',
groups: [],
});
const groupProfile = makeProfile({
first_name: '',
email: '',
groups: ['group-abc'],
});
// Both calls run concurrently — the per-profile lock serializes them so the
// second one reads the first's result from cache and merges correctly.
await Promise.all([
profileBuffer.add(identifyProfile),
profileBuffer.add(groupProfile),
]);
const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1');
expect(cached?.first_name).toBe('John');
expect(cached?.email).toBe('john@example.com');
expect(cached?.groups).toContain('group-abc');
});
it('race condition: concurrent writes produce one merged buffer entry', async () => {
const identifyProfile = makeProfile({
first_name: 'John',
email: 'john@example.com',
groups: [],
});
const groupProfile = makeProfile({
first_name: '',
email: '',
groups: ['group-abc'],
});
const sizeBefore = await profileBuffer.getBufferSize();
await Promise.all([
profileBuffer.add(identifyProfile),
profileBuffer.add(groupProfile),
]);
const sizeAfter = await profileBuffer.getBufferSize();
// The second add merges into the first — only 2 buffer entries total
// (one from identify, one merged update with group)
expect(sizeAfter).toBe(sizeBefore + 2);
// The last entry in the buffer should have both name and group
const rawEntries = await redis.lrange('profile-buffer', 0, -1);
const entries = rawEntries.map((e) => getSafeJson<IClickhouseProfile>(e));
const lastEntry = entries[entries.length - 1];
expect(lastEntry?.first_name).toBe('John');
expect(lastEntry?.groups).toContain('group-abc');
});
});

View File

@@ -1,9 +1,10 @@
import { deepMergeObjects } from '@openpanel/common';
import { generateSecureId } from '@openpanel/common/server';
import { getSafeJson } from '@openpanel/json';
import type { ILogger } from '@openpanel/logger';
import { getRedisCache, type Redis } from '@openpanel/redis';
import shallowEqual from 'fast-deep-equal';
import { omit } from 'ramda';
import { omit, uniq } from 'ramda';
import sqlstring from 'sqlstring';
import { ch, chQuery, TABLE_NAMES } from '../clickhouse/client';
import type { IClickhouseProfile } from '../services/profile.service';
@@ -24,6 +25,15 @@ export class ProfileBuffer extends BaseBuffer {
private readonly redisProfilePrefix = 'profile-cache:';
private redis: Redis;
private releaseLockSha: string | null = null;
private readonly releaseLockScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
constructor() {
super({
@@ -33,6 +43,9 @@ export class ProfileBuffer extends BaseBuffer {
},
});
this.redis = getRedisCache();
this.redis.script('LOAD', this.releaseLockScript).then((sha) => {
this.releaseLockSha = sha as string;
});
}
private getProfileCacheKey({
@@ -45,6 +58,42 @@ export class ProfileBuffer extends BaseBuffer {
return `${this.redisProfilePrefix}${projectId}:${profileId}`;
}
private async withProfileLock<T>(
profileId: string,
projectId: string,
fn: () => Promise<T>
): Promise<T> {
const lockKey = `profile-lock:${projectId}:${profileId}`;
const lockId = generateSecureId('lock');
const maxRetries = 10;
const retryDelayMs = 25;
for (let i = 0; i < maxRetries; i++) {
const acquired = await this.redis.set(lockKey, lockId, 'EX', 5, 'NX');
if (acquired === 'OK') {
try {
return await fn();
} finally {
if (this.releaseLockSha) {
await this.redis.evalsha(this.releaseLockSha, 1, lockKey, lockId);
} else {
await this.redis.eval(this.releaseLockScript, 1, lockKey, lockId);
}
}
}
await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
}
this.logger.error(
'Failed to acquire profile lock, proceeding without lock',
{
profileId,
projectId,
}
);
return fn();
}
async alreadyExists(profile: IClickhouseProfile) {
const cacheKey = this.getProfileCacheKey({
profileId: profile.id,
@@ -67,83 +116,94 @@ export class ProfileBuffer extends BaseBuffer {
return;
}
const existingProfile = await this.fetchProfile(profile, logger);
await this.withProfileLock(profile.id, profile.project_id, async () => {
const existingProfile = await this.fetchProfile(profile, logger);
// Delete any properties that are not server related if we have a non-server profile
if (
existingProfile?.properties.device !== 'server' &&
profile.properties.device === 'server'
) {
profile.properties = omit(
[
'city',
'country',
'region',
'longitude',
'latitude',
'os',
'osVersion',
'browser',
'device',
'isServer',
'os_version',
'browser_version',
],
profile.properties
);
}
// Delete any properties that are not server related if we have a non-server profile
if (
existingProfile?.properties.device !== 'server' &&
profile.properties.device === 'server'
) {
profile.properties = omit(
[
'city',
'country',
'region',
'longitude',
'latitude',
'os',
'osVersion',
'browser',
'device',
'isServer',
'os_version',
'browser_version',
],
profile.properties
);
}
const mergedProfile: IClickhouseProfile = existingProfile
? deepMergeObjects(existingProfile, omit(['created_at'], profile))
: profile;
const mergedProfile: IClickhouseProfile = existingProfile
? {
...deepMergeObjects(
existingProfile,
omit(['created_at', 'groups'], profile)
),
groups: uniq([
...(existingProfile.groups ?? []),
...(profile.groups ?? []),
]),
}
: profile;
if (
profile &&
existingProfile &&
shallowEqual(
omit(['created_at'], existingProfile),
omit(['created_at'], mergedProfile)
)
) {
this.logger.debug('Profile not changed, skipping');
return;
}
if (
profile &&
existingProfile &&
shallowEqual(
omit(['created_at'], existingProfile),
omit(['created_at'], mergedProfile)
)
) {
this.logger.debug('Profile not changed, skipping');
return;
}
this.logger.debug('Merged profile will be inserted', {
mergedProfile,
existingProfile,
profile,
});
const cacheKey = this.getProfileCacheKey({
profileId: profile.id,
projectId: profile.project_id,
});
const result = await this.redis
.multi()
.set(cacheKey, JSON.stringify(mergedProfile), 'EX', this.ttlInSeconds)
.rpush(this.redisKey, JSON.stringify(mergedProfile))
.incr(this.bufferCounterKey)
.llen(this.redisKey)
.exec();
if (!result) {
this.logger.error('Failed to add profile to Redis', {
this.logger.debug('Merged profile will be inserted', {
mergedProfile,
existingProfile,
profile,
cacheKey,
});
return;
}
const bufferLength = (result?.[3]?.[1] as number) ?? 0;
this.logger.debug('Current buffer length', {
bufferLength,
batchSize: this.batchSize,
const cacheKey = this.getProfileCacheKey({
profileId: profile.id,
projectId: profile.project_id,
});
const result = await this.redis
.multi()
.set(cacheKey, JSON.stringify(mergedProfile), 'EX', this.ttlInSeconds)
.rpush(this.redisKey, JSON.stringify(mergedProfile))
.incr(this.bufferCounterKey)
.llen(this.redisKey)
.exec();
if (!result) {
this.logger.error('Failed to add profile to Redis', {
profile,
cacheKey,
});
return;
}
const bufferLength = (result?.[3]?.[1] as number) ?? 0;
this.logger.debug('Current buffer length', {
bufferLength,
batchSize: this.batchSize,
});
if (bufferLength >= this.batchSize) {
await this.tryFlush();
}
});
if (bufferLength >= this.batchSize) {
await this.tryFlush();
}
} catch (error) {
this.logger.error('Failed to add profile', { error, profile });
}

View File

@@ -109,6 +109,12 @@ export class SessionBuffer extends BaseBuffer {
newSession.profile_id = event.profile_id;
}
if (event.groups) {
newSession.groups = [
...new Set([...newSession.groups, ...event.groups]),
];
}
return [newSession, oldSession];
}
@@ -119,6 +125,7 @@ export class SessionBuffer extends BaseBuffer {
profile_id: event.profile_id,
project_id: event.project_id,
device_id: event.device_id,
groups: event.groups,
created_at: event.created_at,
ended_at: event.created_at,
event_count: event.name === 'screen_view' ? 0 : 1,

View File

@@ -66,6 +66,7 @@ export class Query<T = any> {
alias?: string;
}[] = [];
private _skipNext = false;
private _rawJoins: string[] = [];
private _fill?: {
from: string | Date;
to: string | Date;
@@ -329,6 +330,12 @@ export class Query<T = any> {
return this.joinWithType('CROSS', table, '', alias);
}
rawJoin(sql: string): this {
if (this._skipNext) return this;
this._rawJoins.push(sql);
return this;
}
private joinWithType(
type: JoinType,
table: string | Expression | Query,
@@ -414,6 +421,10 @@ export class Query<T = any> {
`${join.type} JOIN ${join.table instanceof Query ? `(${join.table.toSQL()})` : join.table instanceof Expression ? `(${join.table.toString()})` : join.table}${aliasClause}${conditionStr}`,
);
});
// Add raw joins (e.g. ARRAY JOIN)
this._rawJoins.forEach((join) => {
parts.push(join);
});
}
// WHERE
@@ -590,6 +601,7 @@ export class Query<T = any> {
// Merge JOINS
this._joins = [...this._joins, ...query._joins];
this._rawJoins = [...this._rawJoins, ...query._rawJoins];
// Merge settings
this._settings = { ...this._settings, ...query._settings };

View File

@@ -1,3 +1,4 @@
/** biome-ignore-all lint/style/useDefaultSwitchClause: <explanation> */
import { DateTime, stripLeadingAndTrailingSlashes } from '@openpanel/common';
import type {
IChartEventFilter,
@@ -30,35 +31,77 @@ export function transformPropertyKey(property: string) {
return `${match}['${property.replace(new RegExp(`^${match}.`), '')}']`;
}
// Returns a SQL expression for a group property using dictGet
// Returns a SQL expression for a group property via the _g JOIN alias
// property format: "group.name", "group.type", "group.properties.plan"
export function getGroupPropertySql(
property: string,
projectId: string
): string {
export function getGroupPropertySql(property: string): string {
const withoutPrefix = property.replace(/^group\./, '');
if (withoutPrefix === 'name') {
return `dictGet('${TABLE_NAMES.groups_dict}', 'name', tuple(_group_id, ${sqlstring.escape(projectId)}))`;
return '_g.name';
}
if (withoutPrefix === 'type') {
return `dictGet('${TABLE_NAMES.groups_dict}', 'type', tuple(_group_id, ${sqlstring.escape(projectId)}))`;
return '_g.type';
}
if (withoutPrefix.startsWith('properties.')) {
const propKey = withoutPrefix.replace(/^properties\./, '');
// properties is stored as JSON string in dict; use JSONExtractString
return `JSONExtractString(dictGet('${TABLE_NAMES.groups_dict}', 'properties', tuple(_group_id, ${sqlstring.escape(projectId)})), ${sqlstring.escape(propKey)})`;
return `_g.properties[${sqlstring.escape(propKey)}]`;
}
return '_group_id';
}
// Returns the SELECT expression when querying the groups table directly (no join alias).
// Use for fetching distinct values for group.* properties.
export function getGroupPropertySelect(property: string): string {
const withoutPrefix = property.replace(/^group\./, '');
if (withoutPrefix === 'name') {
return 'name';
}
if (withoutPrefix === 'type') {
return 'type';
}
if (withoutPrefix === 'id') {
return 'id';
}
if (withoutPrefix.startsWith('properties.')) {
const propKey = withoutPrefix.replace(/^properties\./, '');
return `properties[${sqlstring.escape(propKey)}]`;
}
return 'id';
}
// Returns the SELECT expression when querying the profiles table directly (no join alias).
// Use for fetching distinct values for profile.* properties.
export function getProfilePropertySelect(property: string): string {
const withoutPrefix = property.replace(/^profile\./, '');
if (withoutPrefix === 'id') {
return 'id';
}
if (withoutPrefix === 'first_name') {
return 'first_name';
}
if (withoutPrefix === 'last_name') {
return 'last_name';
}
if (withoutPrefix === 'email') {
return 'email';
}
if (withoutPrefix === 'avatar') {
return 'avatar';
}
if (withoutPrefix.startsWith('properties.')) {
const propKey = withoutPrefix.replace(/^properties\./, '');
return `properties[${sqlstring.escape(propKey)}]`;
}
return 'id';
}
export function getSelectPropertyKey(property: string, projectId?: string) {
if (property === 'has_profile') {
return `if(profile_id != device_id, 'true', 'false')`;
}
// Handle group properties — requires ARRAY JOIN to be present in query
// Handle group properties — requires ARRAY JOIN + _g JOIN to be present in query
if (property.startsWith('group.') && projectId) {
return getGroupPropertySql(property, projectId);
return getGroupPropertySql(property);
}
const propertyPatterns = ['properties', 'profile.properties'];
@@ -86,9 +129,7 @@ export function getChartSql({
startDate,
endDate,
projectId,
limit,
timezone,
chartType,
}: IGetChartDataInput & { timezone: string }) {
const {
sb,
@@ -130,7 +171,12 @@ export function getChartSql({
anyFilterOnGroup || anyBreakdownOnGroup || event.segment === 'group';
if (needsGroupArrayJoin) {
addCte(
'_g',
`SELECT id, name, type, properties FROM ${TABLE_NAMES.groups} FINAL WHERE project_id = ${sqlstring.escape(projectId)}`
);
sb.joins.groups = 'ARRAY JOIN groups AS _group_id';
sb.joins.groups_table = 'LEFT ANY JOIN _g ON _g.id = _group_id';
}
// Build WHERE clause without the bar filter (for use in subqueries and CTEs)
@@ -263,31 +309,6 @@ export function getChartSql({
sb.where.endDate = `created_at <= toDateTime('${formatClickhouseDate(endDate)}')`;
}
// Use CTE to define top breakdown values once, then reference in WHERE clause
if (breakdowns.length > 0 && limit) {
const breakdownSelects = breakdowns
.map((b) => getSelectPropertyKey(b.name, projectId))
.join(', ');
const groupArrayJoinClause = needsGroupArrayJoin
? 'ARRAY JOIN groups AS _group_id'
: '';
// Add top_breakdowns CTE using the builder
addCte(
'top_breakdowns',
`SELECT ${breakdownSelects}
FROM ${TABLE_NAMES.events} e
${profilesJoinRef ? `${profilesJoinRef} ` : ''}${groupArrayJoinClause ? `${groupArrayJoinClause} ` : ''}${getWhereWithoutBar()}
GROUP BY ${breakdownSelects}
ORDER BY count(*) DESC
LIMIT ${limit}`
);
// Filter main query to only include top breakdown values
sb.where.bar = `(${breakdowns.map((b) => getSelectPropertyKey(b.name, projectId)).join(',')}) IN (SELECT * FROM top_breakdowns)`;
}
breakdowns.forEach((breakdown, index) => {
// Breakdowns start at label_1 (label_0 is reserved for event name)
const key = `label_${index + 1}`;
@@ -350,6 +371,10 @@ export function getChartSql({
}
// Note: The profile CTE (if it exists) is available in subqueries, so we can reference it directly
const subqueryGroupJoins = needsGroupArrayJoin
? 'ARRAY JOIN groups AS _group_id LEFT ANY JOIN _g ON _g.id = _group_id '
: '';
if (breakdowns.length > 0) {
// Match breakdown properties in subquery with outer query's grouped values
// Since outer query groups by label_X, we reference those in the correlation
@@ -370,7 +395,7 @@ export function getChartSql({
sb.select.total_unique_count = `(
SELECT uniq(profile_id)
FROM ${TABLE_NAMES.events} e2
${profilesJoinRef ? `${profilesJoinRef} ` : ''}${subqueryWhere}
${subqueryGroupJoins}${profilesJoinRef ? `${profilesJoinRef} ` : ''}${subqueryWhere}
AND ${breakdownMatches}
) as total_count`;
} else {
@@ -383,7 +408,7 @@ export function getChartSql({
sb.select.total_unique_count = `(
SELECT uniq(profile_id)
FROM ${TABLE_NAMES.events} e2
${profilesJoinRef ? `${profilesJoinRef} ` : ''}${subqueryWhere}
${subqueryGroupJoins}${profilesJoinRef ? `${profilesJoinRef} ` : ''}${subqueryWhere}
) as total_count`;
}
@@ -432,18 +457,14 @@ export function getAggregateChartSql({
anyFilterOnGroup || anyBreakdownOnGroup || event.segment === 'group';
if (needsGroupArrayJoin) {
addCte(
'_g',
`SELECT id, name, type, properties FROM ${TABLE_NAMES.groups} FINAL WHERE project_id = ${sqlstring.escape(projectId)}`
);
sb.joins.groups = 'ARRAY JOIN groups AS _group_id';
sb.joins.groups_table = 'LEFT ANY JOIN _g ON _g.id = _group_id';
}
// Build WHERE clause without the bar filter (for use in subqueries and CTEs)
const getWhereWithoutBar = () => {
const whereWithoutBar = { ...sb.where };
delete whereWithoutBar.bar;
return Object.keys(whereWithoutBar).length
? `WHERE ${join(whereWithoutBar, ' AND ')}`
: '';
};
// Collect all profile fields used in filters and breakdowns
const getProfileFields = () => {
const fields = new Set<string>();
@@ -534,30 +555,6 @@ export function getAggregateChartSql({
// Use startDate as the date value since we're aggregating across the entire range
sb.select.date = `${sqlstring.escape(startDate)} as date`;
// Use CTE to define top breakdown values once, then reference in WHERE clause
if (breakdowns.length > 0 && limit) {
const breakdownSelects = breakdowns
.map((b) => getSelectPropertyKey(b.name, projectId))
.join(', ');
const groupArrayJoinClause = needsGroupArrayJoin
? 'ARRAY JOIN groups AS _group_id'
: '';
addCte(
'top_breakdowns',
`SELECT ${breakdownSelects}
FROM ${TABLE_NAMES.events} e
${profilesJoinRef ? `${profilesJoinRef} ` : ''}${groupArrayJoinClause ? `${groupArrayJoinClause} ` : ''}${getWhereWithoutBar()}
GROUP BY ${breakdownSelects}
ORDER BY count(*) DESC
LIMIT ${limit}`
);
// Filter main query to only include top breakdown values
sb.where.bar = `(${breakdowns.map((b) => getSelectPropertyKey(b.name, projectId)).join(',')}) IN (SELECT * FROM top_breakdowns)`;
}
// Add breakdowns to SELECT and GROUP BY
breakdowns.forEach((breakdown, index) => {
// Breakdowns start at label_1 (label_0 is reserved for event name)
@@ -673,9 +670,9 @@ export function getEventFiltersWhereClause(
return;
}
// Handle group. prefixed filters using dictGet (requires ARRAY JOIN in query)
// Handle group. prefixed filters (requires ARRAY JOIN + _g JOIN in query)
if (name.startsWith('group.') && projectId) {
const whereFrom = getGroupPropertySql(name, projectId);
const whereFrom = getGroupPropertySql(name);
switch (operator) {
case 'is': {
if (value.length === 1) {

View File

@@ -31,14 +31,14 @@ export class ConversionService {
const funnelWindow = funnelOptions?.funnelWindow ?? 24;
const group = funnelGroup === 'profile_id' ? 'profile_id' : 'session_id';
const breakdownExpressions = breakdowns.map(
(b) => getSelectPropertyKey(b.name),
(b) => getSelectPropertyKey(b.name, projectId),
);
const breakdownSelects = breakdownExpressions.map(
(expr, index) => `${expr} as b_${index}`,
);
const breakdownGroupBy = breakdowns.map((_, index) => `b_${index}`);
// Check if any breakdown uses profile fields and build profile JOIN if needed
// Check if any breakdown or filter uses profile fields
const profileBreakdowns = breakdowns.filter((b) =>
b.name.startsWith('profile.'),
);
@@ -71,6 +71,15 @@ export class ConversionService {
const events = onlyReportEvents(series);
// Check if any breakdown or filter uses group fields
const anyBreakdownOnGroup = breakdowns.some((b) =>
b.name.startsWith('group.'),
);
const anyFilterOnGroup = events.some((e) =>
e.filters?.some((f) => f.name.startsWith('group.')),
);
const needsGroupArrayJoin = anyBreakdownOnGroup || anyFilterOnGroup;
if (events.length !== 2) {
throw new Error('events must be an array of two events');
}
@@ -82,10 +91,10 @@ export class ConversionService {
const eventA = events[0]!;
const eventB = events[1]!;
const whereA = Object.values(
getEventFiltersWhereClause(eventA.filters),
getEventFiltersWhereClause(eventA.filters, projectId),
).join(' AND ');
const whereB = Object.values(
getEventFiltersWhereClause(eventB.filters),
getEventFiltersWhereClause(eventB.filters, projectId),
).join(' AND ');
const funnelWindowSeconds = funnelWindow * 3600;
@@ -98,6 +107,10 @@ export class ConversionService {
? `(name = '${eventB.name}' AND ${whereB})`
: `name = '${eventB.name}'`;
const groupJoin = needsGroupArrayJoin
? `ARRAY JOIN groups AS _group_id LEFT ANY JOIN (SELECT id, name, type, properties FROM ${TABLE_NAMES.groups} FINAL WHERE project_id = ${sqlstring.escape(projectId)}) AS _g ON _g.id = _group_id`
: '';
// Use windowFunnel approach - single scan, no JOIN
const query = clix(this.client, timezone)
.select<{
@@ -126,6 +139,7 @@ export class ConversionService {
) as steps
FROM ${TABLE_NAMES.events}
${profileJoin}
${groupJoin}
WHERE project_id = '${projectId}'
AND name IN ('${eventA.name}', '${eventB.name}')
AND created_at BETWEEN toDateTime('${startDate}') AND toDateTime('${endDate}')

View File

@@ -32,14 +32,14 @@ export type IImportedEvent = Omit<
properties: Record<string, unknown>;
};
export type IServicePage = {
export interface IServicePage {
path: string;
count: number;
project_id: string;
first_seen: string;
title: string;
origin: string;
};
}
export interface IClickhouseBotEvent {
id: string;
@@ -335,6 +335,7 @@ export async function getEvents(
projectId,
isExternal: false,
properties: {},
groups: [],
};
}
}
@@ -439,6 +440,7 @@ export interface GetEventListOptions {
projectId: string;
profileId?: string;
sessionId?: string;
groupId?: string;
take: number;
cursor?: number | Date;
events?: string[] | null;
@@ -457,6 +459,7 @@ export async function getEventList(options: GetEventListOptions) {
projectId,
profileId,
sessionId,
groupId,
events,
filters,
startDate,
@@ -594,6 +597,10 @@ export async function getEventList(options: GetEventListOptions) {
sb.select.revenue = 'revenue';
}
if (select.groups) {
sb.select.groups = 'groups';
}
if (profileId) {
sb.where.deviceId = `(device_id IN (SELECT device_id as did FROM ${TABLE_NAMES.events} WHERE project_id = ${sqlstring.escape(projectId)} AND device_id != '' AND profile_id = ${sqlstring.escape(profileId)} group by did) OR profile_id = ${sqlstring.escape(profileId)})`;
}
@@ -602,6 +609,10 @@ export async function getEventList(options: GetEventListOptions) {
sb.where.sessionId = `session_id = ${sqlstring.escape(sessionId)}`;
}
if (groupId) {
sb.where.groupId = `has(groups, ${sqlstring.escape(groupId)})`;
}
if (startDate && endDate) {
sb.where.created_at = `toDate(created_at) BETWEEN toDate('${formatClickhouseDate(startDate)}') AND toDate('${formatClickhouseDate(endDate)}')`;
}
@@ -616,7 +627,7 @@ export async function getEventList(options: GetEventListOptions) {
if (filters) {
sb.where = {
...sb.where,
...getEventFiltersWhereClause(filters),
...getEventFiltersWhereClause(filters, projectId),
};
// Join profiles table if any filter uses profile fields
@@ -627,6 +638,13 @@ export async function getEventList(options: GetEventListOptions) {
if (profileFilters.length > 0) {
sb.joins.profiles = `LEFT ANY JOIN (SELECT id, ${uniq(profileFilters.map((f) => f.split('.')[0])).join(', ')} FROM ${TABLE_NAMES.profiles} FINAL WHERE project_id = ${sqlstring.escape(projectId)}) as profile on profile.id = profile_id`;
}
// Join groups table if any filter uses group fields
const groupFilters = filters.filter((f) => f.name.startsWith('group.'));
if (groupFilters.length > 0) {
sb.joins.groups = 'ARRAY JOIN groups AS _group_id';
sb.joins.groups_cte = `LEFT ANY JOIN (SELECT id, name, type, properties FROM ${TABLE_NAMES.groups} FINAL WHERE project_id = ${sqlstring.escape(projectId)}) AS _g ON _g.id = _group_id`;
}
}
sb.orderBy.created_at = 'created_at DESC, id ASC';
@@ -652,6 +670,8 @@ export async function getEventList(options: GetEventListOptions) {
});
}
console.log('getSql', getSql());
return data;
}
@@ -683,7 +703,7 @@ export async function getEventsCount({
if (filters) {
sb.where = {
...sb.where,
...getEventFiltersWhereClause(filters),
...getEventFiltersWhereClause(filters, projectId),
};
// Join profiles table if any filter uses profile fields
@@ -694,6 +714,13 @@ export async function getEventsCount({
if (profileFilters.length > 0) {
sb.joins.profiles = `LEFT ANY JOIN (SELECT id, ${uniq(profileFilters.map((f) => f.split('.')[0])).join(', ')} FROM ${TABLE_NAMES.profiles} FINAL WHERE project_id = ${sqlstring.escape(projectId)}) as profile on profile.id = profile_id`;
}
// Join groups table if any filter uses group fields
const groupFilters = filters.filter((f) => f.name.startsWith('group.'));
if (groupFilters.length > 0) {
sb.joins.groups = 'ARRAY JOIN groups AS _group_id';
sb.joins.groups_cte = `LEFT ANY JOIN (SELECT id, name, type, properties FROM ${TABLE_NAMES.groups} FINAL WHERE project_id = ${sqlstring.escape(projectId)}) AS _g ON _g.id = _group_id`;
}
}
const res = await chQuery<{ count: number }>(
@@ -1057,8 +1084,19 @@ class EventService {
}
if (filters) {
q.rawWhere(
Object.values(getEventFiltersWhereClause(filters)).join(' AND ')
Object.values(
getEventFiltersWhereClause(filters, projectId)
).join(' AND ')
);
const groupFilters = filters.filter((f) =>
f.name.startsWith('group.')
);
if (groupFilters.length > 0) {
q.rawJoin('ARRAY JOIN groups AS _group_id');
q.rawJoin(
`LEFT ANY JOIN (SELECT id, name, type, properties FROM ${TABLE_NAMES.groups} FINAL WHERE project_id = ${sqlstring.escape(projectId)}) AS _g ON _g.id = _group_id`
);
}
}
},
session: (q) => {

View File

@@ -34,10 +34,10 @@ export class FunnelService {
return group === 'profile_id' ? 'profile_id' : 'session_id';
}
getFunnelConditions(events: IChartEvent[] = []): string[] {
getFunnelConditions(events: IChartEvent[] = [], projectId?: string): string[] {
return events.map((event) => {
const { sb, getWhere } = createSqlBuilder();
sb.where = getEventFiltersWhereClause(event.filters);
sb.where = getEventFiltersWhereClause(event.filters, projectId);
sb.where.name = `name = ${sqlstring.escape(event.name)}`;
return getWhere().replace('WHERE ', '');
});
@@ -71,7 +71,7 @@ export class FunnelService {
additionalGroupBy?: string[];
group?: 'session_id' | 'profile_id';
}) {
const funnels = this.getFunnelConditions(eventSeries);
const funnels = this.getFunnelConditions(eventSeries, projectId);
const primaryKey = group === 'profile_id' ? 'profile_id' : 'session_id';
return clix(this.client, timezone)
@@ -236,10 +236,18 @@ export class FunnelService {
const anyBreakdownOnProfile = breakdowns.some((b) =>
b.name.startsWith('profile.'),
);
const anyFilterOnGroup = eventSeries.some((e) =>
e.filters?.some((f) => f.name.startsWith('group.')),
);
const anyBreakdownOnGroup = breakdowns.some((b) =>
b.name.startsWith('group.'),
);
const needsGroupArrayJoin =
anyFilterOnGroup || anyBreakdownOnGroup || funnelGroup === 'group';
// Create the funnel CTE (session-level)
const breakdownSelects = breakdowns.map(
(b, index) => `${getSelectPropertyKey(b.name)} as b_${index}`,
(b, index) => `${getSelectPropertyKey(b.name, projectId)} as b_${index}`,
);
const breakdownGroupBy = breakdowns.map((b, index) => `b_${index}`);
@@ -277,8 +285,21 @@ export class FunnelService {
);
}
if (needsGroupArrayJoin) {
funnelCte.rawJoin('ARRAY JOIN groups AS _group_id');
funnelCte.rawJoin('LEFT ANY JOIN _g ON _g.id = _group_id');
}
// Base funnel query with CTEs
const funnelQuery = clix(this.client, timezone);
if (needsGroupArrayJoin) {
funnelQuery.with(
'_g',
`SELECT id, name, type, properties FROM ${TABLE_NAMES.groups} FINAL WHERE project_id = ${sqlstring.escape(projectId)}`,
);
}
funnelQuery.with('session_funnel', funnelCte);
// windowFunnel is computed per the primary key (profile_id or session_id),

View File

@@ -1,4 +1,13 @@
import { db } from '../prisma-client';
import { toDots } from '@openpanel/common';
import sqlstring from 'sqlstring';
import {
ch,
chQuery,
formatClickhouseDate,
TABLE_NAMES,
} from '../clickhouse/client';
import type { IServiceProfile } from './profile.service';
import { getProfiles } from './profile.service';
export type IServiceGroup = {
id: string;
@@ -18,26 +27,69 @@ export type IServiceUpsertGroup = {
properties?: Record<string, unknown>;
};
export async function upsertGroup(input: IServiceUpsertGroup) {
const { id, projectId, type, name, properties = {} } = input;
type IClickhouseGroup = {
project_id: string;
id: string;
type: string;
name: string;
properties: Record<string, string>;
created_at: string;
version: string;
};
await db.group.upsert({
where: {
projectId_id: { projectId, id },
},
update: {
type,
name,
properties: properties as Record<string, string>,
updatedAt: new Date(),
},
create: {
id,
projectId,
type,
name,
properties: properties as Record<string, string>,
},
function transformGroup(row: IClickhouseGroup): IServiceGroup {
return {
id: row.id,
projectId: row.project_id,
type: row.type,
name: row.name,
properties: row.properties,
createdAt: new Date(row.created_at),
updatedAt: new Date(Number(row.version)),
};
}
async function writeGroupToCh(
group: {
id: string;
projectId: string;
type: string;
name: string;
properties: Record<string, string>;
createdAt?: Date;
},
deleted = 0
) {
await ch.insert({
format: 'JSONEachRow',
table: TABLE_NAMES.groups,
values: [
{
project_id: group.projectId,
id: group.id,
type: group.type,
name: group.name,
properties: group.properties,
created_at: formatClickhouseDate(group.createdAt ?? new Date()),
version: Date.now(),
deleted,
},
],
});
}
export async function upsertGroup(input: IServiceUpsertGroup) {
const existing = await getGroupById(input.id, input.projectId);
await writeGroupToCh({
id: input.id,
projectId: input.projectId,
type: input.type,
name: input.name,
properties: toDots({
...(existing?.properties ?? {}),
...(input.properties ?? {}),
}),
createdAt: existing?.createdAt,
});
}
@@ -45,23 +97,13 @@ export async function getGroupById(
id: string,
projectId: string
): Promise<IServiceGroup | null> {
const group = await db.group.findUnique({
where: { projectId_id: { projectId, id } },
});
if (!group) {
return null;
}
return {
id: group.id,
projectId: group.projectId,
type: group.type,
name: group.name,
properties: (group.properties as Record<string, unknown>) ?? {},
createdAt: group.createdAt,
updatedAt: group.updatedAt,
};
const rows = await chQuery<IClickhouseGroup>(`
SELECT project_id, id, type, name, properties, created_at, version
FROM ${TABLE_NAMES.groups} FINAL
WHERE project_id = ${sqlstring.escape(projectId)}
AND id = ${sqlstring.escape(id)}
`);
return rows[0] ? transformGroup(rows[0]) : null;
}
export async function getGroupList({
@@ -77,33 +119,25 @@ export async function getGroupList({
search?: string;
type?: string;
}): Promise<IServiceGroup[]> {
const groups = await db.group.findMany({
where: {
projectId,
...(type ? { type } : {}),
...(search
? {
OR: [
{ name: { contains: search, mode: 'insensitive' } },
{ id: { contains: search, mode: 'insensitive' } },
],
}
: {}),
},
orderBy: { createdAt: 'desc' },
take,
skip: cursor,
});
const conditions = [
`project_id = ${sqlstring.escape(projectId)}`,
...(type ? [`type = ${sqlstring.escape(type)}`] : []),
...(search
? [
`(name ILIKE ${sqlstring.escape(`%${search}%`)} OR id ILIKE ${sqlstring.escape(`%${search}%`)})`,
]
: []),
];
return groups.map((group) => ({
id: group.id,
projectId: group.projectId,
type: group.type,
name: group.name,
properties: (group.properties as Record<string, unknown>) ?? {},
createdAt: group.createdAt,
updatedAt: group.updatedAt,
}));
const rows = await chQuery<IClickhouseGroup>(`
SELECT project_id, id, type, name, properties, created_at, version
FROM ${TABLE_NAMES.groups} FINAL
WHERE ${conditions.join(' AND ')}
ORDER BY created_at DESC
LIMIT ${take}
OFFSET ${cursor ?? 0}
`);
return rows.map(transformGroup);
}
export async function getGroupListCount({
@@ -115,33 +149,182 @@ export async function getGroupListCount({
type?: string;
search?: string;
}): Promise<number> {
return db.group.count({
where: {
projectId,
...(type ? { type } : {}),
...(search
? {
OR: [
{ name: { contains: search, mode: 'insensitive' } },
{ id: { contains: search, mode: 'insensitive' } },
],
}
: {}),
},
});
const conditions = [
`project_id = ${sqlstring.escape(projectId)}`,
...(type ? [`type = ${sqlstring.escape(type)}`] : []),
...(search
? [
`(name ILIKE ${sqlstring.escape(`%${search}%`)} OR id ILIKE ${sqlstring.escape(`%${search}%`)})`,
]
: []),
];
const rows = await chQuery<{ count: number }>(`
SELECT count() as count
FROM ${TABLE_NAMES.groups} FINAL
WHERE ${conditions.join(' AND ')}
`);
return rows[0]?.count ?? 0;
}
export async function getGroupTypes(projectId: string): Promise<string[]> {
const groups = await db.group.findMany({
where: { projectId },
select: { type: true },
distinct: ['type'],
const rows = await chQuery<{ type: string }>(`
SELECT DISTINCT type
FROM ${TABLE_NAMES.groups} FINAL
WHERE project_id = ${sqlstring.escape(projectId)}
`);
return rows.map((r) => r.type);
}
export async function createGroup(input: IServiceUpsertGroup) {
const { id, projectId, type, name, properties = {} } = input;
await writeGroupToCh({
id,
projectId,
type,
name,
properties: properties as Record<string, string>,
createdAt: new Date(),
});
return groups.map((g) => g.type);
return getGroupById(id, projectId);
}
export async function updateGroup(
id: string,
projectId: string,
data: { type?: string; name?: string; properties?: Record<string, unknown> }
) {
const existing = await getGroupById(id, projectId);
if (!existing) {
throw new Error(`Group ${id} not found`);
}
const updated = {
id,
projectId,
type: data.type ?? existing.type,
name: data.name ?? existing.name,
properties: (data.properties ?? existing.properties) as Record<
string,
string
>,
createdAt: existing.createdAt,
};
await writeGroupToCh(updated);
return { ...existing, ...updated };
}
export async function deleteGroup(id: string, projectId: string) {
return db.group.delete({
where: { projectId_id: { projectId, id } },
});
const existing = await getGroupById(id, projectId);
if (!existing) {
throw new Error(`Group ${id} not found`);
}
await writeGroupToCh(
{
id,
projectId,
type: existing.type,
name: existing.name,
properties: existing.properties as Record<string, string>,
createdAt: existing.createdAt,
},
1
);
return existing;
}
export async function getGroupPropertyKeys(
projectId: string
): Promise<string[]> {
const rows = await chQuery<{ key: string }>(`
SELECT DISTINCT arrayJoin(mapKeys(properties)) as key
FROM ${TABLE_NAMES.groups} FINAL
WHERE project_id = ${sqlstring.escape(projectId)}
`);
return rows.map((r) => r.key).sort();
}
export async function getGroupsByIds(
projectId: string,
ids: string[]
): Promise<IServiceGroup[]> {
if (ids.length === 0) {
return [];
}
const rows = await chQuery<IClickhouseGroup>(`
SELECT project_id, id, type, name, properties, created_at, version
FROM ${TABLE_NAMES.groups} FINAL
WHERE project_id = ${sqlstring.escape(projectId)}
AND id IN (${ids.map((id) => sqlstring.escape(id)).join(',')})
AND deleted = 0
`);
return rows.map(transformGroup);
}
export async function getGroupMemberProfiles({
projectId,
groupId,
cursor,
take,
search,
}: {
projectId: string;
groupId: string;
cursor?: number;
take: number;
search?: string;
}): Promise<{ data: IServiceProfile[]; count: number }> {
const offset = Math.max(0, (cursor ?? 0) * take);
const searchCondition = search?.trim()
? `AND (email ILIKE ${sqlstring.escape(`%${search.trim()}%`)} OR first_name ILIKE ${sqlstring.escape(`%${search.trim()}%`)} OR last_name ILIKE ${sqlstring.escape(`%${search.trim()}%`)})`
: '';
const countResult = await chQuery<{ count: number }>(`
SELECT count() AS count
FROM (
SELECT profile_id
FROM ${TABLE_NAMES.events}
WHERE project_id = ${sqlstring.escape(projectId)}
AND has(groups, ${sqlstring.escape(groupId)})
AND profile_id != device_id
GROUP BY profile_id
) gm
INNER JOIN (
SELECT id FROM ${TABLE_NAMES.profiles} FINAL
WHERE project_id = ${sqlstring.escape(projectId)}
${searchCondition}
) p ON p.id = gm.profile_id
`);
const count = countResult[0]?.count ?? 0;
const idRows = await chQuery<{ profile_id: string }>(`
SELECT gm.profile_id
FROM (
SELECT profile_id, max(created_at) AS last_seen
FROM ${TABLE_NAMES.events}
WHERE project_id = ${sqlstring.escape(projectId)}
AND has(groups, ${sqlstring.escape(groupId)})
AND profile_id != device_id
GROUP BY profile_id
ORDER BY last_seen DESC
) gm
INNER JOIN (
SELECT id FROM ${TABLE_NAMES.profiles} FINAL
WHERE project_id = ${sqlstring.escape(projectId)}
${searchCondition}
) p ON p.id = gm.profile_id
ORDER BY gm.last_seen DESC
LIMIT ${take}
OFFSET ${offset}
`);
const profileIds = idRows.map((r) => r.profile_id);
if (profileIds.length === 0) {
return { data: [], count };
}
const profiles = await getProfiles(profileIds, projectId);
const byId = new Map(profiles.map((p) => [p.id, p]));
const data = profileIds
.map((id) => byId.get(id))
.filter(Boolean) as IServiceProfile[];
return { data, count };
}

View File

@@ -165,7 +165,8 @@ export async function getProfiles(ids: string[], projectId: string) {
any(nullIf(avatar, '')) as avatar,
last_value(is_external) as is_external,
any(properties) as properties,
any(created_at) as created_at
any(created_at) as created_at,
any(groups) as groups
FROM ${TABLE_NAMES.profiles}
WHERE
project_id = ${sqlstring.escape(projectId)} AND
@@ -232,6 +233,7 @@ export interface IServiceProfile {
createdAt: Date;
isExternal: boolean;
projectId: string;
groups: string[];
properties: Record<string, unknown> & {
region?: string;
country?: string;
@@ -259,6 +261,7 @@ export interface IClickhouseProfile {
project_id: string;
is_external: boolean;
created_at: string;
groups: string[];
}
export interface IServiceUpsertProfile {
@@ -270,6 +273,7 @@ export interface IServiceUpsertProfile {
avatar?: string;
properties?: Record<string, unknown>;
isExternal: boolean;
groups?: string[];
}
export function transformProfile({
@@ -288,6 +292,7 @@ export function transformProfile({
id: profile.id,
email: profile.email,
avatar: profile.avatar,
groups: profile.groups ?? [],
};
}
@@ -301,6 +306,7 @@ export function upsertProfile(
properties,
projectId,
isExternal,
groups,
}: IServiceUpsertProfile,
isFromEvent = false
) {
@@ -314,6 +320,7 @@ export function upsertProfile(
project_id: projectId,
created_at: formatClickhouseDate(new Date()),
is_external: isExternal,
groups: groups ?? [],
};
return profileBuffer.add(profile, isFromEvent);

View File

@@ -55,6 +55,7 @@ export interface IClickhouseSession {
version: number;
// Dynamically added
has_replay?: boolean;
groups: string[];
}
export interface IServiceSession {
@@ -95,6 +96,7 @@ export interface IServiceSession {
revenue: number;
profile?: IServiceProfile;
hasReplay?: boolean;
groups: string[];
}
export interface GetSessionListOptions {
@@ -152,6 +154,7 @@ export function transformSession(session: IClickhouseSession): IServiceSession {
revenue: session.revenue,
profile: undefined,
hasReplay: session.has_replay,
groups: session.groups,
};
}
@@ -244,6 +247,7 @@ export async function getSessionList(options: GetSessionListOptions) {
'screen_view_count',
'event_count',
'revenue',
'groups',
];
columns.forEach((column) => {
@@ -292,6 +296,7 @@ export async function getSessionList(options: GetSessionListOptions) {
projectId,
isExternal: false,
properties: {},
groups: [],
},
}));