fix(buffer): better merging profiles in buffer

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-02-22 07:46:06 +01:00
parent 4c938131ca
commit 2022a82f03
3 changed files with 89 additions and 81 deletions

View File

@@ -1,12 +1,20 @@
import { createHash } from 'node:crypto';
import { getSafeJson } from '@openpanel/common';
import { deepMergeObjects } from '@openpanel/common';
// import { getSafeJson } from '@openpanel/json';
import { type Redis, getRedisCache } from '@openpanel/redis';
import { dissocPath, mergeDeepRight, omit, whereEq } from 'ramda';
import shallowEqual from 'fast-deep-equal';
import { omit } from 'ramda';
import { TABLE_NAMES, ch, chQuery } from '../clickhouse/client';
import type { IClickhouseProfile } from '../services/profile.service';
import { BaseBuffer } from './base-buffer';
import { isPartialMatch } from './partial-json-match';
// TODO: Use @openpanel/json when polar is merged
function getSafeJson<T>(str: string): T | null {
try {
return JSON.parse(str);
} catch (e) {
return null;
}
}
export class ProfileBuffer extends BaseBuffer {
private batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE
@@ -34,35 +42,14 @@ export class ProfileBuffer extends BaseBuffer {
this.redis = getRedisCache();
}
private excludeKeys(
profile: IClickhouseProfile,
exclude: string[][],
): IClickhouseProfile {
let filtered = profile;
for (const path of exclude) {
filtered = dissocPath(path, filtered);
}
return filtered;
}
private match(source: any, partial: any): boolean {
const exclude = [
['created_at'],
['properties', 'browser_version'],
['properties', 'browserVersion'],
['properties', 'latitude'],
['properties', 'longitude'],
['properties', 'os_version'],
['properties', 'osVersion'],
['properties', 'path'],
['properties', 'referrer_name'],
['properties', 'referrerName'],
['properties', 'referrer_type'],
['properties', 'referrerType'],
['properties', 'referrer'],
];
return isPartialMatch(source, this.excludeKeys(partial, exclude));
private getProfileCacheKey({
projectId,
profileId,
}: {
profileId: string;
projectId: string;
}) {
return `${this.redisProfilePrefix}${projectId}:${profileId}`;
}
async add(profile: IClickhouseProfile) {
@@ -70,56 +57,54 @@ export class ProfileBuffer extends BaseBuffer {
this.logger.debug('Adding profile', {
projectId: profile.project_id,
profileId: profile.id,
profile,
});
const cacheKey = `${this.redisProfilePrefix}${profile.project_id}:${profile.id}`;
// Check if we have this profile in Redis cache
const existingProfile = await this.redis.get(cacheKey);
let mergedProfile = profile;
const existingProfile = await this.fetchFromCache(profile);
if (!existingProfile) {
this.logger.debug('Profile not found in cache, checking Clickhouse', {
projectId: profile.project_id,
profileId: profile.id,
});
// If not in cache, check Clickhouse
const clickhouseProfile = await this.fetchFromClickhouse(profile);
if (clickhouseProfile) {
this.logger.debug('Found existing profile in Clickhouse, merging', {
projectId: profile.project_id,
profileId: profile.id,
});
mergedProfile = mergeDeepRight(clickhouseProfile, profile);
}
} else {
const parsedProfile = getSafeJson<IClickhouseProfile>(existingProfile);
const mergedProfile: IClickhouseProfile = existingProfile
? deepMergeObjects(existingProfile, profile)
: profile;
if (parsedProfile) {
// Only merge if checksums are different
if (this.match(parsedProfile, profile)) {
return; // Skip if checksums match
}
this.logger.debug('Profile changed, merging with cached version', {
existingProfile: parsedProfile,
incomingProfile: profile,
});
mergedProfile = mergeDeepRight(parsedProfile, profile);
// Avoid unnecessary updates:
// If the profile is less than X minutes old
// and the profiles are the same
if (profile.created_at && existingProfile?.created_at) {
const a = new Date(profile.created_at);
const b = new Date(existingProfile.created_at);
const diffTime = Math.abs(a.getTime() - b.getTime());
if (
diffTime < 1000 * 60 * 10 &&
shallowEqual(
omit(['created_at'], existingProfile),
omit(['created_at'], mergedProfile),
)
) {
this.logger.debug('Profile not changed, skipping');
return;
}
}
this.logger.debug('Merged profile will be inserted', {
mergedProfile,
existingProfile,
profile,
});
const cacheTtl = profile.is_external
? 60 * 60 * 24 * this.daysToKeep
: 60 * 60; // 1 hour for internal profiles
const cacheKey = this.getProfileCacheKey({
profileId: profile.id,
projectId: profile.project_id,
});
const result = await this.redis
.multi()
.set(
cacheKey,
JSON.stringify(mergedProfile),
'EX',
60 * 60 * 24 * this.daysToKeep,
)
.set(cacheKey, JSON.stringify(mergedProfile), 'EX', cacheTtl)
.rpush(this.redisBufferKey, JSON.stringify(mergedProfile))
.llen(this.redisBufferKey)
.exec();
if (!result) {
this.logger.error('Failed to add profile to Redis', {
profile,
@@ -142,6 +127,33 @@ export class ProfileBuffer extends BaseBuffer {
}
}
private async fetchFromCache(
profile: IClickhouseProfile,
): Promise<IClickhouseProfile | null> {
this.logger.debug('Fetching profile from Redis', {
projectId: profile.project_id,
profileId: profile.id,
});
const cacheKey = this.getProfileCacheKey({
profileId: profile.id,
projectId: profile.project_id,
});
const existingProfile = await getRedisCache().get(cacheKey);
if (existingProfile) {
const parsedProfile = getSafeJson<IClickhouseProfile>(existingProfile);
if (parsedProfile) {
this.logger.debug('Profile found in Redis', {
projectId: profile.project_id,
profileId: profile.id,
});
return parsedProfile;
}
}
return this.fetchFromClickhouse(profile);
}
private async fetchFromClickhouse(
profile: IClickhouseProfile,
): Promise<IClickhouseProfile | null> {
@@ -190,20 +202,12 @@ export class ProfileBuffer extends BaseBuffer {
getSafeJson<IClickhouseProfile>(p),
);
let processedChunks = 0;
for (const chunk of this.chunks(parsedProfiles, this.chunkSize)) {
processedChunks++;
this.logger.debug(`Processing chunk ${processedChunks}`, {
size: chunk.length,
});
this.logger.debug('Chunk data', { chunk });
await ch.insert({
table: TABLE_NAMES.profiles,
values: chunk,
format: 'JSONEachRow',
});
this.logger.debug(`Successfully inserted chunk ${processedChunks}`);
}
// Only remove profiles after successful insert
@@ -211,7 +215,6 @@ export class ProfileBuffer extends BaseBuffer {
this.logger.info('Successfully completed profile processing', {
totalProfiles: profiles.length,
totalChunks: processedChunks,
});
} catch (error) {
this.logger.error('Failed to process buffer', { error });