fix(buffer): live counter

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-02-11 21:38:26 +01:00
parent 848f475412
commit 3205ea0a31
6 changed files with 56 additions and 18 deletions

View File

@@ -6,8 +6,8 @@ import { getSuperJson } from '@openpanel/common';
import type { IServiceEvent, Notification } from '@openpanel/db';
import {
TABLE_NAMES,
eventBuffer,
getEvents,
getLiveVisitors,
getProfileByIdCached,
transformMinimalEvent,
} from '@openpanel/db';
@@ -82,20 +82,20 @@ export function wsVisitors(
if (channel === 'event:received') {
const event = getSuperJson<IServiceEvent>(message);
if (event?.projectId === params.projectId) {
getLiveVisitors(params.projectId).then((count) => {
eventBuffer.getActiveVisitorCount(params.projectId).then((count) => {
connection.socket.send(String(count));
});
}
}
};
const pmessage = (pattern: string, channel: string, message: string) => {
if (!message.startsWith('live:visitors:')) {
if (!message.startsWith('live:visitor:')) {
return null;
}
const [projectId] = getLiveEventInfo(message);
if (projectId && projectId === params.projectId) {
getLiveVisitors(params.projectId).then((count) => {
eventBuffer.getActiveVisitorCount(params.projectId).then((count) => {
connection.socket.send(String(count));
});
}

View File

@@ -1,12 +1,12 @@
import withSuspense from '@/hocs/with-suspense';
import { getLiveVisitors } from '@openpanel/db';
import { eventBuffer } from '@openpanel/db';
import type { LiveCounterProps } from './live-counter';
import LiveCounter from './live-counter';
async function ServerLiveCounter(props: Omit<LiveCounterProps, 'data'>) {
const count = await getLiveVisitors(props.projectId);
const count = await eventBuffer.getActiveVisitorCount(props.projectId);
return <LiveCounter data={count} {...props} />;
}

View File

@@ -39,7 +39,7 @@ export default function useWS<T>(
onMessage(event) {
try {
const data = getSuperJson<T>(event.data);
if (data) {
if (data !== null && data !== undefined) {
debouncedOnMessage(data);
}
} catch (error) {

View File

@@ -54,6 +54,8 @@ export class EventBuffer extends BaseBuffer {
)
: 1000;
private activeVisitorsExpiration = 60 * 5; // 5 minutes
private sessionEvents = ['screen_view', 'session_end'];
// LIST - Stores events without sessions
@@ -246,8 +248,11 @@ return "OK"
}
if (event.profile_id) {
multi.sadd(`live:visitors:${event.project_id}`, event.profile_id);
multi.expire(`live:visitors:${event.project_id}`, 60 * 5); // 5 minutes
this.incrementActiveVisitorCount(
multi,
event.project_id,
event.profile_id,
);
}
if (!_multi) {
@@ -689,4 +694,44 @@ return "OK"
);
return count;
}
private async incrementActiveVisitorCount(
multi: ReturnType<Redis['multi']>,
projectId: string,
profileId: string,
) {
// Add/update visitor with current timestamp as score
const now = Date.now();
const zsetKey = `live:visitors:${projectId}`;
return (
multi
// To keep the count
.zadd(zsetKey, now, profileId)
// To trigger the expiration listener
.set(
`live:visitor:${projectId}:${profileId}`,
'1',
'EX',
this.activeVisitorsExpiration,
)
);
}
public async getActiveVisitorCount(projectId: string): Promise<number> {
const redis = getRedisCache();
const zsetKey = `live:visitors:${projectId}`;
const cutoff = Date.now() - this.activeVisitorsExpiration * 1000;
const multi = redis.multi();
multi
.zremrangebyscore(zsetKey, '-inf', cutoff)
.zcount(zsetKey, cutoff, '+inf');
const [, count] = (await multi.exec()) as [
[Error | null, any],
[Error | null, number],
];
return count[1] || 0;
}
}

View File

@@ -1,8 +1,5 @@
import { BotBuffer as BotBufferPsql } from './bot-buffer-psql';
import { BotBuffer as BotBufferRedis } from './bot-buffer-redis';
import { EventBuffer as EventBufferPsql } from './event-buffer-psql';
import { EventBuffer as EventBufferRedis } from './event-buffer-redis';
import { ProfileBuffer as ProfileBufferPsql } from './profile-buffer-psql';
import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer-redis';
export const eventBuffer = new EventBufferRedis();

View File

@@ -1,9 +1,9 @@
import { mergeDeepRight, omit, uniq } from 'ramda';
import { mergeDeepRight, uniq } from 'ramda';
import { escape } from 'sqlstring';
import { v4 as uuid } from 'uuid';
import { toDots } from '@openpanel/common';
import { cacheable, getRedisCache } from '@openpanel/redis';
import { cacheable } from '@openpanel/redis';
import type { IChartEventFilter } from '@openpanel/validation';
import { botBuffer, eventBuffer } from '../buffers';
@@ -234,10 +234,6 @@ export function transformMinimalEvent(
};
}
export async function getLiveVisitors(projectId: string) {
return getRedisCache().scard(`live:visitors:${projectId}`);
}
export async function getEvents(
sql: string,
options: GetEventsOptions = {},