From 551927af06cec19a894cdf8718d39cef5eab1ea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Mon, 9 Feb 2026 09:54:22 +0000 Subject: [PATCH] wip --- apps/api/src/controllers/event.controller.ts | 1 + apps/api/src/controllers/track.controller.ts | 245 +++++++++++++----- apps/api/src/routes/track.router.ts | 1 + apps/start/package.json | 1 + .../sessions/replay/browser-chrome.tsx | 42 +++ .../src/components/sessions/replay/index.tsx | 139 ++++++++++ .../sessions/replay/replay-context.tsx | 210 +++++++++++++++ .../sessions/replay/replay-controls.tsx | 69 +++++ .../sessions/replay/replay-event-feed.tsx | 95 +++++++ .../sessions/replay/replay-event-item.tsx | 56 ++++ .../sessions/replay/replay-player.tsx | 99 +++++++ .../sessions/replay/replay-timeline.tsx | 234 +++++++++++++++++ .../src/components/sessions/table/columns.tsx | 27 +- ...tionId.$projectId.sessions_.$sessionId.tsx | 4 + apps/start/src/types/rrweb-player.d.ts | 43 +++ apps/worker/src/jobs/events.incoming-event.ts | 23 +- apps/worker/src/utils/session-handler.ts | 37 ++- packages/common/src/id.ts | 4 +- .../code-migrations/10-add-session-replay.ts | 65 +++++ packages/db/code-migrations/migrate.ts | 19 +- packages/db/src/buffers/session-buffer.ts | 36 +++ packages/db/src/clickhouse/client.ts | 1 + packages/db/src/services/session.service.ts | 39 +++ packages/queue/src/queues.ts | 6 +- packages/sdks/sdk/src/index.ts | 79 ++++-- packages/sdks/web/package.json | 2 + packages/sdks/web/src/index.ts | 95 +++++++ packages/sdks/web/src/replay/index.ts | 2 + packages/sdks/web/src/replay/recorder.ts | 131 ++++++++++ packages/sdks/web/tsup.config.ts | 54 +++- packages/trpc/src/routers/session.ts | 12 +- packages/validation/src/track.validation.ts | 14 + tooling/publish/publish.ts | 3 + 33 files changed, 1746 insertions(+), 142 deletions(-) create mode 100644 apps/start/src/components/sessions/replay/browser-chrome.tsx create mode 100644 apps/start/src/components/sessions/replay/index.tsx create mode 100644 apps/start/src/components/sessions/replay/replay-context.tsx create mode 100644 apps/start/src/components/sessions/replay/replay-controls.tsx create mode 100644 apps/start/src/components/sessions/replay/replay-event-feed.tsx create mode 100644 apps/start/src/components/sessions/replay/replay-event-item.tsx create mode 100644 apps/start/src/components/sessions/replay/replay-player.tsx create mode 100644 apps/start/src/components/sessions/replay/replay-timeline.tsx create mode 100644 apps/start/src/types/rrweb-player.d.ts create mode 100644 packages/db/code-migrations/10-add-session-replay.ts create mode 100644 packages/sdks/web/src/replay/index.ts create mode 100644 packages/sdks/web/src/replay/recorder.ts diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index ba67f119..28d060d8 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -76,6 +76,7 @@ export async function postEvent( geo, currentDeviceId, previousDeviceId, + deviceId: '', }, groupId, jobId, diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index 2f6fbe7c..1629c591 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -3,8 +3,19 @@ import { assocPath, pathOr, pick } from 'ramda'; import { HttpError } from '@/utils/errors'; import { generateId, slug } from '@openpanel/common'; -import { generateDeviceId, parseUserAgent } from '@openpanel/common/server'; -import { getProfileById, getSalts, upsertProfile } from '@openpanel/db'; +import { + generateDeviceId, + generateSecureId, + parseUserAgent, +} from '@openpanel/common/server'; +import { + TABLE_NAMES, + ch, + getProfileById, + getSalts, + sessionBuffer, + upsertProfile, +} from '@openpanel/db'; import { type GeoLocation, getGeoLocation } from '@openpanel/geo'; import { getEventsGroupQueueShard } from '@openpanel/queue'; import { getRedisCache } from '@openpanel/redis'; @@ -13,11 +24,90 @@ import { type IDecrementPayload, type IIdentifyPayload, type IIncrementPayload, + type IReplayPayload, type ITrackHandlerPayload, type ITrackPayload, zTrackHandlerPayload, } from '@openpanel/validation'; +async function getDeviceId({ + projectId, + ip, + ua, + salts, + overrideDeviceId, +}: { + projectId: string; + ip: string; + ua: string | undefined; + salts: { current: string; previous: string }; + overrideDeviceId?: string; +}) { + if (overrideDeviceId) { + return { deviceId: overrideDeviceId, sessionId: undefined }; + } + + if (!ua) { + return { deviceId: '', sessionId: undefined }; + } + + const currentDeviceId = generateDeviceId({ + salt: salts.current, + origin: projectId, + ip, + ua, + }); + const previousDeviceId = generateDeviceId({ + salt: salts.previous, + origin: projectId, + ip, + ua, + }); + + return await getDeviceIdFromSession({ + projectId, + currentDeviceId, + previousDeviceId, + }); +} + +async function getDeviceIdFromSession({ + projectId, + currentDeviceId, + previousDeviceId, +}: { + projectId: string; + currentDeviceId: string; + previousDeviceId: string; +}) { + try { + const multi = getRedisCache().multi(); + multi.hget( + `bull:sessions:sessionEnd:${projectId}:${currentDeviceId}`, + 'data', + ); + multi.hget( + `bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`, + 'data', + ); + const res = await multi.exec(); + if (res?.[0]?.[1]) { + const data = JSON.parse(res?.[0]?.[1] as string); + const sessionId = data.payload.sessionId; + return { deviceId: currentDeviceId, sessionId }; + } + if (res?.[1]?.[1]) { + const data = JSON.parse(res?.[1]?.[1] as string); + const sessionId = data.payload.sessionId; + return { deviceId: previousDeviceId, sessionId }; + } + } catch (error) { + console.error('Error getting session end GET /track/device-id', error); + } + + return { deviceId: currentDeviceId, sessionId: generateSecureId('se') }; +} + export function getStringHeaders(headers: FastifyRequest['headers']) { return Object.entries( pick( @@ -45,14 +135,15 @@ function getIdentity(body: ITrackHandlerPayload): IIdentifyPayload | undefined { | IIdentifyPayload | undefined; - return ( - identity || - (body.payload.profileId - ? { - profileId: String(body.payload.profileId), - } - : undefined) - ); + if (identity) { + return identity; + } + + return body.payload.profileId + ? { + profileId: String(body.payload.profileId), + } + : undefined; } return undefined; @@ -104,8 +195,8 @@ interface TrackContext { headers: Record; timestamp: { value: number; isFromPast: boolean }; identity?: IIdentifyPayload; - currentDeviceId?: string; - previousDeviceId?: string; + deviceId: string; + sessionId: string; geo: GeoLocation; } @@ -128,49 +219,27 @@ async function buildContext( const ua = request.headers['user-agent'] ?? 'unknown/1.0'; const headers = getStringHeaders(request.headers); - const identity = getIdentity(validatedBody); const profileId = identity?.profileId; - // We might get a profileId from the alias table - // If we do, we should use that instead of the one from the payload if (profileId && validatedBody.type === 'track') { validatedBody.payload.profileId = profileId; } // Get geo location (needed for track and identify) - const geo = await getGeoLocation(ip); + const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]); - // Generate device IDs if needed (for track) - let currentDeviceId: string | undefined; - let previousDeviceId: string | undefined; - - if (validatedBody.type === 'track') { - const overrideDeviceId = - typeof validatedBody.payload.properties?.__deviceId === 'string' - ? validatedBody.payload.properties.__deviceId - : undefined; - - const salts = await getSalts(); - currentDeviceId = - overrideDeviceId || - (ua - ? generateDeviceId({ - salt: salts.current, - origin: projectId, - ip, - ua, - }) - : ''); - previousDeviceId = ua - ? generateDeviceId({ - salt: salts.previous, - origin: projectId, - ip, - ua, - }) - : ''; - } + const { deviceId, sessionId } = await getDeviceId({ + projectId, + ip, + ua, + salts, + overrideDeviceId: + validatedBody.type === 'track' && + typeof validatedBody.payload?.properties?.__deviceId === 'string' + ? validatedBody.payload?.properties.__deviceId + : undefined, + }); return { projectId, @@ -182,8 +251,8 @@ async function buildContext( isFromPast: timestamp.isTimestampFromThePast, }, identity, - currentDeviceId, - previousDeviceId, + deviceId, + sessionId, geo, }; } @@ -192,30 +261,19 @@ async function handleTrack( payload: ITrackPayload, context: TrackContext, ): Promise { - const { - projectId, - currentDeviceId, - previousDeviceId, - geo, - headers, - timestamp, - } = context; - - if (!currentDeviceId || !previousDeviceId) { - throw new HttpError('Device ID generation failed', { status: 500 }); - } + const { projectId, deviceId, geo, headers, timestamp, sessionId } = context; const uaInfo = parseUserAgent(headers['user-agent'], payload.properties); const groupId = uaInfo.isServer ? payload.profileId ? `${projectId}:${payload.profileId}` : `${projectId}:${generateId()}` - : currentDeviceId; + : deviceId; const jobId = [ slug(payload.name), timestamp.value, projectId, - currentDeviceId, + deviceId, groupId, ] .filter(Boolean) @@ -242,8 +300,10 @@ async function handleTrack( }, uaInfo, geo, - currentDeviceId, - previousDeviceId, + deviceId, + sessionId, + currentDeviceId: '', // TODO: Remove + previousDeviceId: '', // TODO: Remove }, groupId, jobId, @@ -330,6 +390,33 @@ async function handleDecrement( await adjustProfileProperty(payload, context.projectId, -1); } +async function handleReplay( + payload: IReplayPayload, + context: TrackContext, +): Promise { + if (!context.sessionId) { + throw new HttpError('Session ID is required for replay', { status: 400 }); + } + + const row = { + project_id: context.projectId, + session_id: context.sessionId, + profile_id: '', // TODO: remove + chunk_index: payload.chunk_index, + started_at: payload.started_at, + ended_at: payload.ended_at, + events_count: payload.events_count, + is_full_snapshot: payload.is_full_snapshot, + payload: payload.payload, + }; + await ch.insert({ + table: TABLE_NAMES.session_replay_chunks, + values: [row], + format: 'JSONEachRow', + }); + await sessionBuffer.markHasReplay(row.session_id); +} + export async function handler( request: FastifyRequest<{ Body: ITrackHandlerPayload; @@ -375,6 +462,9 @@ export async function handler( case 'decrement': await handleDecrement(validatedBody.payload, context); break; + case 'replay': + await handleReplay(validatedBody.payload, context); + break; default: return reply.status(400).send({ status: 400, @@ -383,7 +473,10 @@ export async function handler( }); } - reply.status(200).send(); + reply.status(200).send({ + deviceId: context.deviceId, + sessionId: context.sessionId, + }); } export async function fetchDeviceId( @@ -421,20 +514,31 @@ export async function fetchDeviceId( try { const multi = getRedisCache().multi(); - multi.exists(`bull:sessions:sessionEnd:${projectId}:${currentDeviceId}`); - multi.exists(`bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`); + multi.hget( + `bull:sessions:sessionEnd:${projectId}:${currentDeviceId}`, + 'data', + ); + multi.hget( + `bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`, + 'data', + ); const res = await multi.exec(); - if (res?.[0]?.[1]) { + const data = JSON.parse(res?.[0]?.[1] as string); + const sessionId = data.payload.sessionId; return reply.status(200).send({ - deviceId: currentDeviceId, + deviceId: sessionId, + sessionId, message: 'current session exists for this device id', }); } if (res?.[1]?.[1]) { + const data = JSON.parse(res?.[1]?.[1] as string); + const sessionId = data.payload.sessionId; return reply.status(200).send({ - deviceId: previousDeviceId, + deviceId: sessionId, + sessionId, message: 'previous session exists for this device id', }); } @@ -444,6 +548,7 @@ export async function fetchDeviceId( return reply.status(200).send({ deviceId: currentDeviceId, + sessionId: '', message: 'No session exists for this device id', }); } diff --git a/apps/api/src/routes/track.router.ts b/apps/api/src/routes/track.router.ts index d3fb92c6..2f94a8eb 100644 --- a/apps/api/src/routes/track.router.ts +++ b/apps/api/src/routes/track.router.ts @@ -26,6 +26,7 @@ const trackRouter: FastifyPluginCallback = async (fastify) => { type: 'object', properties: { deviceId: { type: 'string' }, + sessionId: { type: 'string' }, message: { type: 'string', optional: true }, }, }, diff --git a/apps/start/package.json b/apps/start/package.json index f28035da..3cd4ac46 100644 --- a/apps/start/package.json +++ b/apps/start/package.json @@ -116,6 +116,7 @@ "pushmodal": "^1.0.3", "ramda": "^0.29.1", "random-animal-name": "^0.1.1", + "rrweb-player": "2.0.0-alpha.20", "rc-virtual-list": "^3.14.5", "react": "catalog:", "react-animate-height": "^3.2.3", diff --git a/apps/start/src/components/sessions/replay/browser-chrome.tsx b/apps/start/src/components/sessions/replay/browser-chrome.tsx new file mode 100644 index 00000000..dfb7d38c --- /dev/null +++ b/apps/start/src/components/sessions/replay/browser-chrome.tsx @@ -0,0 +1,42 @@ +import { cn } from '@/utils/cn'; +import type { ReactNode } from 'react'; + +export function BrowserChrome({ + url, + children, + right, + controls = ( +
+
+
+
+
+ ), + className, +}: { + url?: ReactNode; + children: ReactNode; + right?: ReactNode; + controls?: ReactNode; + className?: string; +}) { + return ( +
+
+ {controls} + {url !== false && ( +
+ {url} +
+ )} + {right} +
+ {children} +
+ ); +} diff --git a/apps/start/src/components/sessions/replay/index.tsx b/apps/start/src/components/sessions/replay/index.tsx new file mode 100644 index 00000000..553bc8e0 --- /dev/null +++ b/apps/start/src/components/sessions/replay/index.tsx @@ -0,0 +1,139 @@ +'use client'; + +import { + ReplayProvider, + useReplayContext, +} from '@/components/sessions/replay/replay-context'; +import { ReplayEventFeed } from '@/components/sessions/replay/replay-event-feed'; +import { ReplayPlayer } from '@/components/sessions/replay/replay-player'; +import { ReplayTimeline } from '@/components/sessions/replay/replay-timeline'; +import { useTRPC } from '@/integrations/trpc/react'; +import { useQuery } from '@tanstack/react-query'; +import type { IServiceEvent } from '@openpanel/db'; +import { type ReactNode, useMemo } from 'react'; +import { BrowserChrome } from './browser-chrome'; +import { ReplayTime } from './replay-controls'; + +function getEventOffsetMs(event: IServiceEvent, startTime: number): number { + const t = + typeof event.createdAt === 'object' && event.createdAt instanceof Date + ? event.createdAt.getTime() + : new Date(event.createdAt).getTime(); + return t - startTime; +} + +function BrowserUrlBar({ events }: { events: IServiceEvent[] }) { + const { currentTime, startTime } = useReplayContext(); + + const currentUrl = useMemo(() => { + if (startTime == null || !events.length) return ''; + + const withOffset = events + .map((ev) => ({ + event: ev, + offsetMs: getEventOffsetMs(ev, startTime), + })) + .filter(({ offsetMs }) => offsetMs >= -10_000 && offsetMs <= currentTime) + .sort((a, b) => a.offsetMs - b.offsetMs); + + const latest = withOffset[withOffset.length - 1]; + if (!latest) return ''; + + const { origin = '', path = '/' } = latest.event; + const pathPart = path.startsWith('/') ? path : `/${path}`; + return `${origin}${pathPart}`; + }, [events, currentTime, startTime]); + + return {currentUrl}; +} + +function ReplayContent({ + sessionId, + projectId, +}: { + sessionId: string; + projectId: string; +}) { + const trpc = useTRPC(); + const { + data: replayData, + isLoading: replayLoading, + isError: replayError, + } = useQuery(trpc.session.replay.queryOptions({ sessionId, projectId })); + const { data: eventsData } = useQuery( + trpc.event.events.queryOptions({ + projectId, + sessionId, + filters: [], + columnVisibility: {}, + }), + ); + + const events = eventsData?.data ?? []; + const replayEvents = replayData?.events as + | Array<{ type: number; data: unknown; timestamp: number }> + | undefined; + + if (replayLoading) { + return ( +
+
+ +
+
+
+ +
+
+
+ ); + } + + if (replayError || !replayEvents?.length) { + return ( +
+
+ about:blank} + > +
+ No replay data available for this session. +
+
+
+
+
+ ); + } + + return ( + +
+
+ } + right={} + > + + + +
+
+
+ +
+
+
+
+ ); +} + +export function ReplayShell({ + sessionId, + projectId, +}: { + sessionId: string; + projectId: string; +}) { + return ; +} diff --git a/apps/start/src/components/sessions/replay/replay-context.tsx b/apps/start/src/components/sessions/replay/replay-context.tsx new file mode 100644 index 00000000..c9ee94a6 --- /dev/null +++ b/apps/start/src/components/sessions/replay/replay-context.tsx @@ -0,0 +1,210 @@ +'use client'; + +import { + type ReactNode, + createContext, + useCallback, + useContext, + useEffect, + useRef, + useState, +} from 'react'; + +export interface ReplayPlayerInstance { + play: () => void; + pause: () => void; + toggle: () => void; + goto: (timeOffset: number, play?: boolean) => void; + setSpeed: (speed: number) => void; + getMetaData: () => { startTime: number; endTime: number; totalTime: number }; + getReplayer: () => { getCurrentTime: () => number }; + $destroy?: () => void; +} + +interface ReplayContextValue { + currentTime: number; + isPlaying: boolean; + speed: number; + duration: number; + startTime: number | null; + isReady: boolean; + play: () => void; + pause: () => void; + toggle: () => void; + seek: (timeOffset: number, play?: boolean) => void; + setSpeed: (speed: number) => void; + registerPlayer: (player: ReplayPlayerInstance) => void; + unregisterPlayer: () => void; +} + +const ReplayContext = createContext(null); + +const SPEED_OPTIONS = [0.5, 1, 2, 4, 8] as const; + +export function useReplayContext() { + const ctx = useContext(ReplayContext); + if (!ctx) { + throw new Error('useReplayContext must be used within ReplayProvider'); + } + return ctx; +} + +export function ReplayProvider({ children }: { children: ReactNode }) { + const playerRef = useRef(null); + const [currentTime, setCurrentTime] = useState(0); + const [isPlaying, setIsPlaying] = useState(false); + const [speed, setSpeedState] = useState(1); + const [duration, setDuration] = useState(0); + const [startTime, setStartTime] = useState(null); + const [isReady, setIsReady] = useState(false); + const rafIdRef = useRef(null); + const lastUpdateRef = useRef(0); + // Refs so stable callbacks can read latest values + const isPlayingRef = useRef(false); + const durationRef = useRef(0); + const currentTimeRef = useRef(0); + + const registerPlayer = useCallback((player: ReplayPlayerInstance) => { + playerRef.current = player; + try { + const meta = player.getMetaData(); + durationRef.current = meta.totalTime; + setDuration(meta.totalTime); + setStartTime(meta.startTime); + setCurrentTime(0); + currentTimeRef.current = 0; + setIsReady(true); + } catch { + setIsReady(false); + } + }, []); + + const unregisterPlayer = useCallback(() => { + if (rafIdRef.current != null) { + cancelAnimationFrame(rafIdRef.current); + rafIdRef.current = null; + } + playerRef.current = null; + setIsReady(false); + setCurrentTime(0); + currentTimeRef.current = 0; + setDuration(0); + durationRef.current = 0; + setStartTime(null); + setIsPlaying(false); + isPlayingRef.current = false; + }, []); + + const play = useCallback(() => { + playerRef.current?.play(); + setIsPlaying(true); + isPlayingRef.current = true; + }, []); + + const pause = useCallback(() => { + playerRef.current?.pause(); + setIsPlaying(false); + isPlayingRef.current = false; + }, []); + + const toggle = useCallback(() => { + const player = playerRef.current; + if (!player) return; + + // If at the end, reset to start and play + const atEnd = currentTimeRef.current >= durationRef.current - 100; + if (atEnd && !isPlayingRef.current) { + player.goto(0, true); + setCurrentTime(0); + currentTimeRef.current = 0; + setIsPlaying(true); + isPlayingRef.current = true; + return; + } + + player.toggle(); + const next = !isPlayingRef.current; + setIsPlaying(next); + isPlayingRef.current = next; + }, []); + + const seek = useCallback((timeOffset: number, play?: boolean) => { + const player = playerRef.current; + if (!player) return; + const shouldPlay = play ?? isPlayingRef.current; + player.goto(timeOffset, shouldPlay); + setCurrentTime(timeOffset); + currentTimeRef.current = timeOffset; + setIsPlaying(shouldPlay); + isPlayingRef.current = shouldPlay; + }, []); + + const setSpeed = useCallback((s: number) => { + if (!SPEED_OPTIONS.includes(s as (typeof SPEED_OPTIONS)[number])) return; + playerRef.current?.setSpeed(s); + setSpeedState(s); + }, []); + + useEffect(() => { + if (!isReady || !playerRef.current) return; + + const tick = () => { + const player = playerRef.current; + if (!player) return; + try { + const replayer = player.getReplayer(); + const now = replayer.getCurrentTime(); + // Throttle state updates to ~10fps (every 100ms) to avoid excessive re-renders + const t = Math.floor(now / 100); + if (t !== lastUpdateRef.current) { + lastUpdateRef.current = t; + setCurrentTime(now); + currentTimeRef.current = now; + } + + // Detect end of replay + if ( + now >= durationRef.current - 50 && + durationRef.current > 0 && + isPlayingRef.current + ) { + setIsPlaying(false); + isPlayingRef.current = false; + } + } catch { + // Player may be destroyed + } + rafIdRef.current = requestAnimationFrame(tick); + }; + + rafIdRef.current = requestAnimationFrame(tick); + return () => { + if (rafIdRef.current != null) { + cancelAnimationFrame(rafIdRef.current); + rafIdRef.current = null; + } + }; + }, [isReady]); + + const value: ReplayContextValue = { + currentTime, + isPlaying, + speed, + duration, + startTime, + isReady, + play, + pause, + toggle, + seek, + setSpeed, + registerPlayer, + unregisterPlayer, + }; + + return ( + {children} + ); +} + +export { SPEED_OPTIONS }; diff --git a/apps/start/src/components/sessions/replay/replay-controls.tsx b/apps/start/src/components/sessions/replay/replay-controls.tsx new file mode 100644 index 00000000..c0230a46 --- /dev/null +++ b/apps/start/src/components/sessions/replay/replay-controls.tsx @@ -0,0 +1,69 @@ +'use client'; + +import { + SPEED_OPTIONS, + useReplayContext, +} from '@/components/sessions/replay/replay-context'; +import { Button } from '@/components/ui/button'; +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuItem, + DropdownMenuTrigger, +} from '@/components/ui/dropdown-menu'; +import { ChevronDown, Pause, Play, SkipBack, SkipForward } from 'lucide-react'; + +function formatTime(ms: number): string { + const totalSeconds = Math.max(0, Math.floor(ms / 1000)); + const m = Math.floor(totalSeconds / 60); + const s = totalSeconds % 60; + return `${m}:${s.toString().padStart(2, '0')}`; +} + +export function ReplayTime() { + const { currentTime, duration } = useReplayContext(); + + return ( + + {formatTime(currentTime)} / {formatTime(duration)} + + ); +} + +export function ReplayPlayPauseButton() { + const { isPlaying, isReady, toggle, seek } = useReplayContext(); + + if (!isReady) return null; + + return ( + + ); +} + +// {/* +// +// +// +// +// {SPEED_OPTIONS.map((s) => ( +// setSpeed(s)} +// className={speed === s ? 'bg-accent' : ''} +// > +// {s}x +// +// ))} +// +// */} diff --git a/apps/start/src/components/sessions/replay/replay-event-feed.tsx b/apps/start/src/components/sessions/replay/replay-event-feed.tsx new file mode 100644 index 00000000..90abd186 --- /dev/null +++ b/apps/start/src/components/sessions/replay/replay-event-feed.tsx @@ -0,0 +1,95 @@ +'use client'; + +import { useReplayContext } from '@/components/sessions/replay/replay-context'; +import { ReplayEventItem } from '@/components/sessions/replay/replay-event-item'; +import { ScrollArea } from '@/components/ui/scroll-area'; +import type { IServiceEvent } from '@openpanel/db'; +import { useEffect, useMemo, useRef } from 'react'; +import { BrowserChrome } from './browser-chrome'; + +function getEventOffsetMs(event: IServiceEvent, startTime: number): number { + const t = + typeof event.createdAt === 'object' && event.createdAt instanceof Date + ? event.createdAt.getTime() + : new Date(event.createdAt).getTime(); + return t - startTime; +} + +export function ReplayEventFeed({ events }: { events: IServiceEvent[] }) { + const { currentTime, startTime, isReady, seek } = useReplayContext(); + const viewportRef = useRef(null); + const prevCountRef = useRef(0); + + const { visibleEvents, currentEventId } = useMemo(() => { + if (startTime == null || !isReady) { + return { visibleEvents: [], currentEventId: null as string | null }; + } + const withOffset = events + .map((ev) => ({ + event: ev, + offsetMs: getEventOffsetMs(ev, startTime), + })) + // Include events up to 10s before recording started (e.g. screen views) + .filter(({ offsetMs }) => offsetMs >= -10_000 && offsetMs <= currentTime) + .sort((a, b) => a.offsetMs - b.offsetMs); + + const visibleEvents = withOffset.map(({ event, offsetMs }) => ({ + event, + offsetMs, + })); + + const current = + visibleEvents.length > 0 ? visibleEvents[visibleEvents.length - 1] : null; + const currentEventId = current?.event.id ?? null; + + return { visibleEvents, currentEventId }; + }, [events, startTime, isReady, currentTime]); + + useEffect(() => { + const viewport = viewportRef.current; + if (!viewport || visibleEvents.length === 0) return; + + const isNewItem = visibleEvents.length > prevCountRef.current; + prevCountRef.current = visibleEvents.length; + + requestAnimationFrame(() => { + viewport.scrollTo({ + top: viewport.scrollHeight, + behavior: isNewItem ? 'smooth' : 'instant', + }); + }); + }, [visibleEvents.length]); + + if (!isReady) return null; + + return ( + Timeline} + className="h-full" + > + +
+ {visibleEvents.map(({ event, offsetMs }) => ( +
+ seek(Math.max(0, offsetMs))} + /> +
+ ))} + {visibleEvents.length === 0 && ( +
+ Events will appear as the replay plays. +
+ )} +
+
+
+ ); +} diff --git a/apps/start/src/components/sessions/replay/replay-event-item.tsx b/apps/start/src/components/sessions/replay/replay-event-item.tsx new file mode 100644 index 00000000..b030e581 --- /dev/null +++ b/apps/start/src/components/sessions/replay/replay-event-item.tsx @@ -0,0 +1,56 @@ +'use client'; + +import { EventIcon } from '@/components/events/event-icon'; +import { cn } from '@/lib/utils'; +import type { IServiceEvent } from '@openpanel/db'; + +function formatOffset(ms: number): string { + const sign = ms < 0 ? '-' : '+'; + const abs = Math.abs(ms); + const totalSeconds = Math.floor(abs / 1000); + const m = Math.floor(totalSeconds / 60); + const s = totalSeconds % 60; + return `${sign}${m}:${s.toString().padStart(2, '0')}`; +} + +export function ReplayEventItem({ + event, + offsetMs, + isCurrent, + onClick, +}: { + event: IServiceEvent; + offsetMs: number; + isCurrent: boolean; + onClick: () => void; +}) { + const displayName = + event.name === 'screen_view' && event.path + ? event.path + : event.name.replace(/_/g, ' '); + + return ( + + ); +} diff --git a/apps/start/src/components/sessions/replay/replay-player.tsx b/apps/start/src/components/sessions/replay/replay-player.tsx new file mode 100644 index 00000000..fe2f3075 --- /dev/null +++ b/apps/start/src/components/sessions/replay/replay-player.tsx @@ -0,0 +1,99 @@ +'use client'; + +import { useReplayContext } from '@/components/sessions/replay/replay-context'; +import type { ReplayPlayerInstance } from '@/components/sessions/replay/replay-context'; +import { useEffect, useMemo, useRef } from 'react'; + +import 'rrweb-player/dist/style.css'; + +/** rrweb meta event (type 4) carries the recorded viewport size */ +function getRecordedDimensions( + events: Array<{ type: number; data: unknown }>, +): { width: number; height: number } | null { + const meta = events.find((e) => e.type === 4); + if ( + meta && + typeof meta.data === 'object' && + meta.data !== null && + 'width' in meta.data && + 'height' in meta.data + ) { + const { width, height } = meta.data as { width: number; height: number }; + if (width > 0 && height > 0) return { width, height }; + } + return null; +} + +export function ReplayPlayer({ + events, +}: { + events: Array<{ type: number; data: unknown; timestamp: number }>; +}) { + const containerRef = useRef(null); + const playerRef = useRef(null); + const { registerPlayer, unregisterPlayer } = useReplayContext(); + + const recordedDimensions = useMemo( + () => getRecordedDimensions(events), + [events], + ); + + useEffect(() => { + if (!events.length || !containerRef.current) return; + + let mounted = true; + + import('rrweb-player').then((module) => { + const PlayerConstructor = module.default; + if (!containerRef.current || !mounted) return; + containerRef.current.innerHTML = ''; + + const maxHeight = window.innerHeight * 0.7; + const containerWidth = containerRef.current.offsetWidth; + const aspectRatio = recordedDimensions + ? recordedDimensions.width / recordedDimensions.height + : 16 / 9; + const height = Math.min( + Math.round(containerWidth / aspectRatio), + maxHeight, + ); + const width = Math.min( + containerWidth, + Math.round(height * aspectRatio), + ); + + const player = new PlayerConstructor({ + target: containerRef.current, + props: { + events, + width, + height, + autoPlay: false, + showController: false, + speedOption: [0.5, 1, 2, 4, 8], + }, + }) as ReplayPlayerInstance; + playerRef.current = player; + registerPlayer(player); + }); + + return () => { + mounted = false; + unregisterPlayer(); + if (playerRef.current?.$destroy) { + playerRef.current.$destroy(); + playerRef.current = null; + } + }; + }, [events, registerPlayer, unregisterPlayer, recordedDimensions]); + + return ( +
+
+
+ ); +} diff --git a/apps/start/src/components/sessions/replay/replay-timeline.tsx b/apps/start/src/components/sessions/replay/replay-timeline.tsx new file mode 100644 index 00000000..d9982a56 --- /dev/null +++ b/apps/start/src/components/sessions/replay/replay-timeline.tsx @@ -0,0 +1,234 @@ +'use client'; + +import { useReplayContext } from '@/components/sessions/replay/replay-context'; +import { + Tooltip, + TooltipContent, + TooltipProvider, + TooltipTrigger, +} from '@/components/ui/tooltip'; +import type { IServiceEvent } from '@openpanel/db'; +import { AnimatePresence, motion } from 'framer-motion'; +import { useCallback, useRef, useState } from 'react'; + +import { EventIcon } from '@/components/events/event-icon'; +import { cn } from '@/lib/utils'; +import { ReplayPlayPauseButton } from './replay-controls'; + +function formatTime(ms: number): string { + const totalSeconds = Math.max(0, Math.floor(ms / 1000)); + const m = Math.floor(totalSeconds / 60); + const s = totalSeconds % 60; + return `${m}:${s.toString().padStart(2, '0')}`; +} + +function getEventOffsetMs(event: IServiceEvent, startTime: number): number { + const t = + typeof event.createdAt === 'object' && event.createdAt instanceof Date + ? event.createdAt.getTime() + : new Date(event.createdAt).getTime(); + return t - startTime; +} + +export function ReplayTimeline({ events }: { events: IServiceEvent[] }) { + const { currentTime, duration, startTime, isReady, seek } = + useReplayContext(); + const trackRef = useRef(null); + const [isDragging, setIsDragging] = useState(false); + const [hoverInfo, setHoverInfo] = useState<{ + pct: number; + timeMs: number; + } | null>(null); + + const getTimeFromClientX = useCallback( + (clientX: number) => { + if (!trackRef.current || duration <= 0) return null; + const rect = trackRef.current.getBoundingClientRect(); + const x = clientX - rect.left; + const pct = Math.max(0, Math.min(1, x / rect.width)); + return { pct, timeMs: pct * duration }; + }, + [duration], + ); + + const handleTrackMouseMove = useCallback( + (e: React.MouseEvent) => { + if ((e.target as HTMLElement).closest('[data-timeline-event]')) { + setHoverInfo(null); + return; + } + const info = getTimeFromClientX(e.clientX); + if (info) setHoverInfo(info); + }, + [getTimeFromClientX], + ); + + const handleTrackMouseLeave = useCallback(() => { + if (!isDragging) setHoverInfo(null); + }, [isDragging]); + + const seekToPosition = useCallback( + (clientX: number) => { + const info = getTimeFromClientX(clientX); + if (info) seek(info.timeMs); + }, + [getTimeFromClientX, seek], + ); + + const handleTrackMouseDown = useCallback( + (e: React.MouseEvent) => { + // Only handle direct clicks on the track, not on child elements like the thumb + if ( + e.target !== trackRef.current && + !(e.target as HTMLElement).closest('.replay-track-bg') + ) + return; + seekToPosition(e.clientX); + }, + [seekToPosition], + ); + + const handleThumbMouseDown = useCallback( + (e: React.MouseEvent) => { + e.preventDefault(); + e.stopPropagation(); + setIsDragging(true); + const onMouseMove = (moveEvent: MouseEvent) => { + seekToPosition(moveEvent.clientX); + const info = getTimeFromClientX(moveEvent.clientX); + if (info) setHoverInfo(info); + }; + const onMouseUp = () => { + setIsDragging(false); + setHoverInfo(null); + document.removeEventListener('mousemove', onMouseMove); + document.removeEventListener('mouseup', onMouseUp); + }; + document.addEventListener('mousemove', onMouseMove); + document.addEventListener('mouseup', onMouseUp); + }, + [seekToPosition, getTimeFromClientX], + ); + + if (!isReady || duration <= 0) return null; + + const progressPct = + duration > 0 + ? Math.max(0, Math.min(100, (currentTime / duration) * 100)) + : 0; + + const eventsWithOffset = events + .map((ev) => ({ + event: ev, + offsetMs: startTime != null ? getEventOffsetMs(ev, startTime) : 0, + })) + .filter(({ offsetMs }) => offsetMs >= 0 && offsetMs <= duration); + + return ( + +
+ +
+
{ + const step = 5000; + if (e.key === 'ArrowLeft') { + e.preventDefault(); + seek(Math.max(0, currentTime - step)); + } else if (e.key === 'ArrowRight') { + e.preventDefault(); + seek(Math.min(duration, currentTime + step)); + } + }} + > +
+
+
+
+ {/* Hover timestamp tooltip */} + + {hoverInfo && ( + + {/* Vertical line */} +
+ {/* Timestamp badge */} + + {formatTime(hoverInfo.timeMs)} + + + )} + + {eventsWithOffset.map(({ event: ev, offsetMs }) => { + const pct = (offsetMs / duration) * 100; + return ( + + + + + +
+ + {ev.name === 'screen_view' ? ev.path : ev.name} +
+
+ {formatTime(offsetMs)} +
+
+
+ ); + })} +
+
+
+ + ); +} diff --git a/apps/start/src/components/sessions/table/columns.tsx b/apps/start/src/components/sessions/table/columns.tsx index e5a3e0be..7f28eadf 100644 --- a/apps/start/src/components/sessions/table/columns.tsx +++ b/apps/start/src/components/sessions/table/columns.tsx @@ -2,6 +2,7 @@ import { ProjectLink } from '@/components/links'; import { SerieIcon } from '@/components/report-chart/common/serie-icon'; import { formatDateTime, formatTimeAgoOrDateTime } from '@/utils/date'; import type { ColumnDef } from '@tanstack/react-table'; +import { Video } from 'lucide-react'; import { ColumnCreatedAt } from '@/components/column-created-at'; import { ProfileAvatar } from '@/components/profiles/profile-avatar'; @@ -44,13 +45,25 @@ export function useColumns() { cell: ({ row }) => { const session = row.original; return ( - - {session.id.slice(0, 8)}... - +
+ + {session.id.slice(0, 8)}... + + {session.hasReplay && ( + + + )} +
); }, }, diff --git a/apps/start/src/routes/_app.$organizationId.$projectId.sessions_.$sessionId.tsx b/apps/start/src/routes/_app.$organizationId.$projectId.sessions_.$sessionId.tsx index f031dc89..6714c5ed 100644 --- a/apps/start/src/routes/_app.$organizationId.$projectId.sessions_.$sessionId.tsx +++ b/apps/start/src/routes/_app.$organizationId.$projectId.sessions_.$sessionId.tsx @@ -3,6 +3,7 @@ import FullPageLoadingState from '@/components/full-page-loading-state'; import { PageContainer } from '@/components/page-container'; import { PageHeader } from '@/components/page-header'; import { SerieIcon } from '@/components/report-chart/common/serie-icon'; +import { ReplayShell } from '@/components/sessions/replay'; import { useReadColumnVisibility } from '@/components/ui/data-table/data-table-hooks'; import { useEventQueryFilters, @@ -117,6 +118,9 @@ function Component() { )}
+
+ +
); diff --git a/apps/start/src/types/rrweb-player.d.ts b/apps/start/src/types/rrweb-player.d.ts new file mode 100644 index 00000000..bdffb9a4 --- /dev/null +++ b/apps/start/src/types/rrweb-player.d.ts @@ -0,0 +1,43 @@ +declare module 'rrweb-player' { + interface RrwebPlayerProps { + events: Array<{ type: number; data: unknown; timestamp: number }>; + width?: number; + height?: number; + autoPlay?: boolean; + showController?: boolean; + speedOption?: number[]; + } + + interface RrwebPlayerOptions { + target: HTMLElement; + props: RrwebPlayerProps; + } + + interface RrwebReplayer { + getCurrentTime: () => number; + } + + interface RrwebPlayerMetaData { + startTime: number; + endTime: number; + totalTime: number; + } + + interface RrwebPlayerInstance { + play: () => void; + pause: () => void; + toggle: () => void; + goto: (timeOffset: number, play?: boolean) => void; + setSpeed: (speed: number) => void; + getMetaData: () => RrwebPlayerMetaData; + getReplayer: () => RrwebReplayer; + addEventListener?: ( + event: string, + handler: (...args: unknown[]) => void, + ) => void; + $destroy?: () => void; + } + + const rrwebPlayer: new (options: RrwebPlayerOptions) => RrwebPlayerInstance; + export default rrwebPlayer; +} diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 73f6c000..0c7b89cc 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -15,7 +15,6 @@ import { import type { ILogger } from '@openpanel/logger'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; import * as R from 'ramda'; -import { v4 as uuid } from 'uuid'; import { logger as baseLogger } from '@/utils/logger'; import { createSessionEndJob, getSessionEnd } from '@/utils/session-handler'; @@ -56,7 +55,6 @@ async function createEventAndNotify( checkNotificationRulesForEvent(payload).catch(() => {}), ]); - console.log('Event created:', event); return event; } @@ -87,6 +85,8 @@ export async function incomingEvent( projectId, currentDeviceId, previousDeviceId, + deviceId, + sessionId, uaInfo: _uaInfo, } = jobPayload; const properties = body.properties ?? {}; @@ -157,7 +157,6 @@ export async function incomingEvent( : undefined, } as const; - console.log('HERE?'); // if timestamp is from the past we dont want to create a new session if (uaInfo.isServer || isTimestampFromThePast) { const session = profileId @@ -167,8 +166,6 @@ export async function incomingEvent( }) : null; - console.log('Server?'); - const payload = { ...baseEvent, deviceId: session?.device_id ?? '', @@ -194,31 +191,31 @@ export async function incomingEvent( return createEventAndNotify(payload as IServiceEvent, logger, projectId); } - console.log('not?'); + const sessionEnd = await getSessionEnd({ projectId, currentDeviceId, previousDeviceId, + deviceId, profileId, }); - console.log('Server?'); - const lastScreenView = sessionEnd + const activeSession = sessionEnd ? await sessionBuffer.getExistingSession({ sessionId: sessionEnd.sessionId, }) : null; const payload: IServiceCreateEventPayload = merge(baseEvent, { - deviceId: sessionEnd?.deviceId ?? currentDeviceId, - sessionId: sessionEnd?.sessionId ?? uuid(), + deviceId: sessionEnd?.deviceId ?? deviceId, + sessionId: sessionEnd?.sessionId ?? sessionId, referrer: sessionEnd?.referrer ?? baseEvent.referrer, referrerName: sessionEnd?.referrerName ?? baseEvent.referrerName, referrerType: sessionEnd?.referrerType ?? baseEvent.referrerType, // if the path is not set, use the last screen view path - path: baseEvent.path || lastScreenView?.exit_path || '', - origin: baseEvent.origin || lastScreenView?.exit_origin || '', + path: baseEvent.path || activeSession?.exit_path || '', + origin: baseEvent.origin || activeSession?.exit_origin || '', } as Partial) as IServiceCreateEventPayload; - console.log('SessionEnd?', sessionEnd); + if (!sessionEnd) { logger.info('Creating session start event', { event: payload }); await createEventAndNotify( diff --git a/apps/worker/src/utils/session-handler.ts b/apps/worker/src/utils/session-handler.ts index 59b6edad..dcf427fe 100644 --- a/apps/worker/src/utils/session-handler.ts +++ b/apps/worker/src/utils/session-handler.ts @@ -39,17 +39,20 @@ export async function getSessionEnd({ projectId, currentDeviceId, previousDeviceId, + deviceId, profileId, }: { projectId: string; currentDeviceId: string; previousDeviceId: string; + deviceId: string; profileId: string; }) { const sessionEnd = await getSessionEndJob({ projectId, currentDeviceId, previousDeviceId, + deviceId, }); if (sessionEnd) { @@ -81,6 +84,7 @@ export async function getSessionEndJob(args: { projectId: string; currentDeviceId: string; previousDeviceId: string; + deviceId: string; retryCount?: number; }): Promise<{ deviceId: string; @@ -130,20 +134,31 @@ export async function getSessionEndJob(args: { return null; } - // Check current device job - const currentJob = await sessionsQueue.getJob( - getSessionEndJobId(args.projectId, args.currentDeviceId), - ); - if (currentJob) { - return await handleJobStates(currentJob, args.currentDeviceId); + // TODO: Remove this when migrated to deviceId + if (args.currentDeviceId && args.previousDeviceId) { + // Check current device job + const currentJob = await sessionsQueue.getJob( + getSessionEndJobId(args.projectId, args.currentDeviceId), + ); + if (currentJob) { + return await handleJobStates(currentJob, args.currentDeviceId); + } + + // Check previous device job + const previousJob = await sessionsQueue.getJob( + getSessionEndJobId(args.projectId, args.previousDeviceId), + ); + if (previousJob) { + return await handleJobStates(previousJob, args.previousDeviceId); + } } - // Check previous device job - const previousJob = await sessionsQueue.getJob( - getSessionEndJobId(args.projectId, args.previousDeviceId), + // Check current device job + const currentJob = await sessionsQueue.getJob( + getSessionEndJobId(args.projectId, args.deviceId), ); - if (previousJob) { - return await handleJobStates(previousJob, args.previousDeviceId); + if (currentJob) { + return await handleJobStates(currentJob, args.deviceId); } // Create session diff --git a/packages/common/src/id.ts b/packages/common/src/id.ts index 7b9462be..06d963d9 100644 --- a/packages/common/src/id.ts +++ b/packages/common/src/id.ts @@ -4,6 +4,6 @@ export function shortId() { return nanoid(4); } -export function generateId() { - return nanoid(8); +export function generateId(prefix?: string, length?: number) { + return prefix ? `${prefix}_${nanoid(length ?? 8)}` : nanoid(length ?? 8); } diff --git a/packages/db/code-migrations/10-add-session-replay.ts b/packages/db/code-migrations/10-add-session-replay.ts new file mode 100644 index 00000000..03c75710 --- /dev/null +++ b/packages/db/code-migrations/10-add-session-replay.ts @@ -0,0 +1,65 @@ +import fs from 'node:fs'; +import path from 'node:path'; +import { TABLE_NAMES } from '../src/clickhouse/client'; +import { + addColumns, + createTable, + modifyTTL, + runClickhouseMigrationCommands, +} from '../src/clickhouse/migration'; +import { getIsCluster } from './helpers'; + +export async function up() { + const isClustered = getIsCluster(); + + const sqls: string[] = [ + ...createTable({ + name: TABLE_NAMES.session_replay_chunks, + columns: [ + '`project_id` String CODEC(ZSTD(3))', + '`session_id` String CODEC(ZSTD(3))', + '`chunk_index` UInt16', + '`started_at` DateTime64(3) CODEC(DoubleDelta, ZSTD(3))', + '`ended_at` DateTime64(3) CODEC(DoubleDelta, ZSTD(3))', + '`events_count` UInt16', + '`is_full_snapshot` Bool', + '`payload` String CODEC(ZSTD(6))', + ], + orderBy: ['project_id', 'session_id', 'chunk_index'], + partitionBy: 'toYYYYMM(started_at)', + settings: { + index_granularity: 8192, + }, + distributionHash: 'cityHash64(project_id, session_id)', + replicatedVersion: '1', + isClustered, + }), + ...addColumns( + TABLE_NAMES.sessions, + ['`has_replay` Bool DEFAULT 0'], + isClustered, + ), + modifyTTL({ + tableName: TABLE_NAMES.session_replay_chunks, + isClustered, + ttl: 'started_at + INTERVAL 30 DAY', + }), + ]; + + fs.writeFileSync( + path.join(__filename.replace('.ts', '.sql')), + sqls + .map((sql) => + sql + .trim() + .replace(/;$/, '') + .replace(/\n{2,}/g, '\n') + .concat(';'), + ) + .join('\n\n---\n\n'), + ); + + if (!process.argv.includes('--dry')) { + await runClickhouseMigrationCommands(sqls); + } +} diff --git a/packages/db/code-migrations/migrate.ts b/packages/db/code-migrations/migrate.ts index 3f4a5eac..1598f060 100644 --- a/packages/db/code-migrations/migrate.ts +++ b/packages/db/code-migrations/migrate.ts @@ -19,12 +19,19 @@ async function migrate() { const migration = args.filter((arg) => !arg.startsWith('--'))[0]; const migrationsDir = path.join(__dirname, '..', 'code-migrations'); - const migrations = fs.readdirSync(migrationsDir).filter((file) => { - const version = file.split('-')[0]; - return ( - !Number.isNaN(Number.parseInt(version ?? '')) && file.endsWith('.ts') - ); - }); + const migrations = fs + .readdirSync(migrationsDir) + .filter((file) => { + const version = file.split('-')[0]; + return ( + !Number.isNaN(Number.parseInt(version ?? '')) && file.endsWith('.ts') + ); + }) + .sort((a, b) => { + const aVersion = Number.parseInt(a.split('-')[0]!); + const bVersion = Number.parseInt(b.split('-')[0]!); + return aVersion - bVersion; + }); const finishedMigrations = await db.codeMigration.findMany(); diff --git a/packages/db/src/buffers/session-buffer.ts b/packages/db/src/buffers/session-buffer.ts index fad5ee09..6b4027c8 100644 --- a/packages/db/src/buffers/session-buffer.ts +++ b/packages/db/src/buffers/session-buffer.ts @@ -163,10 +163,46 @@ export class SessionBuffer extends BaseBuffer { : '', sign: 1, version: 1, + has_replay: false, }, ]; } + async markHasReplay(sessionId: string): Promise { + console.log('markHasReplay', sessionId); + const existingSession = await this.getExistingSession({ sessionId }); + if (!existingSession) { + console.log('no existing session or has replay', existingSession); + return; + } + + if (existingSession.has_replay) { + return; + } + + const oldSession = assocPath(['sign'], -1, clone(existingSession)); + const newSession = assocPath(['sign'], 1, clone(existingSession)); + newSession.version = existingSession.version + 1; + newSession.has_replay = true; + + const multi = this.redis.multi(); + multi.set( + `session:${sessionId}`, + JSON.stringify(newSession), + 'EX', + 60 * 60, + ); + multi.rpush(this.redisKey, JSON.stringify(newSession)); + multi.rpush(this.redisKey, JSON.stringify(oldSession)); + multi.incrby(this.bufferCounterKey, 2); + await multi.exec(); + + const bufferLength = await this.getBufferSize(); + if (bufferLength >= this.batchSize) { + await this.tryFlush(); + } + } + async add(event: IClickhouseEvent) { if (!event.session_id) { return; diff --git a/packages/db/src/clickhouse/client.ts b/packages/db/src/clickhouse/client.ts index 244d15a2..bcba30be 100644 --- a/packages/db/src/clickhouse/client.ts +++ b/packages/db/src/clickhouse/client.ts @@ -59,6 +59,7 @@ export const TABLE_NAMES = { cohort_events_mv: 'cohort_events_mv', sessions: 'sessions', events_imports: 'events_imports', + session_replay_chunks: 'session_replay_chunks', }; /** diff --git a/packages/db/src/services/session.service.ts b/packages/db/src/services/session.service.ts index 0d39460a..0e2e9ce9 100644 --- a/packages/db/src/services/session.service.ts +++ b/packages/db/src/services/session.service.ts @@ -52,6 +52,7 @@ export type IClickhouseSession = { revenue: number; sign: 1 | 0; version: number; + has_replay?: boolean; }; export interface IServiceSession { @@ -90,6 +91,7 @@ export interface IServiceSession { utmContent: string; utmTerm: string; revenue: number; + hasReplay: boolean; profile?: IServiceProfile; } @@ -141,6 +143,7 @@ export function transformSession(session: IClickhouseSession): IServiceSession { utmContent: session.utm_content, utmTerm: session.utm_term, revenue: session.revenue, + hasReplay: session.has_replay ?? false, profile: undefined, }; } @@ -229,6 +232,7 @@ export async function getSessionList({ 'screen_view_count', 'event_count', 'revenue', + 'has_replay', ]; columns.forEach((column) => { @@ -321,6 +325,41 @@ export async function getSessionsCount({ export const getSessionsCountCached = cacheable(getSessionsCount, 60 * 10); +export async function getSessionReplayEvents( + sessionId: string, + projectId: string, +): Promise<{ events: unknown[] }> { + const chunks = await clix(ch) + .select<{ chunk_index: number; payload: string }>(['chunk_index', 'payload']) + .from(TABLE_NAMES.session_replay_chunks) + .where('session_id', '=', sessionId) + .where('project_id', '=', projectId) + .orderBy('chunk_index', 'ASC') + .execute(); + + const allEvents = chunks.flatMap((chunk) => + JSON.parse(chunk.payload) as unknown[], + ); + + // rrweb event types: 2 = FullSnapshot, 4 = Meta + // Incremental snapshots (type 3) before the first FullSnapshot are orphaned + // and cause the player to fast-forward through empty time. Strip them but + // keep Meta events (type 4) since rrweb needs them for viewport dimensions. + const firstFullSnapshotIdx = allEvents.findIndex( + (e: any) => e.type === 2, + ); + + let events = allEvents; + if (firstFullSnapshotIdx > 0) { + const metaEvents = allEvents + .slice(0, firstFullSnapshotIdx) + .filter((e: any) => e.type === 4); + events = [...metaEvents, ...allEvents.slice(firstFullSnapshotIdx)]; + } + + return { events }; +} + class SessionService { constructor(private client: typeof ch) {} diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 37ee7024..0f371d90 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -65,8 +65,10 @@ export interface EventsQueuePayloadIncomingEvent { latitude: number | undefined; }; headers: Record; - currentDeviceId: string; - previousDeviceId: string; + currentDeviceId: string; // TODO: Remove + previousDeviceId: string; // TODO: Remove + deviceId: string; + sessionId: string; }; } export interface EventsQueuePayloadCreateEvent { diff --git a/packages/sdks/sdk/src/index.ts b/packages/sdks/sdk/src/index.ts index a7967e78..1c58dd17 100644 --- a/packages/sdks/sdk/src/index.ts +++ b/packages/sdks/sdk/src/index.ts @@ -37,6 +37,8 @@ export type OpenPanelOptions = { export class OpenPanel { api: Api; profileId?: string; + deviceId?: string; + sessionId?: string; global?: Record; queue: TrackHandlerPayload[] = []; @@ -69,6 +71,16 @@ export class OpenPanel { this.flush(); } + private shouldQueue(payload: TrackHandlerPayload): boolean { + if (payload.type === 'replay' && !this.sessionId) { + return true; + } + if (this.options.waitForProfile && !this.profileId) { + return true; + } + return false; + } + async send(payload: TrackHandlerPayload) { if (this.options.disabled) { return Promise.resolve(); @@ -78,11 +90,25 @@ export class OpenPanel { return Promise.resolve(); } - if (this.options.waitForProfile && !this.profileId) { + if (this.shouldQueue(payload)) { this.queue.push(payload); return Promise.resolve(); } - return this.api.fetch('/track', payload); + + const result = await this.api.fetch< + TrackHandlerPayload, + { deviceId: string; sessionId: string } + >('/track', payload); + this.deviceId = result?.deviceId; + const hadSession = !!this.sessionId; + this.sessionId = result?.sessionId; + + // Flush queued items (e.g. replay chunks) when sessionId first arrives + if (!hadSession && this.sessionId) { + this.flush(); + } + + return result; } setGlobalProperties(properties: Record) { @@ -160,33 +186,44 @@ export class OpenPanel { }); } + getDeviceId(): string { + return this.deviceId ?? ''; + } + + getSessionId(): string { + return this.sessionId ?? ''; + } + async fetchDeviceId(): Promise { - const result = await this.api.fetch( - '/track/device-id', - undefined, - { method: 'GET', keepalive: false }, - ); - return result?.deviceId ?? ''; + return Promise.resolve(this.deviceId ?? ''); } clear() { this.profileId = undefined; - // should we force a session end here? + this.deviceId = undefined; + this.sessionId = undefined; } flush() { - this.queue.forEach((item) => { - this.send({ - ...item, - // Not sure why ts-expect-error is needed here - // @ts-expect-error - payload: { - ...item.payload, - profileId: item.payload.profileId ?? this.profileId, - }, - }); - }); - this.queue = []; + const remaining: TrackHandlerPayload[] = []; + for (const item of this.queue) { + if (this.shouldQueue(item)) { + remaining.push(item); + continue; + } + const payload = + item.type === 'replay' + ? item.payload + : { + ...item.payload, + profileId: + 'profileId' in item.payload + ? (item.payload.profileId ?? this.profileId) + : this.profileId, + }; + this.send({ ...item, payload } as TrackHandlerPayload); + } + this.queue = remaining; } log(...args: any[]) { diff --git a/packages/sdks/web/package.json b/packages/sdks/web/package.json index 05ee093a..8de98203 100644 --- a/packages/sdks/web/package.json +++ b/packages/sdks/web/package.json @@ -11,6 +11,8 @@ }, "dependencies": { "@openpanel/sdk": "workspace:1.0.4-local" + "@rrweb/types": "2.0.0-alpha.20", + "rrweb": "2.0.0-alpha.20" }, "devDependencies": { "@openpanel/tsconfig": "workspace:*", diff --git a/packages/sdks/web/src/index.ts b/packages/sdks/web/src/index.ts index 1192a419..02bba421 100644 --- a/packages/sdks/web/src/index.ts +++ b/packages/sdks/web/src/index.ts @@ -7,11 +7,36 @@ import { OpenPanel as OpenPanelBase } from '@openpanel/sdk'; export type * from '@openpanel/sdk'; export { OpenPanel as OpenPanelBase } from '@openpanel/sdk'; +export type SessionReplayOptions = { + enabled: boolean; + sampleRate?: number; + maskAllInputs?: boolean; + maskTextSelector?: string; + blockSelector?: string; + blockClass?: string; + ignoreSelector?: string; + flushIntervalMs?: number; + maxEventsPerChunk?: number; + maxPayloadBytes?: number; + /** + * URL to the replay recorder script. + * Only used when loading the SDK via a script tag (IIFE / op1.js). + * When using the npm package with a bundler this option is ignored + * because the bundler resolves the replay module from the package. + */ + scriptUrl?: string; +}; + +// Injected at build time only in the IIFE (tracker) build. +// In the library build this is `undefined`. +declare const __OPENPANEL_REPLAY_URL__: string | undefined; + export type OpenPanelOptions = OpenPanelBaseOptions & { trackOutgoingLinks?: boolean; trackScreenViews?: boolean; trackAttributes?: boolean; trackHashChanges?: boolean; + sessionReplay?: SessionReplayOptions; }; function toCamelCase(str: string) { @@ -66,6 +91,76 @@ export class OpenPanel extends OpenPanelBase { if (this.options.trackAttributes) { this.trackAttributes(); } + + if (this.options.sessionReplay?.enabled) { + const sampleRate = this.options.sessionReplay.sampleRate ?? 1; + const sampled = Math.random() < sampleRate; + if (sampled) { + this.loadReplayModule().then((mod) => { + if (!mod) return; + mod.startReplayRecorder(this.options.sessionReplay!, (chunk) => { + this.send({ + type: 'replay', + payload: { + ...chunk, + }, + }); + }); + }); + } + } + } + } + + /** + * Load the replay recorder module. + * + * - **IIFE build (op1.js)**: `__OPENPANEL_REPLAY_URL__` is replaced at + * build time with a CDN URL (e.g. `https://openpanel.dev/op1-replay.js`). + * The user can also override it via `sessionReplay.scriptUrl`. + * We load the IIFE replay script via a classic `