final fixes

This commit is contained in:
Carl-Gerhard Lindesvärd
2026-02-26 10:19:29 +01:00
parent b193ccb7d0
commit d5513d8a47
21 changed files with 388 additions and 370 deletions

View File

@@ -1,8 +1,7 @@
import { type Redis, getRedisCache } from '@openpanel/redis';
import { getSafeJson } from '@openpanel/json';
import { getRedisCache, type Redis } from '@openpanel/redis';
import { assocPath, clone } from 'ramda';
import { TABLE_NAMES, ch } from '../clickhouse/client';
import { ch, TABLE_NAMES } from '../clickhouse/client';
import type { IClickhouseEvent } from '../services/event.service';
import type { IClickhouseSession } from '../services/session.service';
import { BaseBuffer } from './base-buffer';
@@ -35,14 +34,14 @@ export class SessionBuffer extends BaseBuffer {
| {
projectId: string;
profileId: string;
},
}
) {
let hit: string | null = null;
if ('sessionId' in options) {
hit = await this.redis.get(`session:${options.sessionId}`);
} else {
hit = await this.redis.get(
`session:${options.projectId}:${options.profileId}`,
`session:${options.projectId}:${options.profileId}`
);
}
@@ -54,7 +53,7 @@ export class SessionBuffer extends BaseBuffer {
}
async getSession(
event: IClickhouseEvent,
event: IClickhouseEvent
): Promise<[IClickhouseSession] | [IClickhouseSession, IClickhouseSession]> {
const existingSession = await this.getExistingSession({
sessionId: event.session_id,
@@ -186,14 +185,14 @@ export class SessionBuffer extends BaseBuffer {
`session:${newSession.id}`,
JSON.stringify(newSession),
'EX',
60 * 60,
60 * 60
);
if (newSession.profile_id) {
multi.set(
`session:${newSession.project_id}:${newSession.profile_id}`,
JSON.stringify(newSession),
'EX',
60 * 60,
60 * 60
);
}
for (const session of sessions) {
@@ -220,10 +219,12 @@ export class SessionBuffer extends BaseBuffer {
const events = await this.redis.lrange(
this.redisKey,
0,
this.batchSize - 1,
this.batchSize - 1
);
if (events.length === 0) return;
if (events.length === 0) {
return;
}
const sessions = events
.map((e) => getSafeJson<IClickhouseSession>(e))
@@ -258,7 +259,7 @@ export class SessionBuffer extends BaseBuffer {
}
}
async getBufferSize() {
getBufferSize() {
return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey));
}
}

View File

@@ -1,3 +1,4 @@
import { getSafeJson } from '@openpanel/json';
import { cacheable } from '@openpanel/redis';
import type { IChartEventFilter } from '@openpanel/validation';
import sqlstring from 'sqlstring';
@@ -14,7 +15,7 @@ import { getEventFiltersWhereClause } from './chart.service';
import { getOrganizationByProjectIdCached } from './organization.service';
import { getProfilesCached, type IServiceProfile } from './profile.service';
export type IClickhouseSession = {
export interface IClickhouseSession {
id: string;
profile_id: string;
event_count: number;
@@ -53,8 +54,9 @@ export type IClickhouseSession = {
revenue: number;
sign: 1 | 0;
version: number;
has_replay: boolean;
};
// Dynamically added
has_replay?: boolean;
}
export interface IServiceSession {
id: string;
@@ -92,8 +94,8 @@ export interface IServiceSession {
utmContent: string;
utmTerm: string;
revenue: number;
hasReplay: boolean;
profile?: IServiceProfile;
hasReplay?: boolean;
}
export interface GetSessionListOptions {
@@ -144,21 +146,19 @@ export function transformSession(session: IClickhouseSession): IServiceSession {
utmContent: session.utm_content,
utmTerm: session.utm_term,
revenue: session.revenue,
hasReplay: session.has_replay,
profile: undefined,
hasReplay: session.has_replay,
};
}
type Direction = 'initial' | 'next' | 'prev';
type PageInfo = {
interface PageInfo {
next?: Cursor; // use last row
};
}
type Cursor = {
interface Cursor {
createdAt: string; // ISO 8601 with ms
id: string;
};
}
export async function getSessionList({
cursor,
@@ -238,13 +238,14 @@ export async function getSessionList({
sb.select[column] = column;
});
sb.select.has_replay = `toBool(src.session_id != '') as has_replay`;
sb.select.has_replay = `toBool(src.session_id != '') as hasReplay`;
sb.joins.has_replay = `LEFT JOIN (SELECT DISTINCT session_id FROM ${TABLE_NAMES.session_replay_chunks} WHERE project_id = ${sqlstring.escape(projectId)} AND started_at > now() - INTERVAL ${dateIntervalInDays} DAY) AS src ON src.session_id = id`;
const sql = getSql();
const data = await chQuery<
IClickhouseSession & {
latestCreatedAt: string;
hasReplay: boolean;
}
>(sql);
@@ -347,20 +348,24 @@ export async function getSessionReplayChunksFrom(
FROM ${TABLE_NAMES.session_replay_chunks}
WHERE session_id = ${sqlstring.escape(sessionId)}
AND project_id = ${sqlstring.escape(projectId)}
ORDER BY started_at, ended_at
ORDER BY started_at, ended_at, chunk_index
LIMIT ${REPLAY_CHUNKS_PAGE_SIZE + 1}
OFFSET ${fromIndex}`
);
return {
data: rows.slice(0, REPLAY_CHUNKS_PAGE_SIZE).map((row, index) => ({
chunkIndex: index + fromIndex,
events: JSON.parse(row.payload) as {
type: number;
data: unknown;
timestamp: number;
}[],
})),
data: rows
.slice(0, REPLAY_CHUNKS_PAGE_SIZE)
.map((row, index) => {
const events = getSafeJson<
{ type: number; data: unknown; timestamp: number }[]
>(row.payload);
if (!events) {
return null;
}
return { chunkIndex: index + fromIndex, events };
})
.filter(Boolean),
hasMore: rows.length > REPLAY_CHUNKS_PAGE_SIZE,
};
}
@@ -369,19 +374,33 @@ class SessionService {
constructor(private client: typeof ch) {}
async byId(sessionId: string, projectId: string) {
const result = await clix(this.client)
.select<IClickhouseSession>(['*'])
.from(TABLE_NAMES.sessions)
.where('id', '=', sessionId)
.where('project_id', '=', projectId)
.where('sign', '=', 1)
.execute();
const [sessionRows, hasReplayRows] = await Promise.all([
clix(this.client)
.select<IClickhouseSession>(['*'])
.from(TABLE_NAMES.sessions, true)
.where('id', '=', sessionId)
.where('project_id', '=', projectId)
.where('sign', '=', 1)
.execute(),
chQuery<{ n: number }>(
`SELECT 1 AS n
FROM ${TABLE_NAMES.session_replay_chunks}
WHERE session_id = ${sqlstring.escape(sessionId)}
AND project_id = ${sqlstring.escape(projectId)}
LIMIT 1`
),
]);
if (!result[0]) {
if (!sessionRows[0]) {
throw new Error('Session not found');
}
return transformSession(result[0]);
const session = transformSession(sessionRows[0]);
return {
...session,
hasReplay: hasReplayRows.length > 0,
};
}
}