diff --git a/packages/db/package.json b/packages/db/package.json index 6fafef47..dc5334a3 100644 --- a/packages/db/package.json +++ b/packages/db/package.json @@ -21,6 +21,7 @@ "@openpanel/validation": "workspace:*", "@prisma/client": "^5.1.1", "@prisma/extension-read-replicas": "^0.4.0", + "fast-deep-equal": "^3.1.3", "jiti": "^2.4.1", "prisma-json-types-generator": "^3.1.1", "ramda": "^0.29.1", diff --git a/packages/db/src/buffers/profile-buffer-redis.ts b/packages/db/src/buffers/profile-buffer-redis.ts index 97ca0c58..4fe64a8c 100644 --- a/packages/db/src/buffers/profile-buffer-redis.ts +++ b/packages/db/src/buffers/profile-buffer-redis.ts @@ -1,12 +1,20 @@ -import { createHash } from 'node:crypto'; -import { getSafeJson } from '@openpanel/common'; +import { deepMergeObjects } from '@openpanel/common'; +// import { getSafeJson } from '@openpanel/json'; import { type Redis, getRedisCache } from '@openpanel/redis'; -import { dissocPath, mergeDeepRight, omit, whereEq } from 'ramda'; - +import shallowEqual from 'fast-deep-equal'; +import { omit } from 'ramda'; import { TABLE_NAMES, ch, chQuery } from '../clickhouse/client'; import type { IClickhouseProfile } from '../services/profile.service'; import { BaseBuffer } from './base-buffer'; -import { isPartialMatch } from './partial-json-match'; + +// TODO: Use @openpanel/json when polar is merged +function getSafeJson(str: string): T | null { + try { + return JSON.parse(str); + } catch (e) { + return null; + } +} export class ProfileBuffer extends BaseBuffer { private batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE @@ -34,35 +42,14 @@ export class ProfileBuffer extends BaseBuffer { this.redis = getRedisCache(); } - private excludeKeys( - profile: IClickhouseProfile, - exclude: string[][], - ): IClickhouseProfile { - let filtered = profile; - for (const path of exclude) { - filtered = dissocPath(path, filtered); - } - return filtered; - } - - private match(source: any, partial: any): boolean { - const exclude = [ - ['created_at'], - ['properties', 'browser_version'], - ['properties', 'browserVersion'], - ['properties', 'latitude'], - ['properties', 'longitude'], - ['properties', 'os_version'], - ['properties', 'osVersion'], - ['properties', 'path'], - ['properties', 'referrer_name'], - ['properties', 'referrerName'], - ['properties', 'referrer_type'], - ['properties', 'referrerType'], - ['properties', 'referrer'], - ]; - - return isPartialMatch(source, this.excludeKeys(partial, exclude)); + private getProfileCacheKey({ + projectId, + profileId, + }: { + profileId: string; + projectId: string; + }) { + return `${this.redisProfilePrefix}${projectId}:${profileId}`; } async add(profile: IClickhouseProfile) { @@ -70,56 +57,54 @@ export class ProfileBuffer extends BaseBuffer { this.logger.debug('Adding profile', { projectId: profile.project_id, profileId: profile.id, - profile, }); - const cacheKey = `${this.redisProfilePrefix}${profile.project_id}:${profile.id}`; - // Check if we have this profile in Redis cache - const existingProfile = await this.redis.get(cacheKey); - let mergedProfile = profile; + const existingProfile = await this.fetchFromCache(profile); - if (!existingProfile) { - this.logger.debug('Profile not found in cache, checking Clickhouse', { - projectId: profile.project_id, - profileId: profile.id, - }); - // If not in cache, check Clickhouse - const clickhouseProfile = await this.fetchFromClickhouse(profile); - if (clickhouseProfile) { - this.logger.debug('Found existing profile in Clickhouse, merging', { - projectId: profile.project_id, - profileId: profile.id, - }); - mergedProfile = mergeDeepRight(clickhouseProfile, profile); - } - } else { - const parsedProfile = getSafeJson(existingProfile); + const mergedProfile: IClickhouseProfile = existingProfile + ? deepMergeObjects(existingProfile, profile) + : profile; - if (parsedProfile) { - // Only merge if checksums are different - if (this.match(parsedProfile, profile)) { - return; // Skip if checksums match - } - - this.logger.debug('Profile changed, merging with cached version', { - existingProfile: parsedProfile, - incomingProfile: profile, - }); - mergedProfile = mergeDeepRight(parsedProfile, profile); + // Avoid unnecessary updates: + // If the profile is less than X minutes old + // and the profiles are the same + if (profile.created_at && existingProfile?.created_at) { + const a = new Date(profile.created_at); + const b = new Date(existingProfile.created_at); + const diffTime = Math.abs(a.getTime() - b.getTime()); + if ( + diffTime < 1000 * 60 * 10 && + shallowEqual( + omit(['created_at'], existingProfile), + omit(['created_at'], mergedProfile), + ) + ) { + this.logger.debug('Profile not changed, skipping'); + return; } } + this.logger.debug('Merged profile will be inserted', { + mergedProfile, + existingProfile, + profile, + }); + + const cacheTtl = profile.is_external + ? 60 * 60 * 24 * this.daysToKeep + : 60 * 60; // 1 hour for internal profiles + const cacheKey = this.getProfileCacheKey({ + profileId: profile.id, + projectId: profile.project_id, + }); + const result = await this.redis .multi() - .set( - cacheKey, - JSON.stringify(mergedProfile), - 'EX', - 60 * 60 * 24 * this.daysToKeep, - ) + .set(cacheKey, JSON.stringify(mergedProfile), 'EX', cacheTtl) .rpush(this.redisBufferKey, JSON.stringify(mergedProfile)) .llen(this.redisBufferKey) .exec(); + if (!result) { this.logger.error('Failed to add profile to Redis', { profile, @@ -142,6 +127,33 @@ export class ProfileBuffer extends BaseBuffer { } } + private async fetchFromCache( + profile: IClickhouseProfile, + ): Promise { + this.logger.debug('Fetching profile from Redis', { + projectId: profile.project_id, + profileId: profile.id, + }); + const cacheKey = this.getProfileCacheKey({ + profileId: profile.id, + projectId: profile.project_id, + }); + + const existingProfile = await getRedisCache().get(cacheKey); + if (existingProfile) { + const parsedProfile = getSafeJson(existingProfile); + if (parsedProfile) { + this.logger.debug('Profile found in Redis', { + projectId: profile.project_id, + profileId: profile.id, + }); + return parsedProfile; + } + } + + return this.fetchFromClickhouse(profile); + } + private async fetchFromClickhouse( profile: IClickhouseProfile, ): Promise { @@ -190,20 +202,12 @@ export class ProfileBuffer extends BaseBuffer { getSafeJson(p), ); - let processedChunks = 0; for (const chunk of this.chunks(parsedProfiles, this.chunkSize)) { - processedChunks++; - this.logger.debug(`Processing chunk ${processedChunks}`, { - size: chunk.length, - }); - this.logger.debug('Chunk data', { chunk }); - await ch.insert({ table: TABLE_NAMES.profiles, values: chunk, format: 'JSONEachRow', }); - this.logger.debug(`Successfully inserted chunk ${processedChunks}`); } // Only remove profiles after successful insert @@ -211,7 +215,6 @@ export class ProfileBuffer extends BaseBuffer { this.logger.info('Successfully completed profile processing', { totalProfiles: profiles.length, - totalChunks: processedChunks, }); } catch (error) { this.logger.error('Failed to process buffer', { error }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 72117671..19fba90a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -898,6 +898,9 @@ importers: '@prisma/extension-read-replicas': specifier: ^0.4.0 version: 0.4.0(@prisma/client@5.9.1(prisma@5.9.1)) + fast-deep-equal: + specifier: ^3.1.3 + version: 3.1.3 jiti: specifier: ^2.4.1 version: 2.4.1 @@ -8820,6 +8823,7 @@ packages: lodash.isequal@4.5.0: resolution: {integrity: sha512-pDo3lu8Jhfjqls6GkMgpahsF9kCyayhgykjyLMNFTKWrpVdAQtYyB4muAMWozBB4ig/dtWAmsMxLEI8wuz+DYQ==} + deprecated: This package is deprecated. Use require('node:util').isDeepStrictEqual instead. lodash.isinteger@4.0.4: resolution: {integrity: sha512-DBwtEWN2caHQ9/imiNeEA5ys1JoRtRfY3d7V9wkqtbycnAmTvRRmbHKDV4a0EYc678/dia0jrte4tjYwVBaZUA==}