diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index 488e6713..d73f267e 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -1,10 +1,7 @@ import type { WebSocket } from '@fastify/websocket'; import { eventBuffer } from '@openpanel/db'; import { setSuperJson } from '@openpanel/json'; -import { - psubscribeToPublishedEvent, - subscribeToPublishedEvent, -} from '@openpanel/redis'; +import { subscribeToPublishedEvent } from '@openpanel/redis'; import { getProjectAccess } from '@openpanel/trpc'; import { getOrganizationAccess } from '@openpanel/trpc/src/access'; import type { FastifyRequest } from 'fastify'; @@ -39,19 +36,8 @@ export function wsVisitors( } ); - const punsubscribe = psubscribeToPublishedEvent( - '__keyevent@0__:expired', - (key) => { - const [, , projectId] = key.split(':'); - if (projectId === params.projectId) { - sendCount(); - } - } - ); - socket.on('close', () => { unsubscribe(); - punsubscribe(); }); } diff --git a/apps/start/src/components/overview/live-counter.tsx b/apps/start/src/components/overview/live-counter.tsx index 2333f5c0..c3d5e733 100644 --- a/apps/start/src/components/overview/live-counter.tsx +++ b/apps/start/src/components/overview/live-counter.tsx @@ -1,61 +1,25 @@ -import { TooltipComplete } from '@/components/tooltip-complete'; -import { useDebounceState } from '@/hooks/use-debounce-state'; -import useWS from '@/hooks/use-ws'; -import { useTRPC } from '@/integrations/trpc/react'; -import { cn } from '@/utils/cn'; -import { useQuery, useQueryClient } from '@tanstack/react-query'; -import { useEffect, useRef } from 'react'; +import { useQueryClient } from '@tanstack/react-query'; +import { useCallback } from 'react'; import { toast } from 'sonner'; import { AnimatedNumber } from '../animated-number'; +import { TooltipComplete } from '@/components/tooltip-complete'; +import { useLiveCounter } from '@/hooks/use-live-counter'; +import { cn } from '@/utils/cn'; export interface LiveCounterProps { projectId: string; shareId?: string; } -const FIFTEEN_SECONDS = 1000 * 30; - export function LiveCounter({ projectId, shareId }: LiveCounterProps) { - const trpc = useTRPC(); const client = useQueryClient(); - const counter = useDebounceState(0, 1000); - const lastRefresh = useRef(Date.now()); - const query = useQuery( - trpc.overview.liveVisitors.queryOptions({ - projectId, - shareId, - }), - ); - - useEffect(() => { - if (query.data) { - counter.set(query.data); - } - }, [query.data]); - - useWS( - `/live/visitors/${projectId}`, - (value) => { - if (!Number.isNaN(value)) { - counter.set(value); - if (Date.now() - lastRefresh.current > FIFTEEN_SECONDS) { - lastRefresh.current = Date.now(); - if (!document.hidden) { - toast('Refreshed data'); - client.refetchQueries({ - type: 'active', - }); - } - } - } - }, - { - debounce: { - delay: 1000, - maxWait: 5000, - }, - }, - ); + const onRefresh = useCallback(() => { + toast('Refreshed data'); + client.refetchQueries({ + type: 'active', + }); + }, [client]); + const counter = useLiveCounter({ projectId, shareId, onRefresh }); return (
diff --git a/apps/start/src/hooks/use-live-counter.ts b/apps/start/src/hooks/use-live-counter.ts new file mode 100644 index 00000000..d8e0d7ed --- /dev/null +++ b/apps/start/src/hooks/use-live-counter.ts @@ -0,0 +1,81 @@ +import { useQuery, useQueryClient } from '@tanstack/react-query'; +import { useEffect, useRef } from 'react'; +import { useDebounceState } from './use-debounce-state'; +import useWS from './use-ws'; +import { useTRPC } from '@/integrations/trpc/react'; + +const FIFTEEN_SECONDS = 1000 * 15; +/** Refetch from API when WS-only updates may be stale (e.g. visitors left). */ +const FALLBACK_STALE_MS = 1000 * 60; + +export function useLiveCounter({ + projectId, + shareId, + onRefresh, +}: { + projectId: string; + shareId?: string; + onRefresh?: () => void; +}) { + const trpc = useTRPC(); + const queryClient = useQueryClient(); + const counter = useDebounceState(0, 1000); + const lastRefresh = useRef(Date.now()); + const query = useQuery( + trpc.overview.liveVisitors.queryOptions({ + projectId, + shareId: shareId ?? undefined, + }) + ); + + useEffect(() => { + if (query.data) { + counter.set(query.data); + } + }, [query.data]); + + useWS( + `/live/visitors/${projectId}`, + (value) => { + if (!Number.isNaN(value)) { + counter.set(value); + if (Date.now() - lastRefresh.current > FIFTEEN_SECONDS) { + lastRefresh.current = Date.now(); + if (!document.hidden) { + onRefresh?.(); + } + } + } + }, + { + debounce: { + delay: 1000, + maxWait: 5000, + }, + } + ); + + useEffect(() => { + const id = setInterval(async () => { + if (Date.now() - lastRefresh.current < FALLBACK_STALE_MS) { + return; + } + const data = await queryClient.fetchQuery( + trpc.overview.liveVisitors.queryOptions( + { + projectId, + shareId: shareId ?? undefined, + }, + // Default query staleTime is 5m; bypass cache so this reconciliation always hits the API. + { staleTime: 0 } + ) + ); + counter.set(data); + lastRefresh.current = Date.now(); + }, FALLBACK_STALE_MS); + + return () => clearInterval(id); + }, [projectId, shareId, trpc, queryClient, counter.set]); + + return counter; +} diff --git a/apps/start/src/routes/widget/counter.tsx b/apps/start/src/routes/widget/counter.tsx index a55e7c81..d745aa62 100644 --- a/apps/start/src/routes/widget/counter.tsx +++ b/apps/start/src/routes/widget/counter.tsx @@ -1,12 +1,11 @@ -import { AnimatedNumber } from '@/components/animated-number'; -import { Ping } from '@/components/ping'; -import { useNumber } from '@/hooks/use-numer-formatter'; -import useWS from '@/hooks/use-ws'; -import { useTRPC } from '@/integrations/trpc/react'; -import type { RouterOutputs } from '@/trpc/client'; import { useQuery, useQueryClient } from '@tanstack/react-query'; import { createFileRoute } from '@tanstack/react-router'; import { z } from 'zod'; +import { AnimatedNumber } from '@/components/animated-number'; +import { Ping } from '@/components/ping'; +import useWS from '@/hooks/use-ws'; +import { useTRPC } from '@/integrations/trpc/react'; +import type { RouterOutputs } from '@/trpc/client'; const widgetSearchSchema = z.object({ shareId: z.string(), @@ -20,33 +19,33 @@ export const Route = createFileRoute('/widget/counter')({ }); function RouteComponent() { - const { shareId, limit, color } = Route.useSearch(); + const { shareId } = Route.useSearch(); const trpc = useTRPC(); // Fetch widget data const { data, isLoading } = useQuery( - trpc.widget.counter.queryOptions({ shareId }), + trpc.widget.counter.queryOptions({ shareId }) ); if (isLoading) { return ( -
+
- +
); } if (!data) { return ( -
+
- +
); } - return ; + return ; } interface RealtimeWidgetProps { @@ -57,30 +56,29 @@ interface RealtimeWidgetProps { function CounterWidget({ shareId, data }: RealtimeWidgetProps) { const trpc = useTRPC(); const queryClient = useQueryClient(); - const number = useNumber(); // WebSocket subscription for real-time updates useWS( `/live/visitors/${data.projectId}`, - (res) => { + () => { if (!document.hidden) { queryClient.refetchQueries( - trpc.widget.counter.queryFilter({ shareId }), + trpc.widget.counter.queryFilter({ shareId }) ); } }, { debounce: { delay: 1000, - maxWait: 60000, + maxWait: 60_000, }, - }, + } ); return ( -
+
- +
); } diff --git a/apps/start/src/routes/widget/realtime.tsx b/apps/start/src/routes/widget/realtime.tsx index a60a17a8..8e1a4cdb 100644 --- a/apps/start/src/routes/widget/realtime.tsx +++ b/apps/start/src/routes/widget/realtime.tsx @@ -1,3 +1,15 @@ +import { useQuery, useQueryClient } from '@tanstack/react-query'; +import { createFileRoute } from '@tanstack/react-router'; +import type React from 'react'; +import { + Bar, + BarChart, + ResponsiveContainer, + Tooltip, + XAxis, + YAxis, +} from 'recharts'; +import { z } from 'zod'; import { AnimatedNumber } from '@/components/animated-number'; import { ChartTooltipContainer, @@ -14,18 +26,6 @@ import { countries } from '@/translations/countries'; import type { RouterOutputs } from '@/trpc/client'; import { cn } from '@/utils/cn'; import { getChartColor } from '@/utils/theme'; -import { useQuery, useQueryClient } from '@tanstack/react-query'; -import { createFileRoute } from '@tanstack/react-router'; -import type React from 'react'; -import { - Bar, - BarChart, - ResponsiveContainer, - Tooltip, - XAxis, - YAxis, -} from 'recharts'; -import { z } from 'zod'; const widgetSearchSchema = z.object({ shareId: z.string(), @@ -44,7 +44,7 @@ function RouteComponent() { // Fetch widget data const { data: widgetData, isLoading } = useQuery( - trpc.widget.realtimeData.queryOptions({ shareId }), + trpc.widget.realtimeData.queryOptions({ shareId }) ); if (isLoading) { @@ -53,10 +53,10 @@ function RouteComponent() { if (!widgetData) { return ( -
- -

Widget not found

-

+

+ +

Widget not found

+

This widget is not available or has been removed.

@@ -65,10 +65,10 @@ function RouteComponent() { return ( ); } @@ -83,7 +83,6 @@ interface RealtimeWidgetProps { function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { const trpc = useTRPC(); const queryClient = useQueryClient(); - const number = useNumber(); // WebSocket subscription for real-time updates useWS( @@ -91,16 +90,16 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { () => { if (!document.hidden) { queryClient.refetchQueries( - trpc.widget.realtimeData.queryFilter({ shareId }), + trpc.widget.realtimeData.queryFilter({ shareId }) ); } }, { debounce: { delay: 1000, - maxWait: 60000, + maxWait: 60_000, }, - }, + } ); const maxDomain = @@ -111,8 +110,12 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { const referrers = data.referrers.length > 0 ? 1 : 0; const paths = data.paths.length > 0 ? 1 : 0; const value = countries + referrers + paths; - if (value === 3) return 'md:grid-cols-3'; - if (value === 2) return 'md:grid-cols-2'; + if (value === 3) { + return 'md:grid-cols-3'; + } + if (value === 2) { + return 'md:grid-cols-2'; + } return 'md:grid-cols-1'; })(); @@ -120,10 +123,10 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) {
{/* Header with live counter */}
-
-
+
+
-
+
USERS IN LAST 30 MINUTES
{data.project.domain && } @@ -131,14 +134,14 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) {
-
+
-
+
- + - + @@ -174,24 +177,24 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { {(data.countries.length > 0 || data.referrers.length > 0 || data.paths.length > 0) && ( -
+
{/* Countries */} {data.countries.length > 0 && (
-
+
COUNTRY
{(() => { const { visible, rest, restCount } = getRestItems( data.countries, - limit, + limit ); return ( <> {visible.map((item) => ( - +
@@ -224,19 +227,19 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { {/* Referrers */} {data.referrers.length > 0 && (
-
+
REFERRER
{(() => { const { visible, rest, restCount } = getRestItems( data.referrers, - limit, + limit ); return ( <> {visible.map((item) => ( - +
@@ -263,19 +266,19 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { {/* Paths */} {data.paths.length > 0 && (
-
+
PATH
{(() => { const { visible, rest, restCount } = getRestItems( data.paths, - limit, + limit ); return ( <> {visible.map((item) => ( - + {item.path} @@ -303,10 +306,10 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { } // Custom tooltip component that uses portals to escape overflow hidden -const CustomTooltip = ({ active, payload, coordinate }: any) => { +const CustomTooltip = ({ active, payload }: any) => { const number = useNumber(); - if (!active || !payload || !payload.length) { + if (!(active && payload && payload.length)) { return null; } @@ -328,10 +331,13 @@ const CustomTooltip = ({ active, payload, coordinate }: any) => { function RowItem({ children, count, -}: { children: React.ReactNode; count: number }) { +}: { + children: React.ReactNode; + count: number; +}) { const number = useNumber(); return ( -
+
{children} {number.short(count)}
@@ -340,7 +346,7 @@ function RowItem({ function getRestItems( items: T[], - limit: number, + limit: number ): { visible: T[]; rest: T[]; restCount: number } { const visible = items.slice(0, limit); const rest = items.slice(limit); @@ -375,7 +381,7 @@ function RestRow({ : 'paths'; return ( -
+
{firstName} and {otherCount} more {typeLabel}... @@ -434,13 +440,13 @@ function RealtimeWidgetSkeleton({ limit }: { limit: number }) { const itemCount = Math.min(limit, 5); return ( -
+
{/* Header with live counter */}
-
-
+
+
-
+
USERS IN LAST 30 MINUTES
@@ -448,35 +454,35 @@ function RealtimeWidgetSkeleton({ limit }: { limit: number }) {
-
-
-
+
+
+
-
-
+
+
{SKELETON_HISTOGRAM.map((item, index) => (
))}
-
-
+
+
-
+
{/* Countries, Referrers, and Paths skeleton */}
{/* Countries skeleton */}
-
+
COUNTRY
@@ -488,7 +494,7 @@ function RealtimeWidgetSkeleton({ limit }: { limit: number }) { {/* Referrers skeleton */}
-
+
REFERRER
@@ -500,7 +506,7 @@ function RealtimeWidgetSkeleton({ limit }: { limit: number }) { {/* Paths skeleton */}
-
+
PATH
@@ -517,12 +523,12 @@ function RealtimeWidgetSkeleton({ limit }: { limit: number }) { function RowItemSkeleton() { return ( -
+
-
+
-
+
); } diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts index be44688f..e269ef0b 100644 --- a/packages/db/src/buffers/event-buffer.test.ts +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -1,6 +1,7 @@ import { getRedisCache } from '@openpanel/redis'; import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; -import { ch } from '../clickhouse/client'; +import * as chClient from '../clickhouse/client'; +const { ch } = chClient; // Break circular dep: event-buffer -> event.service -> buffers/index -> EventBuffer vi.mock('../services/event.service', () => ({})); @@ -10,10 +11,7 @@ import { EventBuffer } from './event-buffer'; const redis = getRedisCache(); beforeEach(async () => { - const keys = [ - ...await redis.keys('event*'), - ...await redis.keys('live:*'), - ]; + const keys = await redis.keys('event*'); if (keys.length > 0) await redis.del(...keys); }); @@ -213,18 +211,16 @@ describe('EventBuffer', () => { }); it('tracks active visitors', async () => { - const event = { - project_id: 'p9', - profile_id: 'u9', - name: 'custom', - created_at: new Date().toISOString(), - } as any; - - eventBuffer.add(event); - await eventBuffer.flush(); + const querySpy = vi + .spyOn(chClient, 'chQuery') + .mockResolvedValueOnce([{ count: 2 }] as any); const count = await eventBuffer.getActiveVisitorCount('p9'); - expect(count).toBeGreaterThanOrEqual(1); + expect(count).toBe(2); + expect(querySpy).toHaveBeenCalledOnce(); + expect(querySpy.mock.calls[0]![0]).toContain("project_id = 'p9'"); + + querySpy.mockRestore(); }); it('handles multiple sessions independently — all events go to buffer', async () => { diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 2e5d43dc..883812ad 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -1,6 +1,6 @@ import { getSafeJson } from '@openpanel/json'; -import { getRedisCache, publishEvent, type Redis } from '@openpanel/redis'; -import { ch } from '../clickhouse/client'; +import { getRedisCache, publishEvent } from '@openpanel/redis'; +import { ch, chQuery } from '../clickhouse/client'; import type { IClickhouseEvent } from '../services/event.service'; import { BaseBuffer } from './base-buffer'; @@ -25,10 +25,6 @@ export class EventBuffer extends BaseBuffer { /** Tracks consecutive flush failures for observability; reset on success. */ private flushRetryCount = 0; - private activeVisitorsExpiration = 60 * 5; // 5 minutes - /** How often (ms) we refresh the heartbeat key + zadd per visitor. */ - private heartbeatRefreshMs = 60_000; // 1 minute - private lastHeartbeat = new Map(); private queueKey = 'event_buffer:queue'; protected bufferCounterKey = 'event_buffer:total_count'; @@ -87,20 +83,12 @@ export class EventBuffer extends BaseBuffer { for (const event of eventsToFlush) { multi.rpush(this.queueKey, JSON.stringify(event)); - if (event.profile_id) { - this.incrementActiveVisitorCount( - multi, - event.project_id, - event.profile_id - ); - } } multi.incrby(this.bufferCounterKey, eventsToFlush.length); await multi.exec(); this.flushRetryCount = 0; - this.pruneHeartbeatMap(); } catch (error) { // Re-queue failed events at the front to preserve order and avoid data loss this.pendingEvents = eventsToFlush.concat(this.pendingEvents); @@ -202,58 +190,21 @@ export class EventBuffer extends BaseBuffer { } } - public async getBufferSize() { + public getBufferSize() { return this.getBufferSizeWithCounter(async () => { const redis = getRedisCache(); return await redis.llen(this.queueKey); }); } - private pruneHeartbeatMap() { - const cutoff = Date.now() - this.activeVisitorsExpiration * 1000; - for (const [key, ts] of this.lastHeartbeat) { - if (ts < cutoff) { - this.lastHeartbeat.delete(key); - } - } - } - - private incrementActiveVisitorCount( - multi: ReturnType, - projectId: string, - profileId: string - ) { - const key = `${projectId}:${profileId}`; - const now = Date.now(); - const last = this.lastHeartbeat.get(key) ?? 0; - - if (now - last < this.heartbeatRefreshMs) { - return; - } - - this.lastHeartbeat.set(key, now); - const zsetKey = `live:visitors:${projectId}`; - const heartbeatKey = `live:visitor:${projectId}:${profileId}`; - multi - .zadd(zsetKey, now, profileId) - .set(heartbeatKey, '1', 'EX', this.activeVisitorsExpiration); - } - public async getActiveVisitorCount(projectId: string): Promise { - const redis = getRedisCache(); - const zsetKey = `live:visitors:${projectId}`; - const cutoff = Date.now() - this.activeVisitorsExpiration * 1000; - - const multi = redis.multi(); - multi - .zremrangebyscore(zsetKey, '-inf', cutoff) - .zcount(zsetKey, cutoff, '+inf'); - - const [, count] = (await multi.exec()) as [ - [Error | null, any], - [Error | null, number], - ]; - - return count[1] || 0; + const rows = await chQuery<{ count: number }>( + `SELECT uniq(profile_id) AS count + FROM events + WHERE project_id = '${projectId}' + AND profile_id != '' + AND created_at >= now() - INTERVAL 5 MINUTE` + ); + return rows[0]?.count ?? 0; } }