wip
This commit is contained in:
@@ -61,9 +61,9 @@ export class ReplayBuffer extends BaseBuffer {
|
||||
return;
|
||||
}
|
||||
|
||||
const chunks = items.map((item) =>
|
||||
getSafeJson<IClickhouseSessionReplayChunk>(item),
|
||||
);
|
||||
const chunks = items
|
||||
.map((item) => getSafeJson<IClickhouseSessionReplayChunk>(item))
|
||||
.filter((item): item is IClickhouseSessionReplayChunk => item != null);
|
||||
|
||||
for (const chunk of this.chunks(chunks, this.chunkSize)) {
|
||||
await ch.insert({
|
||||
|
||||
@@ -2,17 +2,17 @@ import { cacheable } from '@openpanel/redis';
|
||||
import type { IChartEventFilter } from '@openpanel/validation';
|
||||
import sqlstring from 'sqlstring';
|
||||
import {
|
||||
TABLE_NAMES,
|
||||
ch,
|
||||
chQuery,
|
||||
convertClickhouseDateToJs,
|
||||
formatClickhouseDate,
|
||||
TABLE_NAMES,
|
||||
} from '../clickhouse/client';
|
||||
import { clix } from '../clickhouse/query-builder';
|
||||
import { createSqlBuilder } from '../sql-builder';
|
||||
import { getEventFiltersWhereClause } from './chart.service';
|
||||
import { getOrganizationByProjectIdCached } from './organization.service';
|
||||
import { type IServiceProfile, getProfilesCached } from './profile.service';
|
||||
import { getProfilesCached, type IServiceProfile } from './profile.service';
|
||||
|
||||
export type IClickhouseSession = {
|
||||
id: string;
|
||||
@@ -180,8 +180,9 @@ export async function getSessionList({
|
||||
sb.where.range = `created_at BETWEEN toDateTime('${formatClickhouseDate(startDate)}') AND toDateTime('${formatClickhouseDate(endDate)}')`;
|
||||
}
|
||||
|
||||
if (profileId)
|
||||
if (profileId) {
|
||||
sb.where.profileId = `profile_id = ${sqlstring.escape(profileId)}`;
|
||||
}
|
||||
if (search) {
|
||||
const s = sqlstring.escape(`%${search}%`);
|
||||
sb.where.search = `(entry_path ILIKE ${s} OR exit_path ILIKE ${s} OR referrer ILIKE ${s} OR referrer_name ILIKE ${s})`;
|
||||
@@ -237,7 +238,8 @@ export async function getSessionList({
|
||||
sb.select[column] = column;
|
||||
});
|
||||
|
||||
sb.select.has_replay = `exists(SELECT 1 FROM ${TABLE_NAMES.session_replay_chunks} WHERE session_id = id AND project_id = ${sqlstring.escape(projectId)}) as has_replay`;
|
||||
sb.select.has_replay = `toBool(src.session_id != '') as has_replay`;
|
||||
sb.joins.has_replay = `LEFT JOIN (SELECT DISTINCT session_id FROM ${TABLE_NAMES.session_replay_chunks} WHERE project_id = ${sqlstring.escape(projectId)} AND started_at > now() - INTERVAL ${dateIntervalInDays} DAY) AS src ON src.session_id = id`;
|
||||
|
||||
const sql = getSql();
|
||||
const data = await chQuery<
|
||||
@@ -325,40 +327,42 @@ export async function getSessionsCount({
|
||||
|
||||
export const getSessionsCountCached = cacheable(getSessionsCount, 60 * 10);
|
||||
|
||||
export async function getSessionReplayEvents(
|
||||
export interface ISessionReplayChunkMeta {
|
||||
chunk_index: number;
|
||||
started_at: string;
|
||||
ended_at: string;
|
||||
events_count: number;
|
||||
is_full_snapshot: boolean;
|
||||
}
|
||||
|
||||
const REPLAY_CHUNKS_PAGE_SIZE = 40;
|
||||
|
||||
export async function getSessionReplayChunksFrom(
|
||||
sessionId: string,
|
||||
projectId: string,
|
||||
): Promise<{ events: unknown[] }> {
|
||||
const chunks = await clix(ch)
|
||||
.select<{ chunk_index: number; payload: string }>([
|
||||
'chunk_index',
|
||||
'payload',
|
||||
])
|
||||
.from(TABLE_NAMES.session_replay_chunks)
|
||||
.where('session_id', '=', sessionId)
|
||||
.where('project_id', '=', projectId)
|
||||
.orderBy('chunk_index', 'ASC')
|
||||
.execute();
|
||||
|
||||
const allEvents = chunks.flatMap(
|
||||
(chunk) => JSON.parse(chunk.payload) as unknown[],
|
||||
fromIndex: number
|
||||
) {
|
||||
const rows = await chQuery<{ chunk_index: number; payload: string }>(
|
||||
`SELECT chunk_index, payload
|
||||
FROM ${TABLE_NAMES.session_replay_chunks}
|
||||
WHERE session_id = ${sqlstring.escape(sessionId)}
|
||||
AND project_id = ${sqlstring.escape(projectId)}
|
||||
ORDER BY started_at, ended_at
|
||||
LIMIT ${REPLAY_CHUNKS_PAGE_SIZE + 1}
|
||||
OFFSET ${fromIndex}`
|
||||
);
|
||||
|
||||
// rrweb event types: 2 = FullSnapshot, 4 = Meta
|
||||
// Incremental snapshots (type 3) before the first FullSnapshot are orphaned
|
||||
// and cause the player to fast-forward through empty time. Strip them but
|
||||
// keep Meta events (type 4) since rrweb needs them for viewport dimensions.
|
||||
const firstFullSnapshotIdx = allEvents.findIndex((e: any) => e.type === 2);
|
||||
|
||||
let events = allEvents;
|
||||
if (firstFullSnapshotIdx > 0) {
|
||||
const metaEvents = allEvents
|
||||
.slice(0, firstFullSnapshotIdx)
|
||||
.filter((e: any) => e.type === 4);
|
||||
events = [...metaEvents, ...allEvents.slice(firstFullSnapshotIdx)];
|
||||
}
|
||||
|
||||
return { events };
|
||||
return {
|
||||
data: rows.slice(0, REPLAY_CHUNKS_PAGE_SIZE).map((row, index) => ({
|
||||
chunkIndex: index + fromIndex,
|
||||
events: JSON.parse(row.payload) as {
|
||||
type: number;
|
||||
data: unknown;
|
||||
timestamp: number;
|
||||
}[],
|
||||
})),
|
||||
hasMore: rows.length > REPLAY_CHUNKS_PAGE_SIZE,
|
||||
};
|
||||
}
|
||||
|
||||
class SessionService {
|
||||
|
||||
Reference in New Issue
Block a user