chore(root): migrate to biome

This commit is contained in:
Carl-Gerhard Lindesvärd
2024-09-16 12:20:40 +02:00
parent 1f6e198336
commit 32e91959f6
383 changed files with 1943 additions and 3085 deletions

View File

@@ -1,4 +1,4 @@
import { ch, TABLE_NAMES } from '../clickhouse-client';
import { TABLE_NAMES, ch } from '../clickhouse-client';
import type { IClickhouseBotEvent } from '../services/event.service';
import type {
Find,

View File

@@ -17,20 +17,20 @@ export type OnCompleted<T> =
export type ProcessQueue<T> = (data: QueueItem<T>[]) => Promise<number[]>;
export type Find<T, R = unknown> = (
callback: (item: QueueItem<T>) => boolean
callback: (item: QueueItem<T>) => boolean,
) => Promise<R | null>;
export type FindMany<T, R = unknown> = (
callback: (item: QueueItem<T>) => boolean
callback: (item: QueueItem<T>) => boolean,
) => Promise<R[]>;
const getError = (e: unknown) => {
if (e instanceof Error) {
return [
'Name: ' + e.name,
'Message: ' + e.message,
'Stack: ' + e.stack,
'Cause: ' + (e.cause ? String(e.cause) : ''),
`Name: ${e.name}`,
`Message: ${e.message}`,
`Stack: ${e.stack}`,
`Cause: ${e.cause ? String(e.cause) : ''}`,
].join('\n');
}
return 'Unknown error';
@@ -59,13 +59,13 @@ export abstract class RedisBuffer<T> {
this.table = options.table;
this.batchSize = options.batchSize;
this.disableAutoFlush = options.disableAutoFlush;
this.logger = createLogger({ name: `buffer` }).child({
this.logger = createLogger({ name: 'buffer' }).child({
table: this.table,
});
}
public getKey(name?: string) {
const key = this.prefix + ':' + this.table;
const key = `${this.prefix}:${this.table}`;
if (name) {
return `${key}:${name}`;
}
@@ -78,12 +78,12 @@ export abstract class RedisBuffer<T> {
const length = await getRedisCache().llen(this.getKey());
this.logger.debug(
`Inserted item into buffer ${this.table}. Current length: ${length}`
`Inserted item into buffer ${this.table}. Current length: ${length}`,
);
if (!this.disableAutoFlush && this.batchSize && length >= this.batchSize) {
this.logger.info(
`Buffer ${this.table} reached batch size (${this.batchSize}). Flushing...`
`Buffer ${this.table} reached batch size (${this.batchSize}). Flushing...`,
);
this.flush();
}
@@ -99,7 +99,7 @@ export abstract class RedisBuffer<T> {
}
this.logger.info(
`Flushing ${queue.length} items from buffer ${this.table}`
`Flushing ${queue.length} items from buffer ${this.table}`,
);
try {
@@ -112,19 +112,19 @@ export abstract class RedisBuffer<T> {
if (this.onCompleted) {
const res = await this.onCompleted(data);
this.logger.info(
`Completed processing ${res.length} items from buffer ${this.table}`
`Completed processing ${res.length} items from buffer ${this.table}`,
);
return { count: res.length, data: res };
}
this.logger.info(
`Processed ${indexes.length} items from buffer ${this.table}`
`Processed ${indexes.length} items from buffer ${this.table}`,
);
return { count: indexes.length, data: indexes };
} catch (e) {
this.logger.error(
`Failed to process queue while flushing buffer ${this.table}:`,
e
e,
);
const timestamp = new Date().getTime();
await getRedisCache().hset(this.getKey(`failed:${timestamp}`), {
@@ -133,13 +133,13 @@ export abstract class RedisBuffer<T> {
retries: 0,
});
this.logger.warn(
`Stored ${queue.length} failed items in ${this.getKey(`failed:${timestamp}`)}`
`Stored ${queue.length} failed items in ${this.getKey(`failed:${timestamp}`)}`,
);
}
} catch (e) {
this.logger.error(
`Failed to get queue while flushing buffer ${this.table}:`,
e
e,
);
}
}
@@ -152,7 +152,7 @@ export abstract class RedisBuffer<T> {
multi.lrem(this.getKey(), 0, DELETE);
await multi.exec();
this.logger.debug(
`Deleted ${indexes.length} items from buffer ${this.table}`
`Deleted ${indexes.length} items from buffer ${this.table}`,
);
}
@@ -165,7 +165,7 @@ export abstract class RedisBuffer<T> {
}))
.filter((item): item is QueueItem<T> => item.event !== null);
this.logger.debug(
`Retrieved ${result.length} items from buffer ${this.table}`
`Retrieved ${result.length} items from buffer ${this.table}`,
);
return result;
}

View File

@@ -4,7 +4,7 @@ import SuperJSON from 'superjson';
import { deepMergeObjects } from '@openpanel/common';
import { getRedisCache, getRedisPub } from '@openpanel/redis';
import { ch, TABLE_NAMES } from '../clickhouse-client';
import { TABLE_NAMES, ch } from '../clickhouse-client';
import { transformEvent } from '../services/event.service';
import type {
IClickhouseEvent,
@@ -22,7 +22,7 @@ import { RedisBuffer } from './buffer';
const sortOldestFirst = (
a: QueueItem<IClickhouseEvent>,
b: QueueItem<IClickhouseEvent>
b: QueueItem<IClickhouseEvent>,
) =>
new Date(a.event.created_at).getTime() -
new Date(b.event.created_at).getTime();
@@ -37,25 +37,25 @@ export class EventBuffer extends RedisBuffer<IClickhouseEvent> {
public onInsert?: OnInsert<IClickhouseEvent> | undefined = (event) => {
getRedisPub().publish(
'event:received',
SuperJSON.stringify(transformEvent(event))
SuperJSON.stringify(transformEvent(event)),
);
if (event.profile_id) {
getRedisCache().set(
`live:event:${event.project_id}:${event.profile_id}`,
'',
'EX',
60 * 5
60 * 5,
);
}
};
public onCompleted?: OnCompleted<IClickhouseEvent> | undefined = (
savedEvents
savedEvents,
) => {
for (const event of savedEvents) {
getRedisPub().publish(
'event:saved',
SuperJSON.stringify(transformEvent(event))
SuperJSON.stringify(transformEvent(event)),
);
}
@@ -76,7 +76,7 @@ export class EventBuffer extends RedisBuffer<IClickhouseEvent> {
queue
.filter(
(item) =>
item.event.name !== 'screen_view' || item.event.device === 'server'
item.event.name !== 'screen_view' || item.event.device === 'server',
)
.forEach((item) => {
// Find the last event with data and merge it with the current event
@@ -93,7 +93,7 @@ export class EventBuffer extends RedisBuffer<IClickhouseEvent> {
const event = deepMergeObjects<IClickhouseEvent>(
omit(['properties', 'duration'], lastEventWithData?.event || {}),
item.event
item.event,
);
if (lastEventWithData) {
@@ -111,8 +111,8 @@ export class EventBuffer extends RedisBuffer<IClickhouseEvent> {
(item) => item.event.session_id,
queue.filter(
(item) =>
item.event.name === 'screen_view' && item.event.device !== 'server'
)
item.event.name === 'screen_view' && item.event.device !== 'server',
),
);
// Iterate over each group
@@ -125,7 +125,7 @@ export class EventBuffer extends RedisBuffer<IClickhouseEvent> {
const hasSessionEnd = queue.find(
(item) =>
item.event.name === 'session_end' &&
item.event.session_id === sessionId
item.event.session_id === sessionId,
);
screenViews
@@ -187,7 +187,7 @@ export class EventBuffer extends RedisBuffer<IClickhouseEvent> {
};
public findMany: FindMany<IClickhouseEvent, IServiceEvent> = async (
callback
callback,
) => {
return this.getQueue(-1)
.then((queue) => {

View File

@@ -3,7 +3,7 @@ import { mergeDeepRight } from 'ramda';
import { toDots } from '@openpanel/common';
import { getRedisCache } from '@openpanel/redis';
import { ch, chQuery, TABLE_NAMES } from '../clickhouse-client';
import { TABLE_NAMES, ch, chQuery } from '../clickhouse-client';
import type {
IClickhouseProfile,
IServiceProfile,
@@ -35,13 +35,13 @@ export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
const cleanedQueue = this.combineQueueItems(queue);
const redisProfiles = await this.getCachedProfiles(cleanedQueue);
const dbProfiles = await this.fetchDbProfiles(
cleanedQueue.filter((_, index) => !redisProfiles[index])
cleanedQueue.filter((_, index) => !redisProfiles[index]),
);
const values = this.createProfileValues(
cleanedQueue,
redisProfiles,
dbProfiles
dbProfiles,
);
if (values.length > 0) {
@@ -55,7 +55,7 @@ export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
private matchPartialObject(
full: any,
partial: any,
options: { ignore: string[] }
options: { ignore: string[] },
): boolean {
if (typeof partial !== 'object' || partial === null) {
return partial === full;
@@ -78,7 +78,7 @@ export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
}
private combineQueueItems(
queue: QueueItem<IClickhouseProfile>[]
queue: QueueItem<IClickhouseProfile>[],
): QueueItem<IClickhouseProfile>[] {
const itemsToClickhouse = new Map<string, QueueItem<IClickhouseProfile>>();
@@ -92,11 +92,11 @@ export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
}
private async getCachedProfiles(
cleanedQueue: QueueItem<IClickhouseProfile>[]
cleanedQueue: QueueItem<IClickhouseProfile>[],
): Promise<(IClickhouseProfile | null)[]> {
const redisCache = getRedisCache();
const keys = cleanedQueue.map(
(item) => `profile:${item.event.project_id}:${item.event.id}`
(item) => `profile:${item.event.project_id}:${item.event.id}`,
);
const cachedProfiles = await redisCache.mget(...keys);
return cachedProfiles.map((profile) => {
@@ -109,7 +109,7 @@ export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
}
private async fetchDbProfiles(
cleanedQueue: QueueItem<IClickhouseProfile>[]
cleanedQueue: QueueItem<IClickhouseProfile>[],
): Promise<IClickhouseProfile[]> {
if (cleanedQueue.length === 0) {
return [];
@@ -122,21 +122,21 @@ export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
WHERE
(id, project_id) IN (${cleanedQueue.map((item) => `('${item.event.id}', '${item.event.project_id}')`).join(',')})
ORDER BY
created_at DESC`
created_at DESC`,
);
}
private createProfileValues(
cleanedQueue: QueueItem<IClickhouseProfile>[],
redisProfiles: (IClickhouseProfile | null)[],
dbProfiles: IClickhouseProfile[]
dbProfiles: IClickhouseProfile[],
): IClickhouseProfile[] {
return cleanedQueue
.map((item, index) => {
const cachedProfile = redisProfiles[index];
const dbProfile = dbProfiles.find(
(p) =>
p.id === item.event.id && p.project_id === item.event.project_id
p.id === item.event.id && p.project_id === item.event.project_id,
);
const profile = cachedProfile || dbProfile;
@@ -150,7 +150,7 @@ export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
},
{
ignore: ['created_at'],
}
},
)
) {
console.log('Ignoring profile', item.event.id);
@@ -182,14 +182,14 @@ export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
multi.setex(
`profile:${value.project_id}:${value.id}`,
60 * 30, // 30 minutes
JSON.stringify(value)
JSON.stringify(value),
);
});
await multi.exec();
}
private async insertIntoClickhouse(
values: IClickhouseProfile[]
values: IClickhouseProfile[],
): Promise<void> {
await ch.insert({
table: TABLE_NAMES.profiles,
@@ -199,7 +199,7 @@ export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
}
public findMany: FindMany<IClickhouseProfile, IServiceProfile> = async (
callback
callback,
) => {
return this.getQueue(-1)
.then((queue) => {