diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index 76d4f993..eaf729e9 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -6,8 +6,8 @@ import { getSuperJson } from '@openpanel/common'; import type { IServiceEvent, Notification } from '@openpanel/db'; import { TABLE_NAMES, + eventBuffer, getEvents, - getLiveVisitors, getProfileByIdCached, transformMinimalEvent, } from '@openpanel/db'; @@ -82,20 +82,20 @@ export function wsVisitors( if (channel === 'event:received') { const event = getSuperJson(message); if (event?.projectId === params.projectId) { - getLiveVisitors(params.projectId).then((count) => { + eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { connection.socket.send(String(count)); }); } } }; const pmessage = (pattern: string, channel: string, message: string) => { - if (!message.startsWith('live:visitors:')) { + if (!message.startsWith('live:visitor:')) { return null; } const [projectId] = getLiveEventInfo(message); if (projectId && projectId === params.projectId) { - getLiveVisitors(params.projectId).then((count) => { + eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { connection.socket.send(String(count)); }); } diff --git a/apps/dashboard/src/components/overview/live-counter/index.tsx b/apps/dashboard/src/components/overview/live-counter/index.tsx index 72d43b64..4ee4473c 100644 --- a/apps/dashboard/src/components/overview/live-counter/index.tsx +++ b/apps/dashboard/src/components/overview/live-counter/index.tsx @@ -1,12 +1,12 @@ import withSuspense from '@/hocs/with-suspense'; -import { getLiveVisitors } from '@openpanel/db'; +import { eventBuffer } from '@openpanel/db'; import type { LiveCounterProps } from './live-counter'; import LiveCounter from './live-counter'; async function ServerLiveCounter(props: Omit) { - const count = await getLiveVisitors(props.projectId); + const count = await eventBuffer.getActiveVisitorCount(props.projectId); return ; } diff --git a/apps/dashboard/src/hooks/useWS.ts b/apps/dashboard/src/hooks/useWS.ts index e5b8cfb7..4179b427 100644 --- a/apps/dashboard/src/hooks/useWS.ts +++ b/apps/dashboard/src/hooks/useWS.ts @@ -39,7 +39,7 @@ export default function useWS( onMessage(event) { try { const data = getSuperJson(event.data); - if (data) { + if (data !== null && data !== undefined) { debouncedOnMessage(data); } } catch (error) { diff --git a/packages/db/src/buffers/event-buffer-redis.ts b/packages/db/src/buffers/event-buffer-redis.ts index 122b3bf8..590c352e 100644 --- a/packages/db/src/buffers/event-buffer-redis.ts +++ b/packages/db/src/buffers/event-buffer-redis.ts @@ -54,6 +54,8 @@ export class EventBuffer extends BaseBuffer { ) : 1000; + private activeVisitorsExpiration = 60 * 5; // 5 minutes + private sessionEvents = ['screen_view', 'session_end']; // LIST - Stores events without sessions @@ -246,8 +248,11 @@ return "OK" } if (event.profile_id) { - multi.sadd(`live:visitors:${event.project_id}`, event.profile_id); - multi.expire(`live:visitors:${event.project_id}`, 60 * 5); // 5 minutes + this.incrementActiveVisitorCount( + multi, + event.project_id, + event.profile_id, + ); } if (!_multi) { @@ -689,4 +694,44 @@ return "OK" ); return count; } + + private async incrementActiveVisitorCount( + multi: ReturnType, + projectId: string, + profileId: string, + ) { + // Add/update visitor with current timestamp as score + const now = Date.now(); + const zsetKey = `live:visitors:${projectId}`; + return ( + multi + // To keep the count + .zadd(zsetKey, now, profileId) + // To trigger the expiration listener + .set( + `live:visitor:${projectId}:${profileId}`, + '1', + 'EX', + this.activeVisitorsExpiration, + ) + ); + } + + public async getActiveVisitorCount(projectId: string): Promise { + const redis = getRedisCache(); + const zsetKey = `live:visitors:${projectId}`; + const cutoff = Date.now() - this.activeVisitorsExpiration * 1000; + + const multi = redis.multi(); + multi + .zremrangebyscore(zsetKey, '-inf', cutoff) + .zcount(zsetKey, cutoff, '+inf'); + + const [, count] = (await multi.exec()) as [ + [Error | null, any], + [Error | null, number], + ]; + + return count[1] || 0; + } } diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts index 88f30de7..2c546daa 100644 --- a/packages/db/src/buffers/index.ts +++ b/packages/db/src/buffers/index.ts @@ -1,8 +1,5 @@ -import { BotBuffer as BotBufferPsql } from './bot-buffer-psql'; import { BotBuffer as BotBufferRedis } from './bot-buffer-redis'; -import { EventBuffer as EventBufferPsql } from './event-buffer-psql'; import { EventBuffer as EventBufferRedis } from './event-buffer-redis'; -import { ProfileBuffer as ProfileBufferPsql } from './profile-buffer-psql'; import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer-redis'; export const eventBuffer = new EventBufferRedis(); diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index d2d78497..0554b8b9 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -1,9 +1,9 @@ -import { mergeDeepRight, omit, uniq } from 'ramda'; +import { mergeDeepRight, uniq } from 'ramda'; import { escape } from 'sqlstring'; import { v4 as uuid } from 'uuid'; import { toDots } from '@openpanel/common'; -import { cacheable, getRedisCache } from '@openpanel/redis'; +import { cacheable } from '@openpanel/redis'; import type { IChartEventFilter } from '@openpanel/validation'; import { botBuffer, eventBuffer } from '../buffers'; @@ -234,10 +234,6 @@ export function transformMinimalEvent( }; } -export async function getLiveVisitors(projectId: string) { - return getRedisCache().scard(`live:visitors:${projectId}`); -} - export async function getEvents( sql: string, options: GetEventsOptions = {},