wip
This commit is contained in:
@@ -2,6 +2,7 @@ import { BotBuffer as BotBufferRedis } from './bot-buffer';
|
||||
import { EventBuffer as EventBufferRedis } from './event-buffer';
|
||||
import { ProfileBackfillBuffer } from './profile-backfill-buffer';
|
||||
import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer';
|
||||
import { ReplayBuffer } from './replay-buffer';
|
||||
import { SessionBuffer } from './session-buffer';
|
||||
|
||||
export const eventBuffer = new EventBufferRedis();
|
||||
@@ -9,5 +10,7 @@ export const profileBuffer = new ProfileBufferRedis();
|
||||
export const botBuffer = new BotBufferRedis();
|
||||
export const sessionBuffer = new SessionBuffer();
|
||||
export const profileBackfillBuffer = new ProfileBackfillBuffer();
|
||||
export const replayBuffer = new ReplayBuffer();
|
||||
|
||||
export type { ProfileBackfillEntry } from './profile-backfill-buffer';
|
||||
export type { IClickhouseSessionReplayChunk } from './replay-buffer';
|
||||
|
||||
92
packages/db/src/buffers/replay-buffer.ts
Normal file
92
packages/db/src/buffers/replay-buffer.ts
Normal file
@@ -0,0 +1,92 @@
|
||||
import { getSafeJson } from '@openpanel/json';
|
||||
import { getRedisCache } from '@openpanel/redis';
|
||||
import { TABLE_NAMES, ch } from '../clickhouse/client';
|
||||
import { BaseBuffer } from './base-buffer';
|
||||
|
||||
export interface IClickhouseSessionReplayChunk {
|
||||
project_id: string;
|
||||
session_id: string;
|
||||
chunk_index: number;
|
||||
started_at: string;
|
||||
ended_at: string;
|
||||
events_count: number;
|
||||
is_full_snapshot: boolean;
|
||||
payload: string;
|
||||
}
|
||||
|
||||
export class ReplayBuffer extends BaseBuffer {
|
||||
private batchSize = process.env.REPLAY_BUFFER_BATCH_SIZE
|
||||
? Number.parseInt(process.env.REPLAY_BUFFER_BATCH_SIZE, 10)
|
||||
: 500;
|
||||
private chunkSize = process.env.REPLAY_BUFFER_CHUNK_SIZE
|
||||
? Number.parseInt(process.env.REPLAY_BUFFER_CHUNK_SIZE, 10)
|
||||
: 500;
|
||||
|
||||
private readonly redisKey = 'replay-buffer';
|
||||
|
||||
constructor() {
|
||||
super({
|
||||
name: 'replay',
|
||||
onFlush: async () => {
|
||||
await this.processBuffer();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async add(chunk: IClickhouseSessionReplayChunk) {
|
||||
try {
|
||||
const redis = getRedisCache();
|
||||
const result = await redis
|
||||
.multi()
|
||||
.rpush(this.redisKey, JSON.stringify(chunk))
|
||||
.incr(this.bufferCounterKey)
|
||||
.llen(this.redisKey)
|
||||
.exec();
|
||||
|
||||
const bufferLength = (result?.[2]?.[1] as number) ?? 0;
|
||||
if (bufferLength >= this.batchSize) {
|
||||
await this.tryFlush();
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to add replay chunk to buffer', { error });
|
||||
}
|
||||
}
|
||||
|
||||
async processBuffer() {
|
||||
const redis = getRedisCache();
|
||||
try {
|
||||
const items = await redis.lrange(this.redisKey, 0, this.batchSize - 1);
|
||||
|
||||
if (items.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chunks = items.map((item) =>
|
||||
getSafeJson<IClickhouseSessionReplayChunk>(item),
|
||||
);
|
||||
|
||||
for (const chunk of this.chunks(chunks, this.chunkSize)) {
|
||||
await ch.insert({
|
||||
table: TABLE_NAMES.session_replay_chunks,
|
||||
values: chunk,
|
||||
format: 'JSONEachRow',
|
||||
});
|
||||
}
|
||||
|
||||
await redis
|
||||
.multi()
|
||||
.ltrim(this.redisKey, items.length, -1)
|
||||
.decrby(this.bufferCounterKey, items.length)
|
||||
.exec();
|
||||
|
||||
this.logger.debug('Processed replay chunks', { count: items.length });
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to process replay buffer', { error });
|
||||
}
|
||||
}
|
||||
|
||||
async getBufferSize() {
|
||||
const redis = getRedisCache();
|
||||
return this.getBufferSizeWithCounter(() => redis.llen(this.redisKey));
|
||||
}
|
||||
}
|
||||
@@ -163,46 +163,10 @@ export class SessionBuffer extends BaseBuffer {
|
||||
: '',
|
||||
sign: 1,
|
||||
version: 1,
|
||||
has_replay: false,
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
async markHasReplay(sessionId: string): Promise<void> {
|
||||
console.log('markHasReplay', sessionId);
|
||||
const existingSession = await this.getExistingSession({ sessionId });
|
||||
if (!existingSession) {
|
||||
console.log('no existing session or has replay', existingSession);
|
||||
return;
|
||||
}
|
||||
|
||||
if (existingSession.has_replay) {
|
||||
return;
|
||||
}
|
||||
|
||||
const oldSession = assocPath(['sign'], -1, clone(existingSession));
|
||||
const newSession = assocPath(['sign'], 1, clone(existingSession));
|
||||
newSession.version = existingSession.version + 1;
|
||||
newSession.has_replay = true;
|
||||
|
||||
const multi = this.redis.multi();
|
||||
multi.set(
|
||||
`session:${sessionId}`,
|
||||
JSON.stringify(newSession),
|
||||
'EX',
|
||||
60 * 60,
|
||||
);
|
||||
multi.rpush(this.redisKey, JSON.stringify(newSession));
|
||||
multi.rpush(this.redisKey, JSON.stringify(oldSession));
|
||||
multi.incrby(this.bufferCounterKey, 2);
|
||||
await multi.exec();
|
||||
|
||||
const bufferLength = await this.getBufferSize();
|
||||
if (bufferLength >= this.batchSize) {
|
||||
await this.tryFlush();
|
||||
}
|
||||
}
|
||||
|
||||
async add(event: IClickhouseEvent) {
|
||||
if (!event.session_id) {
|
||||
return;
|
||||
|
||||
@@ -5,6 +5,7 @@ import {
|
||||
TABLE_NAMES,
|
||||
ch,
|
||||
chQuery,
|
||||
convertClickhouseDateToJs,
|
||||
formatClickhouseDate,
|
||||
} from '../clickhouse/client';
|
||||
import { clix } from '../clickhouse/query-builder';
|
||||
@@ -52,7 +53,7 @@ export type IClickhouseSession = {
|
||||
revenue: number;
|
||||
sign: 1 | 0;
|
||||
version: number;
|
||||
has_replay?: boolean;
|
||||
has_replay: boolean;
|
||||
};
|
||||
|
||||
export interface IServiceSession {
|
||||
@@ -116,8 +117,8 @@ export function transformSession(session: IClickhouseSession): IServiceSession {
|
||||
entryOrigin: session.entry_origin,
|
||||
exitPath: session.exit_path,
|
||||
exitOrigin: session.exit_origin,
|
||||
createdAt: new Date(session.created_at),
|
||||
endedAt: new Date(session.ended_at),
|
||||
createdAt: convertClickhouseDateToJs(session.created_at),
|
||||
endedAt: convertClickhouseDateToJs(session.ended_at),
|
||||
referrer: session.referrer,
|
||||
referrerName: session.referrer_name,
|
||||
referrerType: session.referrer_type,
|
||||
@@ -143,7 +144,7 @@ export function transformSession(session: IClickhouseSession): IServiceSession {
|
||||
utmContent: session.utm_content,
|
||||
utmTerm: session.utm_term,
|
||||
revenue: session.revenue,
|
||||
hasReplay: session.has_replay ?? false,
|
||||
hasReplay: session.has_replay,
|
||||
profile: undefined,
|
||||
};
|
||||
}
|
||||
@@ -230,13 +231,14 @@ export async function getSessionList({
|
||||
'screen_view_count',
|
||||
'event_count',
|
||||
'revenue',
|
||||
'has_replay',
|
||||
];
|
||||
|
||||
columns.forEach((column) => {
|
||||
sb.select[column] = column;
|
||||
});
|
||||
|
||||
sb.select.has_replay = `exists(SELECT 1 FROM ${TABLE_NAMES.session_replay_chunks} WHERE session_id = id AND project_id = ${sqlstring.escape(projectId)}) as has_replay`;
|
||||
|
||||
const sql = getSql();
|
||||
const data = await chQuery<
|
||||
IClickhouseSession & {
|
||||
|
||||
Reference in New Issue
Block a user