fix: realtime improvements

This commit is contained in:
Carl-Gerhard Lindesvärd
2026-03-20 09:52:29 +01:00
parent d1b39c4c93
commit 88a2d876ce
20 changed files with 2060 additions and 536 deletions

View File

@@ -73,18 +73,20 @@ export class Query<T = any> {
};
private _transform?: Record<string, (item: T) => any>;
private _union?: Query;
private _dateRegex = /\d{4}-\d{2}-\d{2}([\s\:\d\.]+)?/g;
private _dateRegex = /\d{4}-\d{2}-\d{2}([\s:\d.]+)?/g;
constructor(
private client: ClickHouseClient,
private timezone: string,
private timezone: string
) {}
// Select methods
select<U>(
columns: (string | Expression | null | undefined | false)[],
type: 'merge' | 'replace' = 'replace',
type: 'merge' | 'replace' = 'replace'
): Query<U> {
if (this._skipNext) return this as unknown as Query<U>;
if (this._skipNext) {
return this as unknown as Query<U>;
}
if (type === 'merge') {
this._select = [
...this._select,
@@ -92,7 +94,7 @@ export class Query<T = any> {
];
} else {
this._select = columns.filter((col): col is string | Expression =>
Boolean(col),
Boolean(col)
);
}
return this as unknown as Query<U>;
@@ -122,8 +124,12 @@ export class Query<T = any> {
// Where methods
private escapeValue(value: SqlParam): string {
if (value === null) return 'NULL';
if (value instanceof Expression) return `(${value.toString()})`;
if (value === null) {
return 'NULL';
}
if (value instanceof Expression) {
return `(${value.toString()})`;
}
if (Array.isArray(value)) {
return `(${value.map((v) => this.escapeValue(v)).join(', ')})`;
}
@@ -139,7 +145,9 @@ export class Query<T = any> {
}
where(column: string, operator: Operator, value?: SqlParam): this {
if (this._skipNext) return this;
if (this._skipNext) {
return this;
}
const condition = this.buildCondition(column, operator, value);
this._where.push({ condition, operator: 'AND' });
return this;
@@ -148,7 +156,7 @@ export class Query<T = any> {
public buildCondition(
column: string,
operator: Operator,
value?: SqlParam,
value?: SqlParam
): string {
switch (operator) {
case 'IS NULL':
@@ -162,7 +170,7 @@ export class Query<T = any> {
throw new Error('BETWEEN operator requires an array of two values');
case 'IN':
case 'NOT IN':
if (!Array.isArray(value) && !(value instanceof Expression)) {
if (!(Array.isArray(value) || value instanceof Expression)) {
throw new Error(`${operator} operator requires an array value`);
}
return `${column} ${operator} ${this.escapeValue(value)}`;
@@ -224,7 +232,9 @@ export class Query<T = any> {
// Order by methods
orderBy(column: string, direction: 'ASC' | 'DESC' = 'ASC'): this {
if (this._skipNext) return this;
if (this._skipNext) {
return this;
}
this._orderBy.push({ column, direction });
return this;
}
@@ -259,7 +269,7 @@ export class Query<T = any> {
fill(
from: string | Date | Expression,
to: string | Date | Expression,
step: string | Expression,
step: string | Expression
): this {
this._fill = {
from:
@@ -288,7 +298,7 @@ export class Query<T = any> {
innerJoin(
table: string | Expression,
condition: string,
alias?: string,
alias?: string
): this {
return this.joinWithType('INNER', table, condition, alias);
}
@@ -296,7 +306,7 @@ export class Query<T = any> {
leftJoin(
table: string | Expression | Query,
condition: string,
alias?: string,
alias?: string
): this {
return this.joinWithType('LEFT', table, condition, alias);
}
@@ -304,7 +314,7 @@ export class Query<T = any> {
leftAnyJoin(
table: string | Expression | Query,
condition: string,
alias?: string,
alias?: string
): this {
return this.joinWithType('LEFT ANY', table, condition, alias);
}
@@ -312,7 +322,7 @@ export class Query<T = any> {
rightJoin(
table: string | Expression,
condition: string,
alias?: string,
alias?: string
): this {
return this.joinWithType('RIGHT', table, condition, alias);
}
@@ -320,7 +330,7 @@ export class Query<T = any> {
fullJoin(
table: string | Expression,
condition: string,
alias?: string,
alias?: string
): this {
return this.joinWithType('FULL', table, condition, alias);
}
@@ -333,9 +343,11 @@ export class Query<T = any> {
type: JoinType,
table: string | Expression | Query,
condition: string,
alias?: string,
alias?: string
): this {
if (this._skipNext) return this;
if (this._skipNext) {
return this;
}
this._joins.push({
type,
table,
@@ -386,9 +398,9 @@ export class Query<T = any> {
// on them, otherwise any embedded date strings get double-escaped
// (e.g. ''2025-12-16 23:59:59'') which ClickHouse rejects.
.map((col) =>
col instanceof Expression ? col.toString() : this.escapeDate(col),
col instanceof Expression ? col.toString() : this.escapeDate(col)
)
.join(', '),
.join(', ')
);
} else {
parts.push('SELECT *');
@@ -411,7 +423,7 @@ export class Query<T = any> {
const aliasClause = join.alias ? ` ${join.alias} ` : ' ';
const conditionStr = join.condition ? `ON ${join.condition}` : '';
parts.push(
`${join.type} JOIN ${join.table instanceof Query ? `(${join.table.toSQL()})` : join.table instanceof Expression ? `(${join.table.toString()})` : join.table}${aliasClause}${conditionStr}`,
`${join.type} JOIN ${join.table instanceof Query ? `(${join.table.toSQL()})` : join.table instanceof Expression ? `(${join.table.toString()})` : join.table}${aliasClause}${conditionStr}`
);
});
}
@@ -524,10 +536,10 @@ export class Query<T = any> {
// Execution methods
async execute(): Promise<T[]> {
const query = this.buildQuery();
console.log(
'query',
`${query.replaceAll('\n', ' ').replaceAll('\t', ' ').replaceAll('\r', ' ')} SETTINGS session_timezone = '${this.timezone}'`,
);
// console.log(
// 'query',
// `${query.replaceAll('\n', ' ').replaceAll('\t', ' ').replaceAll('\r', ' ')} SETTINGS session_timezone = '${this.timezone}'`,
// );
const result = await this.client.query({
query,
@@ -574,7 +586,9 @@ export class Query<T = any> {
// Add merge method
merge(query: Query): this {
if (this._skipNext) return this;
if (this._skipNext) {
return this;
}
this._from = query._from;
@@ -621,7 +635,7 @@ export class WhereGroupBuilder {
constructor(
private query: Query,
private groupOperator: 'AND' | 'OR',
private groupOperator: 'AND' | 'OR'
) {}
where(column: string, operator: Operator, value?: SqlParam): this {
@@ -706,7 +720,7 @@ clix.toStartOf = (node: string, interval: IInterval, timezone?: string) => {
clix.toStartOfInterval = (
node: string,
interval: IInterval,
origin: string | Date,
origin: string | Date
) => {
switch (interval) {
case 'minute': {

View File

@@ -2,7 +2,9 @@ import {
ch,
chQuery,
clix,
convertClickhouseDateToJs,
formatClickhouseDate,
getProfiles,
type IClickhouseEvent,
TABLE_NAMES,
transformEvent,
@@ -12,6 +14,98 @@ import sqlstring from 'sqlstring';
import { z } from 'zod';
import { createTRPCRouter, protectedProcedure } from '../trpc';
const realtimeLocationSchema = z.object({
country: z.string().optional(),
city: z.string().optional(),
lat: z.number().optional(),
long: z.number().optional(),
});
const realtimeBadgeDetailScopeSchema = z.enum([
'country',
'city',
'coordinate',
'merged',
]);
function buildRealtimeLocationFilter(
locations: z.infer<typeof realtimeLocationSchema>[]
) {
const tuples = locations
.filter(
(
location
): location is z.infer<typeof realtimeLocationSchema> & {
lat: number;
long: number;
} => typeof location.lat === 'number' && typeof location.long === 'number'
)
.map(
(location) =>
`(${sqlstring.escape(location.country ?? '')}, ${sqlstring.escape(
location.city ?? ''
)}, toDecimal64(${location.long.toFixed(4)}, 4), toDecimal64(${location.lat.toFixed(4)}, 4))`
);
if (tuples.length === 0) {
return buildRealtimeCityFilter(locations);
}
return `(coalesce(country, ''), coalesce(city, ''), toDecimal64(longitude, 4), toDecimal64(latitude, 4)) IN (${tuples.join(', ')})`;
}
function buildRealtimeCountryFilter(
locations: z.infer<typeof realtimeLocationSchema>[]
) {
const countries = [
...new Set(locations.map((location) => location.country ?? '')),
];
return `coalesce(country, '') IN (${countries
.map((country) => sqlstring.escape(country))
.join(', ')})`;
}
function buildRealtimeCityFilter(
locations: z.infer<typeof realtimeLocationSchema>[]
) {
const tuples = [
...new Set(
locations.map(
(location) =>
`(${sqlstring.escape(location.country ?? '')}, ${sqlstring.escape(
location.city ?? ''
)})`
)
),
];
if (tuples.length === 0) {
return buildRealtimeCountryFilter(locations);
}
return `(coalesce(country, ''), coalesce(city, '')) IN (${tuples.join(', ')})`;
}
function buildRealtimeBadgeDetailsFilter(input: {
detailScope: z.infer<typeof realtimeBadgeDetailScopeSchema>;
locations: z.infer<typeof realtimeLocationSchema>[];
}) {
if (input.detailScope === 'country') {
return buildRealtimeCountryFilter(input.locations);
}
if (input.detailScope === 'city') {
return buildRealtimeCityFilter(input.locations);
}
if (input.detailScope === 'merged') {
return buildRealtimeCityFilter(input.locations);
}
return buildRealtimeLocationFilter(input.locations);
}
export const realtimeRouter = createTRPCRouter({
coordinates: protectedProcedure
.input(z.object({ projectId: z.string() }))
@@ -21,12 +115,195 @@ export const realtimeRouter = createTRPCRouter({
country: string;
long: number;
lat: number;
count: number;
}>(
`SELECT DISTINCT country, city, longitude as long, latitude as lat FROM ${TABLE_NAMES.events} WHERE project_id = ${sqlstring.escape(input.projectId)} AND created_at >= '${formatClickhouseDate(subMinutes(new Date(), 30))}' ORDER BY created_at DESC`
`SELECT
country,
city,
longitude as long,
latitude as lat,
COUNT(DISTINCT session_id) as count
FROM ${TABLE_NAMES.events}
WHERE project_id = ${sqlstring.escape(input.projectId)}
AND created_at >= now() - INTERVAL 30 MINUTE
AND longitude IS NOT NULL
AND latitude IS NOT NULL
GROUP BY country, city, longitude, latitude
ORDER BY count DESC`
);
res.forEach((item) => {
console.log(item.country, item.city, item.long, item.lat);
});
return res;
}),
mapBadgeDetails: protectedProcedure
.input(
z.object({
detailScope: realtimeBadgeDetailScopeSchema,
projectId: z.string(),
locations: z.array(realtimeLocationSchema).min(1).max(200),
})
)
.query(async ({ input }) => {
const since = formatClickhouseDate(subMinutes(new Date(), 30));
const locationFilter = buildRealtimeBadgeDetailsFilter(input);
const summaryQuery = clix(ch)
.select<{
total_sessions: number;
total_profiles: number;
}>([
'COUNT(DISTINCT session_id) as total_sessions',
"COUNT(DISTINCT nullIf(profile_id, '')) as total_profiles",
])
.from(TABLE_NAMES.events)
.where('project_id', '=', input.projectId)
.where('created_at', '>=', since)
.rawWhere(locationFilter);
const topReferrersQuery = clix(ch)
.select<{
referrer_name: string;
count: number;
}>(['referrer_name', 'COUNT(DISTINCT session_id) as count'])
.from(TABLE_NAMES.events)
.where('project_id', '=', input.projectId)
.where('created_at', '>=', since)
.where('referrer_name', '!=', '')
.rawWhere(locationFilter)
.groupBy(['referrer_name'])
.orderBy('count', 'DESC')
.limit(3);
const topPathsQuery = clix(ch)
.select<{
origin: string;
path: string;
count: number;
}>(['origin', 'path', 'COUNT(DISTINCT session_id) as count'])
.from(TABLE_NAMES.events)
.where('project_id', '=', input.projectId)
.where('created_at', '>=', since)
.where('path', '!=', '')
.rawWhere(locationFilter)
.groupBy(['origin', 'path'])
.orderBy('count', 'DESC')
.limit(3);
const topEventsQuery = clix(ch)
.select<{
name: string;
count: number;
}>(['name', 'COUNT(DISTINCT session_id) as count'])
.from(TABLE_NAMES.events)
.where('project_id', '=', input.projectId)
.where('created_at', '>=', since)
.where('name', 'NOT IN', [
'screen_view',
'session_start',
'session_end',
])
.rawWhere(locationFilter)
.groupBy(['name'])
.orderBy('count', 'DESC')
.limit(3);
const [summary, topReferrers, topPaths, topEvents, recentSessions] =
await Promise.all([
summaryQuery.execute(),
topReferrersQuery.execute(),
topPathsQuery.execute(),
topEventsQuery.execute(),
chQuery<{
profile_id: string;
session_id: string;
created_at: string;
path: string;
name: string;
country: string;
city: string;
}>(
`SELECT
session_id,
profile_id,
created_at,
path,
name,
country,
city
FROM (
SELECT
session_id,
profile_id,
created_at,
path,
name,
country,
city,
row_number() OVER (
PARTITION BY session_id ORDER BY created_at DESC
) AS rn
FROM ${TABLE_NAMES.events}
WHERE project_id = ${sqlstring.escape(input.projectId)}
AND created_at >= ${sqlstring.escape(since)}
AND (${locationFilter})
) AS latest_event_per_session
WHERE rn = 1
ORDER BY created_at DESC
LIMIT 8`
),
]);
const profiles = await getProfiles(
recentSessions.map((item) => item.profile_id).filter(Boolean),
input.projectId
);
const profileMap = new Map(
profiles.map((profile) => [profile.id, profile])
);
return {
summary: {
totalSessions: summary[0]?.total_sessions ?? 0,
totalProfiles: summary[0]?.total_profiles ?? 0,
totalLocations: input.locations.length,
totalCountries: new Set(
input.locations.map((location) => location.country).filter(Boolean)
).size,
totalCities: new Set(
input.locations.map((location) => location.city).filter(Boolean)
).size,
},
topReferrers: topReferrers.map((item) => ({
referrerName: item.referrer_name,
count: item.count,
})),
topPaths,
topEvents,
recentProfiles: recentSessions.map((item) => {
const profile = profileMap.get(item.profile_id);
return {
id: item.profile_id || item.session_id,
profileId:
item.profile_id && item.profile_id !== ''
? item.profile_id
: null,
sessionId: item.session_id,
createdAt: convertClickhouseDateToJs(item.created_at),
latestPath: item.path,
latestEvent: item.name,
city: profile?.properties.city || item.city,
country: profile?.properties.country || item.country,
firstName: profile?.firstName ?? '',
lastName: profile?.lastName ?? '',
email: profile?.email ?? '',
avatar: profile?.avatar ?? '',
};
}),
};
}),
activeSessions: protectedProcedure
.input(z.object({ projectId: z.string() }))
.query(async ({ input }) => {
@@ -70,7 +347,7 @@ export const realtimeRouter = createTRPCRouter({
)
.groupBy(['path', 'origin'])
.orderBy('count', 'DESC')
.limit(100)
.limit(50)
.execute();
return res;
@@ -100,7 +377,7 @@ export const realtimeRouter = createTRPCRouter({
)
.groupBy(['referrer_name'])
.orderBy('count', 'DESC')
.limit(100)
.limit(50)
.execute();
return res;
@@ -131,7 +408,7 @@ export const realtimeRouter = createTRPCRouter({
)
.groupBy(['country', 'city'])
.orderBy('count', 'DESC')
.limit(100)
.limit(50)
.execute();
return res;