chore(dashboard,db): prepping for migration time
This commit is contained in:
@@ -4,7 +4,12 @@ import { toDots } from '@openpanel/common';
|
||||
import { getRedisCache } from '@openpanel/redis';
|
||||
|
||||
import { escape } from 'sqlstring';
|
||||
import { TABLE_NAMES, ch, chQuery } from '../clickhouse-client';
|
||||
import {
|
||||
TABLE_NAMES,
|
||||
ch,
|
||||
chQuery,
|
||||
formatClickhouseDate,
|
||||
} from '../clickhouse-client';
|
||||
import { transformProfile } from '../services/profile.service';
|
||||
import type {
|
||||
IClickhouseProfile,
|
||||
@@ -23,6 +28,15 @@ export class ProfileBuffer extends RedisBuffer<BufferType> {
|
||||
super(TABLE_NAMES.profiles, BATCH_SIZE);
|
||||
}
|
||||
|
||||
protected transformProfiles(profiles: IClickhouseProfile[]): BufferType[] {
|
||||
return profiles.map((profile) => ({
|
||||
...profile,
|
||||
created_at: profile.created_at
|
||||
? formatClickhouseDate(profile.created_at)
|
||||
: '',
|
||||
}));
|
||||
}
|
||||
|
||||
// this will do a couple of things:
|
||||
// - we slice the queue to maxBufferSize since this queries have a limit on character count
|
||||
// - check redis cache for profiles
|
||||
@@ -40,12 +54,14 @@ export class ProfileBuffer extends RedisBuffer<BufferType> {
|
||||
slicedQueue.filter((_, index) => !redisProfiles[index]),
|
||||
);
|
||||
|
||||
const toInsert = this.createProfileValues(
|
||||
const profiles = this.createProfileValues(
|
||||
slicedQueue,
|
||||
redisProfiles,
|
||||
dbProfiles,
|
||||
);
|
||||
|
||||
const toInsert = this.transformProfiles(profiles);
|
||||
|
||||
if (toInsert.length > 0) {
|
||||
await this.updateRedisCache(toInsert);
|
||||
}
|
||||
|
||||
@@ -2,9 +2,12 @@ import type { ResponseJSON } from '@clickhouse/client';
|
||||
import { createClient } from '@clickhouse/client';
|
||||
import { escape } from 'sqlstring';
|
||||
|
||||
import type { NodeClickHouseClientConfigOptions } from '@clickhouse/client/dist/config';
|
||||
import { createLogger } from '@openpanel/logger';
|
||||
import type { IInterval } from '@openpanel/validation';
|
||||
|
||||
export { createClient };
|
||||
|
||||
const logger = createLogger({ name: 'clickhouse' });
|
||||
|
||||
export const TABLE_NAMES = {
|
||||
@@ -19,8 +22,7 @@ export const TABLE_NAMES = {
|
||||
cohort_events_mv: 'cohort_events_mv',
|
||||
};
|
||||
|
||||
export const originalCh = createClient({
|
||||
url: process.env.CLICKHOUSE_URL,
|
||||
export const CLICKHOUSE_OPTIONS: NodeClickHouseClientConfigOptions = {
|
||||
max_open_connections: 30,
|
||||
request_timeout: 30000,
|
||||
keep_alive: {
|
||||
@@ -33,14 +35,23 @@ export const originalCh = createClient({
|
||||
clickhouse_settings: {
|
||||
date_time_input_format: 'best_effort',
|
||||
},
|
||||
};
|
||||
|
||||
export const originalCh = createClient({
|
||||
// TODO: remove this after migration
|
||||
url: process.env.CLICKHOUSE_URL_CLUSTER ?? process.env.CLICKHOUSE_URL,
|
||||
...CLICKHOUSE_OPTIONS,
|
||||
});
|
||||
|
||||
const cleanQuery = (query: string) =>
|
||||
query.replace(/\n/g, '').replace(/\s+/g, ' ').trim();
|
||||
|
||||
export const ch = new Proxy(originalCh, {
|
||||
get(target, property, receiver) {
|
||||
if (property === 'insert' || property === 'query') {
|
||||
return async (...args: any[]) => {
|
||||
const childLogger = logger.child({
|
||||
query: args[0].query,
|
||||
query: cleanQuery(args[0].query),
|
||||
property,
|
||||
});
|
||||
try {
|
||||
@@ -113,7 +124,7 @@ export async function chQueryWithMeta<T extends Record<string, any>>(
|
||||
};
|
||||
|
||||
logger.info('query info', {
|
||||
query,
|
||||
query: cleanQuery(query),
|
||||
rows: json.rows,
|
||||
stats: response.statistics,
|
||||
elapsed: Date.now() - start,
|
||||
|
||||
Reference in New Issue
Block a user