This commit is contained in:
Carl-Gerhard Lindesvärd
2024-11-28 21:11:22 +01:00
parent ee598e3bdd
commit e21cd7ed73

View File

@@ -4,12 +4,7 @@ import { toDots } from '@openpanel/common';
import { getRedisCache } from '@openpanel/redis'; import { getRedisCache } from '@openpanel/redis';
import { escape } from 'sqlstring'; import { escape } from 'sqlstring';
import { import { TABLE_NAMES, ch, chQuery } from '../clickhouse-client';
TABLE_NAMES,
ch,
chQuery,
formatClickhouseDate,
} from '../clickhouse-client';
import { transformProfile } from '../services/profile.service'; import { transformProfile } from '../services/profile.service';
import type { import type {
IClickhouseProfile, IClickhouseProfile,
@@ -28,15 +23,6 @@ export class ProfileBuffer extends RedisBuffer<BufferType> {
super(TABLE_NAMES.profiles, BATCH_SIZE); super(TABLE_NAMES.profiles, BATCH_SIZE);
} }
protected transformProfiles(profiles: IClickhouseProfile[]): BufferType[] {
return profiles.map((profile) => ({
...profile,
created_at: profile.created_at
? formatClickhouseDate(profile.created_at)
: '',
}));
}
// this will do a couple of things: // this will do a couple of things:
// - we slice the queue to maxBufferSize since this queries have a limit on character count // - we slice the queue to maxBufferSize since this queries have a limit on character count
// - check redis cache for profiles // - check redis cache for profiles
@@ -54,14 +40,12 @@ export class ProfileBuffer extends RedisBuffer<BufferType> {
slicedQueue.filter((_, index) => !redisProfiles[index]), slicedQueue.filter((_, index) => !redisProfiles[index]),
); );
const profiles = this.createProfileValues( const toInsert = this.createProfileValues(
slicedQueue, slicedQueue,
redisProfiles, redisProfiles,
dbProfiles, dbProfiles,
); );
const toInsert = this.transformProfiles(profiles);
if (toInsert.length > 0) { if (toInsert.length > 0) {
await this.updateRedisCache(toInsert); await this.updateRedisCache(toInsert);
} }