diff --git a/packages/db/src/buffers/profile-buffer-redis.ts b/packages/db/src/buffers/profile-buffer-redis.ts index 113e2df9..3d44c1b4 100644 --- a/packages/db/src/buffers/profile-buffer-redis.ts +++ b/packages/db/src/buffers/profile-buffer-redis.ts @@ -44,6 +44,14 @@ export class ProfileBuffer extends BaseBuffer { return `${this.redisProfilePrefix}${projectId}:${profileId}`; } + async alreadyExists(profile: IClickhouseProfile) { + const cacheKey = this.getProfileCacheKey({ + profileId: profile.id, + projectId: profile.project_id, + }); + return (await getRedisCache().exists(cacheKey)) === 1; + } + async add(profile: IClickhouseProfile, isFromEvent = false) { const logger = this.logger.child({ projectId: profile.project_id, @@ -53,13 +61,13 @@ export class ProfileBuffer extends BaseBuffer { try { logger.debug('Adding profile'); - const existingProfile = await this.fetchFromCache(profile, logger); - - if (isFromEvent && existingProfile) { + if (isFromEvent && (await this.alreadyExists(profile))) { logger.debug('Profile already created, skipping'); return; } + const existingProfile = await this.fetchProfile(profile, logger); + const mergedProfile: IClickhouseProfile = existingProfile ? deepMergeObjects(existingProfile, profile) : profile; @@ -126,7 +134,7 @@ export class ProfileBuffer extends BaseBuffer { } } - private async fetchFromCache( + private async fetchProfile( profile: IClickhouseProfile, logger: ILogger, ): Promise {