From a672b73947f875560dd7961a7c86318e0a135c25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Wed, 11 Mar 2026 23:28:20 +0100 Subject: [PATCH] wip --- apps/api/src/controllers/live.controller.ts | 96 +++++++++--------- .../src/components/events/event-listener.tsx | 37 ++----- .../src/components/events/table/index.tsx | 9 +- .../onboarding/onboarding-verify-listener.tsx | 32 ++---- .../realtime/realtime-active-sessions.tsx | 98 ++++++++----------- ...ationId.$projectId.events._tabs.events.tsx | 2 +- ...pp.$organizationId.$projectId.realtime.tsx | 23 +++-- .../_steps.onboarding.$projectId.verify.tsx | 28 ++---- packages/db/src/buffers/event-buffer.ts | 96 +++++++----------- packages/redis/publisher.ts | 3 +- packages/trpc/src/routers/realtime.ts | 50 ++++------ 11 files changed, 196 insertions(+), 278 deletions(-) diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index cd7afe91..332968ad 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -1,16 +1,13 @@ -import type { FastifyRequest } from 'fastify'; -import superjson from 'superjson'; - import type { WebSocket } from '@fastify/websocket'; -import { - eventBuffer, - getProfileById, - transformMinimalEvent, -} from '@openpanel/db'; +import { eventBuffer } from '@openpanel/db'; import { setSuperJson } from '@openpanel/json'; -import { subscribeToPublishedEvent } from '@openpanel/redis'; +import { + psubscribeToPublishedEvent, + subscribeToPublishedEvent, +} from '@openpanel/redis'; import { getProjectAccess } from '@openpanel/trpc'; import { getOrganizationAccess } from '@openpanel/trpc/src/access'; +import type { FastifyRequest } from 'fastify'; export function wsVisitors( socket: WebSocket, @@ -18,19 +15,38 @@ export function wsVisitors( Params: { projectId: string; }; - }>, + }> ) { const { params } = req; - const unsubscribe = subscribeToPublishedEvent('events', 'saved', (event) => { - if (event?.projectId === params.projectId) { - eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { - socket.send(String(count)); - }); + const sendCount = () => { + eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { + socket.send(String(count)); + }); + }; + + const unsubscribe = subscribeToPublishedEvent( + 'events', + 'batch', + ({ projectId }) => { + if (projectId === params.projectId) { + sendCount(); + } } - }); + ); + + const punsubscribe = psubscribeToPublishedEvent( + '__keyevent@0__:expired', + (key) => { + const [, , projectId] = key.split(':'); + if (projectId === params.projectId) { + sendCount(); + } + } + ); socket.on('close', () => { unsubscribe(); + punsubscribe(); }); } @@ -42,18 +58,10 @@ export async function wsProjectEvents( }; Querystring: { token?: string; - type?: 'saved' | 'received'; }; - }>, + }> ) { - const { params, query } = req; - const type = query.type || 'saved'; - - if (!['saved', 'received'].includes(type)) { - socket.send('Invalid type'); - socket.close(); - return; - } + const { params } = req; const userId = req.session?.userId; if (!userId) { @@ -67,24 +75,20 @@ export async function wsProjectEvents( projectId: params.projectId, }); + if (!access) { + socket.send('No access'); + socket.close(); + return; + } + const unsubscribe = subscribeToPublishedEvent( 'events', - type, - async (event) => { - if (event.projectId === params.projectId) { - const profile = await getProfileById(event.profileId, event.projectId); - socket.send( - superjson.stringify( - access - ? { - ...event, - profile, - } - : transformMinimalEvent(event), - ), - ); + 'batch', + ({ projectId, count }) => { + if (projectId === params.projectId) { + socket.send(setSuperJson({ count })); } - }, + } ); socket.on('close', () => unsubscribe()); @@ -96,7 +100,7 @@ export async function wsProjectNotifications( Params: { projectId: string; }; - }>, + }> ) { const { params } = req; const userId = req.session?.userId; @@ -123,9 +127,9 @@ export async function wsProjectNotifications( 'created', (notification) => { if (notification.projectId === params.projectId) { - socket.send(superjson.stringify(notification)); + socket.send(setSuperJson(notification)); } - }, + } ); socket.on('close', () => unsubscribe()); @@ -137,7 +141,7 @@ export async function wsOrganizationEvents( Params: { organizationId: string; }; - }>, + }> ) { const { params } = req; const userId = req.session?.userId; @@ -164,7 +168,7 @@ export async function wsOrganizationEvents( 'subscription_updated', (message) => { socket.send(setSuperJson(message)); - }, + } ); socket.on('close', () => unsubscribe()); diff --git a/apps/start/src/components/events/event-listener.tsx b/apps/start/src/components/events/event-listener.tsx index defabb7d..08df9b61 100644 --- a/apps/start/src/components/events/event-listener.tsx +++ b/apps/start/src/components/events/event-listener.tsx @@ -1,3 +1,4 @@ +import { AnimatedNumber } from '../animated-number'; import { Tooltip, TooltipContent, @@ -8,71 +9,53 @@ import { useDebounceState } from '@/hooks/use-debounce-state'; import useWS from '@/hooks/use-ws'; import { cn } from '@/utils/cn'; -import type { IServiceEvent, IServiceEventMinimal } from '@openpanel/db'; -import { useParams } from '@tanstack/react-router'; -import { AnimatedNumber } from '../animated-number'; - export default function EventListener({ onRefresh, }: { onRefresh: () => void; }) { - const params = useParams({ - strict: false, - }); const { projectId } = useAppParams(); const counter = useDebounceState(0, 1000); - useWS( + useWS<{ count: number }>( `/live/events/${projectId}`, - (event) => { - if (event) { - const isProfilePage = !!params?.profileId; - if (isProfilePage) { - const profile = 'profile' in event ? event.profile : null; - if (profile?.id === params?.profileId) { - counter.set((prev) => prev + 1); - } - return; - } - - counter.set((prev) => prev + 1); - } + ({ count }) => { + counter.set((prev) => prev + count); }, { debounce: { delay: 1000, maxWait: 5000, }, - }, + } ); return ( diff --git a/apps/start/src/components/events/table/index.tsx b/apps/start/src/components/events/table/index.tsx index 30071399..95ed8e2d 100644 --- a/apps/start/src/components/events/table/index.tsx +++ b/apps/start/src/components/events/table/index.tsx @@ -35,6 +35,7 @@ type Props = { >, unknown >; + showEventListener?: boolean; }; const LOADING_DATA = [{}, {}, {}, {}, {}, {}, {}, {}, {}] as IServiceEvent[]; @@ -215,7 +216,7 @@ const VirtualizedEventsTable = ({ ); }; -export const EventsTable = ({ query }: Props) => { +export const EventsTable = ({ query, showEventListener = false }: Props) => { const { isLoading } = query; const columns = useColumns(); @@ -272,7 +273,7 @@ export const EventsTable = ({ query }: Props) => { return ( <> - +
{ function EventsTableToolbar({ query, table, + showEventListener, }: { query: Props['query']; table: Table; + showEventListener: boolean; }) { const { projectId } = useAppParams(); const [startDate, setStartDate] = useQueryState( @@ -305,7 +308,7 @@ function EventsTableToolbar({ return (
- query.refetch()} /> + {showEventListener && query.refetch()} />}