From 5abf7b988d1ed1bec27f2d1c91c02a2cdb0b6544 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Wed, 26 Feb 2025 12:59:41 +0100 Subject: [PATCH] fix(buffer): check if key exists --- packages/db/src/buffers/profile-buffer-redis.ts | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/packages/db/src/buffers/profile-buffer-redis.ts b/packages/db/src/buffers/profile-buffer-redis.ts index 113e2df9..3d44c1b4 100644 --- a/packages/db/src/buffers/profile-buffer-redis.ts +++ b/packages/db/src/buffers/profile-buffer-redis.ts @@ -44,6 +44,14 @@ export class ProfileBuffer extends BaseBuffer { return `${this.redisProfilePrefix}${projectId}:${profileId}`; } + async alreadyExists(profile: IClickhouseProfile) { + const cacheKey = this.getProfileCacheKey({ + profileId: profile.id, + projectId: profile.project_id, + }); + return (await getRedisCache().exists(cacheKey)) === 1; + } + async add(profile: IClickhouseProfile, isFromEvent = false) { const logger = this.logger.child({ projectId: profile.project_id, @@ -53,13 +61,13 @@ export class ProfileBuffer extends BaseBuffer { try { logger.debug('Adding profile'); - const existingProfile = await this.fetchFromCache(profile, logger); - - if (isFromEvent && existingProfile) { + if (isFromEvent && (await this.alreadyExists(profile))) { logger.debug('Profile already created, skipping'); return; } + const existingProfile = await this.fetchProfile(profile, logger); + const mergedProfile: IClickhouseProfile = existingProfile ? deepMergeObjects(existingProfile, profile) : profile; @@ -126,7 +134,7 @@ export class ProfileBuffer extends BaseBuffer { } } - private async fetchFromCache( + private async fetchProfile( profile: IClickhouseProfile, logger: ILogger, ): Promise {