add buffer

This commit is contained in:
Carl-Gerhard Lindesvärd
2026-03-06 11:04:51 +01:00
parent 0b5d4fa0d1
commit 8fd8b9319d
6 changed files with 213 additions and 3 deletions

View File

@@ -3,8 +3,8 @@ import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
import {
getProfileById,
getSalts,
groupBuffer,
replayBuffer,
upsertGroup,
upsertProfile,
} from '@openpanel/db';
import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
@@ -344,7 +344,7 @@ async function handleGroup(
const profileId = payload.profileId ?? context.deviceId;
await Promise.all([
upsertGroup({
groupBuffer.add({
id,
projectId: context.projectId,
type,

View File

@@ -68,6 +68,11 @@ export async function bootCron() {
type: 'flushReplay',
pattern: 1000 * 10,
},
{
name: 'flush',
type: 'flushGroups',
pattern: 1000 * 10,
},
{
name: 'insightsDaily',
type: 'insightsDaily',

View File

@@ -1,6 +1,6 @@
import type { Job } from 'bullmq';
import { eventBuffer, profileBackfillBuffer, profileBuffer, replayBuffer, sessionBuffer } from '@openpanel/db';
import { eventBuffer, groupBuffer, profileBackfillBuffer, profileBuffer, replayBuffer, sessionBuffer } from '@openpanel/db';
import type { CronQueuePayload } from '@openpanel/queue';
import { jobdeleteProjects } from './cron.delete-projects';
@@ -30,6 +30,9 @@ export async function cronJob(job: Job<CronQueuePayload>) {
case 'flushReplay': {
return await replayBuffer.tryFlush();
}
case 'flushGroups': {
return await groupBuffer.tryFlush();
}
case 'ping': {
return await ping();
}

View File

@@ -0,0 +1,195 @@
import { toDots } from '@openpanel/common';
import { getSafeJson } from '@openpanel/json';
import { getRedisCache, type Redis } from '@openpanel/redis';
import shallowEqual from 'fast-deep-equal';
import sqlstring from 'sqlstring';
import { ch, chQuery, formatClickhouseDate, TABLE_NAMES } from '../clickhouse/client';
import { BaseBuffer } from './base-buffer';
type IGroupBufferEntry = {
project_id: string;
id: string;
type: string;
name: string;
properties: Record<string, string>;
created_at: string;
version: string;
deleted: number;
};
type IGroupCacheEntry = {
id: string;
project_id: string;
type: string;
name: string;
properties: Record<string, string>;
created_at: string;
};
export type IGroupBufferInput = {
id: string;
projectId: string;
type: string;
name: string;
properties?: Record<string, unknown>;
};
export class GroupBuffer extends BaseBuffer {
private batchSize = process.env.GROUP_BUFFER_BATCH_SIZE
? Number.parseInt(process.env.GROUP_BUFFER_BATCH_SIZE, 10)
: 200;
private chunkSize = process.env.GROUP_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.GROUP_BUFFER_CHUNK_SIZE, 10)
: 1000;
private ttlInSeconds = process.env.GROUP_BUFFER_TTL_IN_SECONDS
? Number.parseInt(process.env.GROUP_BUFFER_TTL_IN_SECONDS, 10)
: 60 * 60;
private readonly redisKey = 'group-buffer';
private readonly redisCachePrefix = 'group-cache:';
private redis: Redis;
constructor() {
super({
name: 'group',
onFlush: async () => {
await this.processBuffer();
},
});
this.redis = getRedisCache();
}
private getCacheKey(projectId: string, id: string) {
return `${this.redisCachePrefix}${projectId}:${id}`;
}
private async fetchFromCache(
projectId: string,
id: string
): Promise<IGroupCacheEntry | null> {
const raw = await this.redis.get(this.getCacheKey(projectId, id));
if (!raw) return null;
return getSafeJson<IGroupCacheEntry>(raw);
}
private async fetchFromClickhouse(
projectId: string,
id: string
): Promise<IGroupCacheEntry | null> {
const rows = await chQuery<IGroupCacheEntry>(`
SELECT project_id, id, type, name, properties, created_at
FROM ${TABLE_NAMES.groups} FINAL
WHERE project_id = ${sqlstring.escape(projectId)}
AND id = ${sqlstring.escape(id)}
AND deleted = 0
`);
return rows[0] ?? null;
}
async add(input: IGroupBufferInput): Promise<void> {
try {
const cacheKey = this.getCacheKey(input.projectId, input.id);
const existing =
(await this.fetchFromCache(input.projectId, input.id)) ??
(await this.fetchFromClickhouse(input.projectId, input.id));
const mergedProperties = toDots({
...(existing?.properties ?? {}),
...(input.properties ?? {}),
}) as Record<string, string>;
const entry: IGroupBufferEntry = {
project_id: input.projectId,
id: input.id,
type: input.type,
name: input.name,
properties: mergedProperties,
created_at: formatClickhouseDate(
existing?.created_at ? new Date(existing.created_at) : new Date()
),
version: String(Date.now()),
deleted: 0,
};
if (
existing &&
existing.type === entry.type &&
existing.name === entry.name &&
shallowEqual(existing.properties, entry.properties)
) {
this.logger.debug('Group not changed, skipping', { id: input.id });
return;
}
const cacheEntry: IGroupCacheEntry = {
id: entry.id,
project_id: entry.project_id,
type: entry.type,
name: entry.name,
properties: entry.properties,
created_at: entry.created_at,
};
const result = await this.redis
.multi()
.set(cacheKey, JSON.stringify(cacheEntry), 'EX', this.ttlInSeconds)
.rpush(this.redisKey, JSON.stringify(entry))
.incr(this.bufferCounterKey)
.llen(this.redisKey)
.exec();
if (!result) {
this.logger.error('Failed to add group to Redis', { input });
return;
}
const bufferLength = (result?.[3]?.[1] as number) ?? 0;
if (bufferLength >= this.batchSize) {
await this.tryFlush();
}
} catch (error) {
this.logger.error('Failed to add group', { error, input });
}
}
async processBuffer(): Promise<void> {
try {
this.logger.debug('Starting group buffer processing');
const items = await this.redis.lrange(this.redisKey, 0, this.batchSize - 1);
if (items.length === 0) {
this.logger.debug('No groups to process');
return;
}
this.logger.debug(`Processing ${items.length} groups in buffer`);
const parsed = items.map((i) => getSafeJson<IGroupBufferEntry>(i));
for (const chunk of this.chunks(parsed, this.chunkSize)) {
await ch.insert({
table: TABLE_NAMES.groups,
values: chunk,
format: 'JSONEachRow',
});
}
await this.redis
.multi()
.ltrim(this.redisKey, items.length, -1)
.decrby(this.bufferCounterKey, items.length)
.exec();
this.logger.debug('Successfully completed group processing', {
totalGroups: items.length,
});
} catch (error) {
this.logger.error('Failed to process buffer', { error });
}
}
async getBufferSize(): Promise<number> {
return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey));
}
}

View File

@@ -1,5 +1,6 @@
import { BotBuffer as BotBufferRedis } from './bot-buffer';
import { EventBuffer as EventBufferRedis } from './event-buffer';
import { GroupBuffer } from './group-buffer';
import { ProfileBackfillBuffer } from './profile-backfill-buffer';
import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer';
import { ReplayBuffer } from './replay-buffer';
@@ -11,6 +12,7 @@ export const botBuffer = new BotBufferRedis();
export const sessionBuffer = new SessionBuffer();
export const profileBackfillBuffer = new ProfileBackfillBuffer();
export const replayBuffer = new ReplayBuffer();
export const groupBuffer = new GroupBuffer();
export type { ProfileBackfillEntry } from './profile-backfill-buffer';
export type { IClickhouseSessionReplayChunk } from './replay-buffer';

View File

@@ -134,6 +134,10 @@ export type CronQueuePayloadGscSync = {
type: 'gscSync';
payload: undefined;
};
export type CronQueuePayloadFlushGroups = {
type: 'flushGroups';
payload: undefined;
};
export type CronQueuePayload =
| CronQueuePayloadSalt
| CronQueuePayloadFlushEvents
@@ -141,6 +145,7 @@ export type CronQueuePayload =
| CronQueuePayloadFlushProfiles
| CronQueuePayloadFlushProfileBackfill
| CronQueuePayloadFlushReplay
| CronQueuePayloadFlushGroups
| CronQueuePayloadPing
| CronQueuePayloadProject
| CronQueuePayloadInsightsDaily