fix: how we fetch profiles in the buffer

This commit is contained in:
Carl-Gerhard Lindesvärd
2026-02-06 12:41:41 +00:00
parent 4736f8509d
commit 8afcf55154

View File

@@ -1,11 +1,11 @@
import { deepMergeObjects } from '@openpanel/common'; import { deepMergeObjects } from '@openpanel/common';
import { getSafeJson } from '@openpanel/json'; import { getSafeJson } from '@openpanel/json';
import type { ILogger } from '@openpanel/logger'; import type { ILogger } from '@openpanel/logger';
import { type Redis, getRedisCache } from '@openpanel/redis'; import { getRedisCache, type Redis } from '@openpanel/redis';
import shallowEqual from 'fast-deep-equal'; import shallowEqual from 'fast-deep-equal';
import { omit } from 'ramda'; import { omit } from 'ramda';
import sqlstring from 'sqlstring'; import sqlstring from 'sqlstring';
import { TABLE_NAMES, ch, chQuery } from '../clickhouse/client'; import { ch, chQuery, TABLE_NAMES } from '../clickhouse/client';
import type { IClickhouseProfile } from '../services/profile.service'; import type { IClickhouseProfile } from '../services/profile.service';
import { BaseBuffer } from './base-buffer'; import { BaseBuffer } from './base-buffer';
@@ -89,7 +89,7 @@ export class ProfileBuffer extends BaseBuffer {
'os_version', 'os_version',
'browser_version', 'browser_version',
], ],
profile.properties, profile.properties
); );
} }
@@ -97,16 +97,16 @@ export class ProfileBuffer extends BaseBuffer {
? deepMergeObjects(existingProfile, omit(['created_at'], profile)) ? deepMergeObjects(existingProfile, omit(['created_at'], profile))
: profile; : profile;
if (profile && existingProfile) { if (
if ( profile &&
shallowEqual( existingProfile &&
omit(['created_at'], existingProfile), shallowEqual(
omit(['created_at'], mergedProfile), omit(['created_at'], existingProfile),
) omit(['created_at'], mergedProfile)
) { )
this.logger.debug('Profile not changed, skipping'); ) {
return; this.logger.debug('Profile not changed, skipping');
} return;
} }
this.logger.debug('Merged profile will be inserted', { this.logger.debug('Merged profile will be inserted', {
@@ -151,11 +151,11 @@ export class ProfileBuffer extends BaseBuffer {
private async fetchProfile( private async fetchProfile(
profile: IClickhouseProfile, profile: IClickhouseProfile,
logger: ILogger, logger: ILogger
): Promise<IClickhouseProfile | null> { ): Promise<IClickhouseProfile | null> {
const existingProfile = await this.fetchFromCache( const existingProfile = await this.fetchFromCache(
profile.id, profile.id,
profile.project_id, profile.project_id
); );
if (existingProfile) { if (existingProfile) {
logger.debug('Profile found in Redis'); logger.debug('Profile found in Redis');
@@ -167,7 +167,7 @@ export class ProfileBuffer extends BaseBuffer {
public async fetchFromCache( public async fetchFromCache(
profileId: string, profileId: string,
projectId: string, projectId: string
): Promise<IClickhouseProfile | null> { ): Promise<IClickhouseProfile | null> {
const cacheKey = this.getProfileCacheKey({ const cacheKey = this.getProfileCacheKey({
profileId, profileId,
@@ -182,7 +182,7 @@ export class ProfileBuffer extends BaseBuffer {
private async fetchFromClickhouse( private async fetchFromClickhouse(
profile: IClickhouseProfile, profile: IClickhouseProfile,
logger: ILogger, logger: ILogger
): Promise<IClickhouseProfile | null> { ): Promise<IClickhouseProfile | null> {
logger.debug('Fetching profile from Clickhouse'); logger.debug('Fetching profile from Clickhouse');
const result = await chQuery<IClickhouseProfile>( const result = await chQuery<IClickhouseProfile>(
@@ -207,7 +207,7 @@ export class ProfileBuffer extends BaseBuffer {
} }
GROUP BY id, project_id GROUP BY id, project_id
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT 1`, LIMIT 1`
); );
logger.debug('Clickhouse fetch result', { logger.debug('Clickhouse fetch result', {
found: !!result[0], found: !!result[0],
@@ -221,7 +221,7 @@ export class ProfileBuffer extends BaseBuffer {
const profiles = await this.redis.lrange( const profiles = await this.redis.lrange(
this.redisKey, this.redisKey,
0, 0,
this.batchSize - 1, this.batchSize - 1
); );
if (profiles.length === 0) { if (profiles.length === 0) {
@@ -231,7 +231,7 @@ export class ProfileBuffer extends BaseBuffer {
this.logger.debug(`Processing ${profiles.length} profiles in buffer`); this.logger.debug(`Processing ${profiles.length} profiles in buffer`);
const parsedProfiles = profiles.map((p) => const parsedProfiles = profiles.map((p) =>
getSafeJson<IClickhouseProfile>(p), getSafeJson<IClickhouseProfile>(p)
); );
for (const chunk of this.chunks(parsedProfiles, this.chunkSize)) { for (const chunk of this.chunks(parsedProfiles, this.chunkSize)) {