remove active visitor counter in redis
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import { getRedisCache } from '@openpanel/redis';
|
||||
import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { ch } from '../clickhouse/client';
|
||||
import * as chClient from '../clickhouse/client';
|
||||
const { ch } = chClient;
|
||||
|
||||
// Break circular dep: event-buffer -> event.service -> buffers/index -> EventBuffer
|
||||
vi.mock('../services/event.service', () => ({}));
|
||||
@@ -10,10 +11,7 @@ import { EventBuffer } from './event-buffer';
|
||||
const redis = getRedisCache();
|
||||
|
||||
beforeEach(async () => {
|
||||
const keys = [
|
||||
...await redis.keys('event*'),
|
||||
...await redis.keys('live:*'),
|
||||
];
|
||||
const keys = await redis.keys('event*');
|
||||
if (keys.length > 0) await redis.del(...keys);
|
||||
});
|
||||
|
||||
@@ -213,18 +211,16 @@ describe('EventBuffer', () => {
|
||||
});
|
||||
|
||||
it('tracks active visitors', async () => {
|
||||
const event = {
|
||||
project_id: 'p9',
|
||||
profile_id: 'u9',
|
||||
name: 'custom',
|
||||
created_at: new Date().toISOString(),
|
||||
} as any;
|
||||
|
||||
eventBuffer.add(event);
|
||||
await eventBuffer.flush();
|
||||
const querySpy = vi
|
||||
.spyOn(chClient, 'chQuery')
|
||||
.mockResolvedValueOnce([{ count: 2 }] as any);
|
||||
|
||||
const count = await eventBuffer.getActiveVisitorCount('p9');
|
||||
expect(count).toBeGreaterThanOrEqual(1);
|
||||
expect(count).toBe(2);
|
||||
expect(querySpy).toHaveBeenCalledOnce();
|
||||
expect(querySpy.mock.calls[0]![0]).toContain("project_id = 'p9'");
|
||||
|
||||
querySpy.mockRestore();
|
||||
});
|
||||
|
||||
it('handles multiple sessions independently — all events go to buffer', async () => {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { getSafeJson } from '@openpanel/json';
|
||||
import { getRedisCache, publishEvent, type Redis } from '@openpanel/redis';
|
||||
import { ch } from '../clickhouse/client';
|
||||
import { getRedisCache, publishEvent } from '@openpanel/redis';
|
||||
import { ch, chQuery } from '../clickhouse/client';
|
||||
import type { IClickhouseEvent } from '../services/event.service';
|
||||
import { BaseBuffer } from './base-buffer';
|
||||
|
||||
@@ -25,10 +25,6 @@ export class EventBuffer extends BaseBuffer {
|
||||
/** Tracks consecutive flush failures for observability; reset on success. */
|
||||
private flushRetryCount = 0;
|
||||
|
||||
private activeVisitorsExpiration = 60 * 5; // 5 minutes
|
||||
/** How often (ms) we refresh the heartbeat key + zadd per visitor. */
|
||||
private heartbeatRefreshMs = 60_000; // 1 minute
|
||||
private lastHeartbeat = new Map<string, number>();
|
||||
private queueKey = 'event_buffer:queue';
|
||||
protected bufferCounterKey = 'event_buffer:total_count';
|
||||
|
||||
@@ -87,20 +83,12 @@ export class EventBuffer extends BaseBuffer {
|
||||
|
||||
for (const event of eventsToFlush) {
|
||||
multi.rpush(this.queueKey, JSON.stringify(event));
|
||||
if (event.profile_id) {
|
||||
this.incrementActiveVisitorCount(
|
||||
multi,
|
||||
event.project_id,
|
||||
event.profile_id
|
||||
);
|
||||
}
|
||||
}
|
||||
multi.incrby(this.bufferCounterKey, eventsToFlush.length);
|
||||
|
||||
await multi.exec();
|
||||
|
||||
this.flushRetryCount = 0;
|
||||
this.pruneHeartbeatMap();
|
||||
} catch (error) {
|
||||
// Re-queue failed events at the front to preserve order and avoid data loss
|
||||
this.pendingEvents = eventsToFlush.concat(this.pendingEvents);
|
||||
@@ -202,58 +190,21 @@ export class EventBuffer extends BaseBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
public async getBufferSize() {
|
||||
public getBufferSize() {
|
||||
return this.getBufferSizeWithCounter(async () => {
|
||||
const redis = getRedisCache();
|
||||
return await redis.llen(this.queueKey);
|
||||
});
|
||||
}
|
||||
|
||||
private pruneHeartbeatMap() {
|
||||
const cutoff = Date.now() - this.activeVisitorsExpiration * 1000;
|
||||
for (const [key, ts] of this.lastHeartbeat) {
|
||||
if (ts < cutoff) {
|
||||
this.lastHeartbeat.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private incrementActiveVisitorCount(
|
||||
multi: ReturnType<Redis['multi']>,
|
||||
projectId: string,
|
||||
profileId: string
|
||||
) {
|
||||
const key = `${projectId}:${profileId}`;
|
||||
const now = Date.now();
|
||||
const last = this.lastHeartbeat.get(key) ?? 0;
|
||||
|
||||
if (now - last < this.heartbeatRefreshMs) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.lastHeartbeat.set(key, now);
|
||||
const zsetKey = `live:visitors:${projectId}`;
|
||||
const heartbeatKey = `live:visitor:${projectId}:${profileId}`;
|
||||
multi
|
||||
.zadd(zsetKey, now, profileId)
|
||||
.set(heartbeatKey, '1', 'EX', this.activeVisitorsExpiration);
|
||||
}
|
||||
|
||||
public async getActiveVisitorCount(projectId: string): Promise<number> {
|
||||
const redis = getRedisCache();
|
||||
const zsetKey = `live:visitors:${projectId}`;
|
||||
const cutoff = Date.now() - this.activeVisitorsExpiration * 1000;
|
||||
|
||||
const multi = redis.multi();
|
||||
multi
|
||||
.zremrangebyscore(zsetKey, '-inf', cutoff)
|
||||
.zcount(zsetKey, cutoff, '+inf');
|
||||
|
||||
const [, count] = (await multi.exec()) as [
|
||||
[Error | null, any],
|
||||
[Error | null, number],
|
||||
];
|
||||
|
||||
return count[1] || 0;
|
||||
const rows = await chQuery<{ count: number }>(
|
||||
`SELECT uniq(profile_id) AS count
|
||||
FROM events
|
||||
WHERE project_id = '${projectId}'
|
||||
AND profile_id != ''
|
||||
AND created_at >= now() - INTERVAL 5 MINUTE`
|
||||
);
|
||||
return rows[0]?.count ?? 0;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user