diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts
index 60c06ae3..12e38a69 100644
--- a/apps/api/src/controllers/track.controller.ts
+++ b/apps/api/src/controllers/track.controller.ts
@@ -3,8 +3,8 @@ import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
 import {
   getProfileById,
   getSalts,
+  groupBuffer,
   replayBuffer,
-  upsertGroup,
   upsertProfile,
 } from '@openpanel/db';
 import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
@@ -344,7 +344,7 @@ async function handleGroup(
   const profileId = payload.profileId ?? context.deviceId;
 
   await Promise.all([
-    upsertGroup({
+    groupBuffer.add({
       id,
       projectId: context.projectId,
       type,
diff --git a/apps/worker/src/boot-cron.ts b/apps/worker/src/boot-cron.ts
index e3835e91..f49a1435 100644
--- a/apps/worker/src/boot-cron.ts
+++ b/apps/worker/src/boot-cron.ts
@@ -68,6 +68,11 @@ export async function bootCron() {
       type: 'flushReplay',
       pattern: 1000 * 10,
     },
+    {
+      name: 'flush',
+      type: 'flushGroups',
+      pattern: 1000 * 10,
+    },
     {
       name: 'insightsDaily',
       type: 'insightsDaily',
diff --git a/apps/worker/src/jobs/cron.ts b/apps/worker/src/jobs/cron.ts
index f9428614..c6614d15 100644
--- a/apps/worker/src/jobs/cron.ts
+++ b/apps/worker/src/jobs/cron.ts
@@ -1,6 +1,6 @@
 import type { Job } from 'bullmq';
 
-import { eventBuffer, profileBackfillBuffer, profileBuffer, replayBuffer, sessionBuffer } from '@openpanel/db';
+import { eventBuffer, groupBuffer, profileBackfillBuffer, profileBuffer, replayBuffer, sessionBuffer } from '@openpanel/db';
 import type { CronQueuePayload } from '@openpanel/queue';
 
 import { jobdeleteProjects } from './cron.delete-projects';
@@ -30,6 +30,9 @@ export async function cronJob(job: Job) {
     case 'flushReplay': {
       return await replayBuffer.tryFlush();
     }
+    case 'flushGroups': {
+      return await groupBuffer.tryFlush();
+    }
     case 'ping': {
       return await ping();
     }
diff --git a/packages/db/src/buffers/group-buffer.ts b/packages/db/src/buffers/group-buffer.ts
new file mode 100644
index 00000000..4108b21c
--- /dev/null
+++ b/packages/db/src/buffers/group-buffer.ts
@@ -0,0 +1,195 @@
+import { toDots } from '@openpanel/common';
+import { getSafeJson } from '@openpanel/json';
+import { getRedisCache, type Redis } from '@openpanel/redis';
+import shallowEqual from 'fast-deep-equal';
+import sqlstring from 'sqlstring';
+import { ch, chQuery, formatClickhouseDate, TABLE_NAMES } from '../clickhouse/client';
+import { BaseBuffer } from './base-buffer';
+
+type IGroupBufferEntry = {
+  project_id: string;
+  id: string;
+  type: string;
+  name: string;
+  properties: Record<string, unknown>;
+  created_at: string;
+  version: string;
+  deleted: number;
+};
+
+type IGroupCacheEntry = {
+  id: string;
+  project_id: string;
+  type: string;
+  name: string;
+  properties: Record<string, unknown>;
+  created_at: string;
+};
+
+export type IGroupBufferInput = {
+  id: string;
+  projectId: string;
+  type: string;
+  name: string;
+  properties?: Record<string, unknown>;
+};
+
+export class GroupBuffer extends BaseBuffer {
+  private batchSize = process.env.GROUP_BUFFER_BATCH_SIZE
+    ? Number.parseInt(process.env.GROUP_BUFFER_BATCH_SIZE, 10)
+    : 200;
+  private chunkSize = process.env.GROUP_BUFFER_CHUNK_SIZE
+    ? Number.parseInt(process.env.GROUP_BUFFER_CHUNK_SIZE, 10)
+    : 1000;
+  private ttlInSeconds = process.env.GROUP_BUFFER_TTL_IN_SECONDS
+    ? Number.parseInt(process.env.GROUP_BUFFER_TTL_IN_SECONDS, 10)
+    : 60 * 60;
+
+  private readonly redisKey = 'group-buffer';
+  private readonly redisCachePrefix = 'group-cache:';
+
+  private redis: Redis;
+
+  constructor() {
+    super({
+      name: 'group',
+      onFlush: async () => {
+        await this.processBuffer();
+      },
+    });
+    this.redis = getRedisCache();
+  }
+
+  private getCacheKey(projectId: string, id: string) {
+    return `${this.redisCachePrefix}${projectId}:${id}`;
+  }
+
+  private async fetchFromCache(
+    projectId: string,
+    id: string
+  ): Promise<IGroupCacheEntry | null> {
+    const raw = await this.redis.get(this.getCacheKey(projectId, id));
+    if (!raw) return null;
+    return getSafeJson<IGroupCacheEntry>(raw);
+  }
+
+  private async fetchFromClickhouse(
+    projectId: string,
+    id: string
+  ): Promise<IGroupCacheEntry | null> {
+    const rows = await chQuery<IGroupCacheEntry>(`
+      SELECT project_id, id, type, name, properties, created_at
+      FROM ${TABLE_NAMES.groups} FINAL
+      WHERE project_id = ${sqlstring.escape(projectId)}
+        AND id = ${sqlstring.escape(id)}
+        AND deleted = 0
+    `);
+    return rows[0] ?? null;
+  }
+
+  async add(input: IGroupBufferInput): Promise<void> {
+    try {
+      const cacheKey = this.getCacheKey(input.projectId, input.id);
+
+      const existing =
+        (await this.fetchFromCache(input.projectId, input.id)) ??
+        (await this.fetchFromClickhouse(input.projectId, input.id));
+
+      const mergedProperties = toDots({
+        ...(existing?.properties ?? {}),
+        ...(input.properties ?? {}),
+      }) as Record<string, unknown>;
+
+      const entry: IGroupBufferEntry = {
+        project_id: input.projectId,
+        id: input.id,
+        type: input.type,
+        name: input.name,
+        properties: mergedProperties,
+        created_at: formatClickhouseDate(
+          existing?.created_at ? new Date(existing.created_at) : new Date()
+        ),
+        version: String(Date.now()),
+        deleted: 0,
+      };
+
+      if (
+        existing &&
+        existing.type === entry.type &&
+        existing.name === entry.name &&
+        shallowEqual(existing.properties, entry.properties)
+      ) {
+        this.logger.debug('Group not changed, skipping', { id: input.id });
+        return;
+      }
+
+      const cacheEntry: IGroupCacheEntry = {
+        id: entry.id,
+        project_id: entry.project_id,
+        type: entry.type,
+        name: entry.name,
+        properties: entry.properties,
+        created_at: entry.created_at,
+      };
+
+      const result = await this.redis
+        .multi()
+        .set(cacheKey, JSON.stringify(cacheEntry), 'EX', this.ttlInSeconds)
+        .rpush(this.redisKey, JSON.stringify(entry))
+        .incr(this.bufferCounterKey)
+        .llen(this.redisKey)
+        .exec();
+
+      if (!result) {
+        this.logger.error('Failed to add group to Redis', { input });
+        return;
+      }
+
+      const bufferLength = (result?.[3]?.[1] as number) ?? 0;
+      if (bufferLength >= this.batchSize) {
+        await this.tryFlush();
+      }
+    } catch (error) {
+      this.logger.error('Failed to add group', { error, input });
+    }
+  }
+
+  async processBuffer(): Promise<void> {
+    try {
+      this.logger.debug('Starting group buffer processing');
+      const items = await this.redis.lrange(this.redisKey, 0, this.batchSize - 1);
+
+      if (items.length === 0) {
+        this.logger.debug('No groups to process');
+        return;
+      }
+
+      this.logger.debug(`Processing ${items.length} groups in buffer`);
+      const parsed = items.map((i) => getSafeJson<IGroupBufferEntry>(i));
+
+      for (const chunk of this.chunks(parsed, this.chunkSize)) {
+        await ch.insert({
+          table: TABLE_NAMES.groups,
+          values: chunk,
+          format: 'JSONEachRow',
+        });
+      }
+
+      await this.redis
+        .multi()
+        .ltrim(this.redisKey, items.length, -1)
+        .decrby(this.bufferCounterKey, items.length)
+        .exec();
+
+      this.logger.debug('Successfully completed group processing', {
+        totalGroups: items.length,
+      });
+    } catch (error) {
+      this.logger.error('Failed to process buffer', { error });
+    }
+  }
+
+  async getBufferSize(): Promise<number> {
+    return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey));
+  }
+}
diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts
index 1d7b1baf..86741b54 100644
--- a/packages/db/src/buffers/index.ts
+++ b/packages/db/src/buffers/index.ts
@@ -1,5 +1,6 @@
 import { BotBuffer as BotBufferRedis } from './bot-buffer';
 import { EventBuffer as EventBufferRedis } from './event-buffer';
+import { GroupBuffer } from './group-buffer';
 import { ProfileBackfillBuffer } from './profile-backfill-buffer';
 import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer';
 import { ReplayBuffer } from './replay-buffer';
@@ -11,6 +12,7 @@
 export const botBuffer = new BotBufferRedis();
 export const sessionBuffer = new SessionBuffer();
 export const profileBackfillBuffer = new ProfileBackfillBuffer();
 export const replayBuffer = new ReplayBuffer();
+export const groupBuffer = new GroupBuffer();
 export type { ProfileBackfillEntry } from './profile-backfill-buffer';
 export type { IClickhouseSessionReplayChunk } from './replay-buffer';
diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts
index 48a5893e..171e4707 100644
--- a/packages/queue/src/queues.ts
+++ b/packages/queue/src/queues.ts
@@ -134,6 +134,10 @@ export type CronQueuePayloadGscSync = {
   type: 'gscSync';
   payload: undefined;
 };
+export type CronQueuePayloadFlushGroups = {
+  type: 'flushGroups';
+  payload: undefined;
+};
 export type CronQueuePayload =
   | CronQueuePayloadSalt
   | CronQueuePayloadFlushEvents
@@ -141,6 +145,7 @@
   | CronQueuePayloadFlushProfiles
   | CronQueuePayloadFlushProfileBackfill
   | CronQueuePayloadFlushReplay
+  | CronQueuePayloadFlushGroups
   | CronQueuePayloadPing
   | CronQueuePayloadProject
   | CronQueuePayloadInsightsDaily