diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index de80bd42..d568bb23 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -1,11 +1,11 @@ import { deepMergeObjects } from '@openpanel/common'; import { getSafeJson } from '@openpanel/json'; import type { ILogger } from '@openpanel/logger'; -import { type Redis, getRedisCache } from '@openpanel/redis'; +import { getRedisCache, type Redis } from '@openpanel/redis'; import shallowEqual from 'fast-deep-equal'; import { omit } from 'ramda'; import sqlstring from 'sqlstring'; -import { TABLE_NAMES, ch, chQuery } from '../clickhouse/client'; +import { ch, chQuery, TABLE_NAMES } from '../clickhouse/client'; import type { IClickhouseProfile } from '../services/profile.service'; import { BaseBuffer } from './base-buffer'; @@ -89,7 +89,7 @@ export class ProfileBuffer extends BaseBuffer { 'os_version', 'browser_version', ], - profile.properties, + profile.properties ); } @@ -97,16 +97,16 @@ export class ProfileBuffer extends BaseBuffer { ? deepMergeObjects(existingProfile, omit(['created_at'], profile)) : profile; - if (profile && existingProfile) { - if ( - shallowEqual( - omit(['created_at'], existingProfile), - omit(['created_at'], mergedProfile), - ) - ) { - this.logger.debug('Profile not changed, skipping'); - return; - } + if ( + profile && + existingProfile && + shallowEqual( + omit(['created_at'], existingProfile), + omit(['created_at'], mergedProfile) + ) + ) { + this.logger.debug('Profile not changed, skipping'); + return; } this.logger.debug('Merged profile will be inserted', { @@ -151,11 +151,11 @@ export class ProfileBuffer extends BaseBuffer { private async fetchProfile( profile: IClickhouseProfile, - logger: ILogger, + logger: ILogger ): Promise { const existingProfile = await this.fetchFromCache( profile.id, - profile.project_id, + profile.project_id ); if (existingProfile) { logger.debug('Profile found in Redis'); @@ -167,7 +167,7 @@ export class ProfileBuffer extends BaseBuffer { public async fetchFromCache( profileId: string, - projectId: string, + projectId: string ): Promise { const cacheKey = this.getProfileCacheKey({ profileId, @@ -182,7 +182,7 @@ export class ProfileBuffer extends BaseBuffer { private async fetchFromClickhouse( profile: IClickhouseProfile, - logger: ILogger, + logger: ILogger ): Promise { logger.debug('Fetching profile from Clickhouse'); const result = await chQuery( @@ -207,7 +207,7 @@ export class ProfileBuffer extends BaseBuffer { } GROUP BY id, project_id ORDER BY created_at DESC - LIMIT 1`, + LIMIT 1` ); logger.debug('Clickhouse fetch result', { found: !!result[0], @@ -221,7 +221,7 @@ export class ProfileBuffer extends BaseBuffer { const profiles = await this.redis.lrange( this.redisKey, 0, - this.batchSize - 1, + this.batchSize - 1 ); if (profiles.length === 0) { @@ -231,7 +231,7 @@ export class ProfileBuffer extends BaseBuffer { this.logger.debug(`Processing ${profiles.length} profiles in buffer`); const parsedProfiles = profiles.map((p) => - getSafeJson(p), + getSafeJson(p) ); for (const chunk of this.chunks(parsedProfiles, this.chunkSize)) {