batching events

This commit is contained in:
Carl-Gerhard Lindesvärd
2024-07-17 17:13:07 +02:00
committed by Carl-Gerhard Lindesvärd
parent 244aa3b0d3
commit 5e225b7ae6
58 changed files with 2204 additions and 583 deletions

View File

@@ -0,0 +1,137 @@
import type { Redis } from '@openpanel/redis';
// Sentinel written over processed Redis list entries so a single LREM can
// remove them afterwards (Redis has no "delete by index" primitive).
export const DELETE = '__DELETE__';
// A buffered value together with its position in the Redis list.
export type QueueItem<T> = {
event: T;
index: number;
};
// Hook invoked for every value handed to insert(), before it is queued.
export type OnInsert<T> = (data: T) => unknown;
// Hook invoked after a batch was flushed; receives the persisted values.
export type OnCompleted<T> =
| ((data: T[]) => Promise<unknown[]>)
| ((data: T[]) => unknown[]);
// Persists a batch and returns the list indexes that were handled.
export type ProcessQueue<T> = (data: QueueItem<T>[]) => Promise<number[]>;
// Finds the first buffered item matching the predicate, or null.
export type Find<T, R = unknown> = (
callback: (item: QueueItem<T>) => boolean
) => Promise<R | null>;
// Finds all buffered items matching the predicate.
export type FindMany<T, R = unknown> = (
callback: (item: QueueItem<T>) => boolean
) => Promise<R[]>;
/**
 * Generic Redis-backed write buffer.
 *
 * Values are JSON-serialized and appended to a Redis list. Once the list
 * reaches `batchSize` (or when `flush()` is called explicitly) the items are
 * handed to the subclass-provided `processQueue`, and the entries it reports
 * as handled are removed from the list.
 */
export abstract class RedisBuffer<T> {
  public prefix = 'op:buffer';
  public table: string;
  public batchSize?: number;
  public redis: Redis;

  // Subclass hooks / required implementations.
  public abstract onInsert?: OnInsert<T>;
  public abstract onCompleted?: OnCompleted<T>;
  public abstract processQueue: ProcessQueue<T>;
  public abstract find: Find<T, unknown>;
  public abstract findMany: FindMany<T, unknown>;

  constructor(options: { table: string; redis: Redis; batchSize?: number }) {
    this.table = options.table;
    this.redis = options.redis;
    this.batchSize = options.batchSize;
  }

  /** Redis key for this buffer, optionally namespaced with `name`. */
  public getKey(name?: string) {
    const key = `${this.prefix}:${this.table}`;
    return name ? `${key}:${name}` : key;
  }

  /**
   * Append a value to the buffer and trigger a flush once the batch size is
   * reached. The flush is intentionally not awaited (fire-and-forget) so
   * insert() returns as soon as the value is queued; flush() handles its own
   * errors and never rejects.
   */
  public async insert(value: T) {
    this.onInsert?.(value);
    await this.redis.rpush(this.getKey(), JSON.stringify(value));
    const length = await this.redis.llen(this.getKey());
    if (this.batchSize && length >= this.batchSize) {
      void this.flush();
    }
  }

  /**
   * Drain up to `batchSize` items (or the whole list when no batch size is
   * configured), hand them to `processQueue`, and delete the entries it
   * reports as handled. A failed batch is stashed in a `failed:<timestamp>`
   * hash for later inspection/retry. Always resolves with a result object.
   */
  public async flush() {
    let queue: QueueItem<T>[];
    try {
      queue = await this.getQueue(this.batchSize || -1);
    } catch (e) {
      console.error(`[${this.getKey()}] Failed to getQueue while flushing:`, e);
      return { count: 0, data: [] };
    }

    if (queue.length === 0) {
      return { count: 0, data: [] };
    }

    try {
      const indexes = await this.processQueue(queue);
      await this.deleteIndexes(indexes);
      const data = indexes
        .map((index) => queue[index]?.event)
        // queue[index]?.event yields `undefined` for out-of-range indexes;
        // the original `!== null` check let those slip through, so compare
        // against both null and undefined here.
        .filter((event): event is T => event != null);
      if (this.onCompleted) {
        const res = await this.onCompleted(data);
        return { count: res.length, data: res };
      }
      return { count: indexes.length, data: indexes };
    } catch (e) {
      console.error(
        `[${this.getKey()}] Failed to processQueue while flushing:`,
        e
      );
      // Preserve the failed batch for manual inspection / retry.
      const timestamp = Date.now();
      await this.redis.hset(this.getKey(`failed:${timestamp}`), {
        error: e instanceof Error ? e.message : 'Unknown error',
        data: JSON.stringify(queue.map((item) => item.event)),
        retries: 0,
      });
      return { count: 0, data: [] };
    }
  }

  /**
   * Remove the list entries at `indexes` in one MULTI: each entry is first
   * overwritten with the DELETE sentinel, then all sentinel occurrences are
   * removed with a single LREM (Redis cannot delete by index directly).
   */
  public async deleteIndexes(indexes: number[]) {
    if (indexes.length === 0) {
      return;
    }
    const multi = this.redis.multi();
    for (const index of indexes) {
      multi.lset(this.getKey(), index, DELETE);
    }
    multi.lrem(this.getKey(), 0, DELETE);
    await multi.exec();
  }

  /**
   * Read up to `limit` items from the head of the list (-1 = everything).
   * Entries that fail to parse are dropped.
   */
  public async getQueue(limit: number): Promise<QueueItem<T>[]> {
    // LRANGE's stop index is inclusive, so a positive limit must be reduced
    // by one — the original `lrange(key, 0, limit)` fetched limit + 1 items.
    const stop = limit > 0 ? limit - 1 : limit;
    const queue = await this.redis.lrange(this.getKey(), 0, stop);
    return queue
      .map((item, index) => ({
        event: this.transformQueueItem(item),
        index,
      }))
      .filter((item): item is QueueItem<T> => item.event !== null);
  }

  /** JSON-parse a raw list entry; returns null for corrupted entries. */
  private transformQueueItem(item: string): T | null {
    try {
      return JSON.parse(item);
    } catch {
      return null;
    }
  }
}

View File

@@ -0,0 +1,212 @@
import { groupBy } from 'ramda';
import SuperJSON from 'superjson';
import { deepMergeObjects } from '@openpanel/common';
import { redis, redisPub } from '@openpanel/redis';
import { ch } from '../clickhouse-client';
import { transformEvent } from '../services/event.service';
import type {
IClickhouseEvent,
IServiceCreateEventPayload,
} from '../services/event.service';
import type {
Find,
FindMany,
OnCompleted,
OnInsert,
ProcessQueue,
QueueItem,
} from './buffer';
import { RedisBuffer } from './buffer';
// Comparator for Array#sort: orders queue items by ascending created_at
// (oldest event first).
const sortOldestFirst = (
  a: QueueItem<IClickhouseEvent>,
  b: QueueItem<IClickhouseEvent>
) => Date.parse(a.event.created_at) - Date.parse(b.event.created_at);
/**
 * Buffers incoming ClickHouse events in Redis and flushes them in batches.
 *
 * Non-screen_view events are flushed straight away (enriched with data from
 * the profile's previous event). screen_view events from clients are held
 * back until the next screen_view or a session_end arrives, so the time
 * spent on each screen can be written as `duration`.
 */
export class EventBuffer extends RedisBuffer<IClickhouseEvent> {
  constructor() {
    super({
      table: 'events',
      redis,
    });
  }

  public onInsert?: OnInsert<IClickhouseEvent> | undefined = (event) => {
    redisPub.publish(
      'event:received',
      SuperJSON.stringify(transformEvent(event))
    );
    // Mark the profile as "live" for 5 minutes.
    // Fix: setex takes (key, seconds, value) — the original call passed the
    // empty value where the TTL belongs, so the key was never written.
    this.redis.setex(
      `live:event:${event.project_id}:${event.profile_id}`,
      60 * 5,
      ''
    );
  };

  public onCompleted?: OnCompleted<IClickhouseEvent> | undefined = (
    savedEvents
  ) => {
    // Notify subscribers about every event that reached ClickHouse.
    for (const event of savedEvents) {
      redisPub.publish(
        'event:saved',
        SuperJSON.stringify(transformEvent(event))
      );
    }
    return savedEvents.map((event) => event.id);
  };

  public processQueue: ProcessQueue<IClickhouseEvent> = async (queue) => {
    const itemsToClickhouse = new Set<QueueItem<IClickhouseEvent>>();
    const itemsToStalled = new Set<QueueItem<IClickhouseEvent>>();

    // Sort by created_at, oldest first.
    queue.sort(sortOldestFirst);

    // Every event that is not a client screen_view can be flushed directly.
    // screen_views are held back because computing a screen's duration needs
    // at least two consecutive screen_views.
    queue
      .filter(
        (item) =>
          item.event.name !== 'screen_view' || item.event.device === 'server'
      )
      .forEach((item) => {
        // Backfill the event with data from the profile's last event that
        // carried a path. profile_id is used since it can also be set from
        // the backend.
        // NOTE(review): item.index is the Redis list position, but `queue`
        // has been re-sorted by created_at above, so slice(0, item.index)
        // is "the item.index oldest events", not necessarily "all events
        // before this one in the list" — confirm this is intended.
        const lastEventWithData = queue
          .slice(0, item.index)
          .findLast((lastEvent) => {
            return (
              lastEvent.event.project_id === item.event.project_id &&
              lastEvent.event.profile_id === item.event.profile_id &&
              lastEvent.event.path !== ''
            );
          });

        const event = deepMergeObjects<IClickhouseEvent>(
          lastEventWithData?.event || {},
          item.event
        );
        if (lastEventWithData) {
          // Remember where the merged-in properties came from.
          event.properties.__properties_from = lastEventWithData.event.id;
        }
        itemsToClickhouse.add({ ...item, event });
      });

    // Group client screen_view events by session so durations can be
    // computed within each session.
    const grouped = groupBy(
      (item) => item.event.session_id,
      queue.filter(
        (item) =>
          item.event.name === 'screen_view' && item.event.device !== 'server'
      )
    );

    for (const [sessionId, screenViews] of Object.entries(grouped)) {
      // screen_views without a session can never be paired up.
      if (sessionId === '' || !sessionId) {
        continue;
      }

      // A session_end means the session is over, so the trailing
      // screen_view (which has no successor) can be flushed as well.
      const hasSessionEnd = queue.find(
        (item) =>
          item.event.name === 'session_end' &&
          item.event.session_id === sessionId
      );

      screenViews
        ?.slice()
        .sort(sortOldestFirst)
        .forEach((item, index, sorted) => {
          // Fix: look up the successor in the SORTED copy — the original
          // indexed the unsorted group with the sorted position, which
          // could pair a screen_view with the wrong neighbor and produce
          // wrong (even negative) durations.
          const nextScreenView = sorted[index + 1];
          if (nextScreenView) {
            // Time spent on this screen = gap to the next screen_view.
            const duration =
              new Date(nextScreenView.event.created_at).getTime() -
              new Date(item.event.created_at).getTime();
            // Copy `properties` instead of mutating the object shared with
            // the original queue item.
            const event = {
              ...item.event,
              duration,
              properties: {
                ...item.event.properties,
                __duration_from: nextScreenView.event.id,
              },
            };
            itemsToClickhouse.add({ ...item, event });
          } else if (hasSessionEnd) {
            // Last screen_view of a finished session: flush without duration.
            itemsToClickhouse.add(item);
          }
          // Otherwise keep it buffered until the next screen_view arrives.
        });
    }

    // Safety net: anything still unhandled after 24 hours is moved to a
    // `stalled` list so it cannot clog the buffer forever. Compare by list
    // index — handled items were re-wrapped in new objects above, so a Set
    // identity check (`itemsToClickhouse.has(item)`) would miss them and
    // double-count handled events as stalled.
    const handledIndexes = new Set(
      Array.from(itemsToClickhouse, (item) => item.index)
    );
    const dayAgo = Date.now() - 1000 * 60 * 60 * 24;
    queue.forEach((item) => {
      if (
        !handledIndexes.has(item.index) &&
        new Date(item.event.created_at).getTime() < dayAgo
      ) {
        itemsToStalled.add(item);
      }
    });

    if (itemsToStalled.size > 0) {
      const multi = this.redis.multi();
      for (const item of itemsToStalled) {
        multi.rpush(this.getKey('stalled'), JSON.stringify(item.event));
      }
      await multi.exec();
    }

    await ch.insert({
      table: 'events',
      values: Array.from(itemsToClickhouse).map((item) => item.event),
      format: 'JSONEachRow',
    });

    // Returned indexes are deleted from the Redis list by the caller.
    return [
      ...Array.from(itemsToClickhouse, (item) => item.index),
      ...Array.from(itemsToStalled, (item) => item.index),
    ];
  };

  public findMany: FindMany<IClickhouseEvent, IServiceCreateEventPayload> =
    async (callback) => {
      // Search the buffered queue only; Redis errors degrade to [].
      return this.getQueue(-1)
        .then((queue) =>
          queue.filter(callback).map((item) => transformEvent(item.event))
        )
        .catch(() => []);
    };

  public find: Find<IClickhouseEvent, IServiceCreateEventPayload> = async (
    callback
  ) => {
    // First match in list order; Redis errors degrade to null.
    return this.getQueue(-1)
      .then((queue) => {
        const match = queue.find(callback);
        return match ? transformEvent(match.event) : null;
      })
      .catch(() => null);
  };
}

View File

@@ -0,0 +1,5 @@
import { EventBuffer } from './event-buffer';
import { ProfileBuffer } from './profile-buffer';
// Shared singleton buffers — one per ClickHouse table.
export const eventBuffer = new EventBuffer();
export const profileBuffer = new ProfileBuffer();

View File

@@ -0,0 +1,114 @@
import { mergeDeepRight } from 'ramda';
import { toDots } from '@openpanel/common';
import { redis } from '@openpanel/redis';
import { ch, chQuery } from '../clickhouse-client';
import type {
IClickhouseProfile,
IServiceProfile,
} from '../services/profile.service';
import { transformProfile } from '../services/profile.service';
import type {
Find,
FindMany,
OnCompleted,
OnInsert,
ProcessQueue,
QueueItem,
} from './buffer';
import { RedisBuffer } from './buffer';
/**
 * Buffers profile upserts in Redis and flushes them to ClickHouse in
 * batches of 100. Writes to the same profile within a batch are merged, and
 * the current row is read back from ClickHouse first so a partial update
 * does not wipe previously stored fields.
 */
export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
  constructor() {
    super({
      redis,
      table: 'profiles',
      batchSize: 100,
    });
  }

  // No per-insert or post-flush hooks for profiles.
  public onInsert?: OnInsert<IClickhouseProfile> | undefined;
  public onCompleted?: OnCompleted<IClickhouseProfile> | undefined;

  public processQueue: ProcessQueue<IClickhouseProfile> = async (queue) => {
    const itemsToClickhouse = new Map<string, QueueItem<IClickhouseProfile>>();

    // Merge all writes that target the same (project, profile) pair.
    // Fix: the key uses an explicit separator — the original bare
    // concatenation (project_id + id) could make distinct pairs collide,
    // and the key expression was computed twice.
    queue.forEach((item) => {
      const key = `${item.event.project_id}:${item.event.id}`;
      const existing = itemsToClickhouse.get(key);
      itemsToClickhouse.set(key, mergeDeepRight(existing ?? {}, item));
    });

    const cleanedQueue = Array.from(itemsToClickhouse.values());

    // Fetch the current state of each profile so unspecified fields keep
    // their previously stored values.
    // SECURITY(review): ids/project_ids are interpolated into the SQL
    // unescaped — if they can ever contain quotes this is injectable.
    // Consider sqlstring `escape` as used by the other services.
    const profiles = await chQuery<IClickhouseProfile>(
      `SELECT
        *
      FROM profiles
      WHERE
          (id, project_id) IN (${cleanedQueue.map((item) => `('${item.event.id}', '${item.event.project_id}')`).join(',')})
      ORDER BY
          created_at DESC`
    );

    await ch.insert({
      table: 'profiles',
      values: cleanedQueue.map((item) => {
        // Result is ordered created_at DESC, so find() yields the most
        // recent stored row for this profile.
        const profile = profiles.find(
          (p) =>
            p.id === item.event.id && p.project_id === item.event.project_id
        );
        return {
          id: item.event.id,
          first_name: item.event.first_name ?? profile?.first_name ?? '',
          last_name: item.event.last_name ?? profile?.last_name ?? '',
          email: item.event.email ?? profile?.email ?? '',
          avatar: item.event.avatar ?? profile?.avatar ?? '',
          // Incoming properties win over stored ones.
          properties: toDots({
            ...(profile?.properties ?? {}),
            ...(item.event.properties ?? {}),
          }),
          project_id: item.event.project_id ?? profile?.project_id ?? '',
          created_at: new Date(),
          is_external: item.event.is_external,
        };
      }),
      clickhouse_settings: {
        // Let ClickHouse parse the Date object serialization.
        date_time_input_format: 'best_effort',
      },
      format: 'JSONEachRow',
    });

    // Every queue item was handled (merged rows cover their duplicates).
    return queue.map((item) => item.index);
  };

  public findMany: FindMany<IClickhouseProfile, IServiceProfile> = async (
    callback
  ) => {
    // Search the buffered queue only; Redis errors degrade to [].
    return this.getQueue(-1)
      .then((queue) =>
        queue.filter(callback).map((item) => transformProfile(item.event))
      )
      .catch(() => []);
  };

  public find: Find<IClickhouseProfile, IServiceProfile> = async (callback) => {
    // First match in list order; Redis errors degrade to null.
    return this.getQueue(-1)
      .then((queue) => {
        const match = queue.find(callback);
        return match ? transformProfile(match.event) : null;
      })
      .catch(() => null);
  };
}

View File

@@ -1,7 +1,7 @@
import type { ResponseJSON } from '@clickhouse/client';
import { createClient } from '@clickhouse/client';
export const ch = createClient({
export const originalCh = createClient({
url: process.env.CLICKHOUSE_URL,
username: process.env.CLICKHOUSE_USER,
password: process.env.CLICKHOUSE_PASSWORD,
@@ -9,6 +9,53 @@ export const ch = createClient({
max_open_connections: 10,
keep_alive: {
enabled: true,
idle_socket_ttl: 5000,
},
compression: {
request: true,
},
});
/**
 * Proxy around the ClickHouse client that retries `insert` and `query`
 * once when the connection dies with a "socket hang up" error (stale
 * keep-alive sockets). All other properties pass through unchanged.
 *
 * Fix: the original guarded each call with `if (property in target)` and
 * otherwise resolved to `undefined` — a dead path that would have silently
 * swallowed calls instead of throwing; the duplicated attempt/retry code
 * is also collapsed.
 */
export const ch = new Proxy(originalCh, {
  get(target, property, receiver) {
    if (property === 'insert' || property === 'query') {
      return async (...args: any[]) => {
        try {
          // First attempt.
          // @ts-expect-error dynamic dispatch on a known method name
          return await target[property](...args);
        } catch (error: unknown) {
          const isSocketHangUp =
            error instanceof Error && error.message.includes('socket hang up');
          if (!isSocketHangUp) {
            // Not retryable — propagate.
            throw error;
          }
          console.error(
            `Caught socket hang up error on ${property.toString()}, retrying once.`
          );
          // Give the server a moment before the single retry.
          await new Promise((resolve) => setTimeout(resolve, 500));
          try {
            // @ts-expect-error dynamic dispatch on a known method name
            return await target[property](...args);
          } catch (retryError) {
            console.error(
              `Retry failed for ${property.toString()}:`,
              retryError
            );
            throw retryError;
          }
        }
      };
    }
    // NOTE(review): other members are returned unbound; if the client relies
    // on internal `this`, consider binding functions to `target`.
    return Reflect.get(target, property, receiver);
  },
});

View File

@@ -1,12 +1,12 @@
import { omit, uniq } from 'ramda';
import { escape } from 'sqlstring';
import superjson from 'superjson';
import { v4 as uuid } from 'uuid';
import { toDots } from '@openpanel/common';
import { redis, redisPub } from '@openpanel/redis';
import { redis } from '@openpanel/redis';
import type { IChartEventFilter } from '@openpanel/validation';
import { eventBuffer } from '../buffers';
import {
ch,
chQuery,
@@ -17,7 +17,7 @@ import type { EventMeta, Prisma } from '../prisma-client';
import { db } from '../prisma-client';
import { createSqlBuilder } from '../sql-builder';
import { getEventFiltersWhereClause } from './chart.service';
import { getProfileById, getProfiles, upsertProfile } from './profile.service';
import { getProfiles, upsertProfile } from './profile.service';
import type { IServiceProfile } from './profile.service';
export interface IClickhouseEvent {
@@ -226,17 +226,14 @@ export async function createEvent(
payload.profileId = payload.deviceId;
}
console.log(
`create event ${payload.name} for deviceId: ${payload.deviceId} profileId ${payload.profileId}`
`create event ${payload.name} for [deviceId]: ${payload.deviceId} [profileId]: ${payload.profileId} [projectId]: ${payload.projectId} [path]: ${payload.path}`
);
const exists = await getProfileById(payload.profileId, payload.projectId);
if (!exists && payload.profileId !== '') {
if (payload.profileId !== '') {
await upsertProfile({
id: String(payload.profileId),
isExternal: false,
isExternal: payload.profileId !== payload.deviceId,
projectId: payload.projectId,
firstName: '',
lastName: '',
properties: {
path: payload.path,
country: payload.country,
@@ -287,25 +284,9 @@ export async function createEvent(
referrer_type: payload.referrerType ?? '',
};
const res = await ch.insert({
table: 'events',
values: [event],
format: 'JSONEachRow',
clickhouse_settings: {
date_time_input_format: 'best_effort',
},
});
redisPub.publish('event', superjson.stringify(transformEvent(event)));
redis.set(
`live:event:${event.project_id}:${event.profile_id}`,
'',
'EX',
60 * 5
);
await eventBuffer.insert(event);
return {
...res,
document: event,
};
}
@@ -449,3 +430,27 @@ export function getConversionEventNames(projectId: string) {
},
});
}
/**
 * Returns the most recent screen_view for a profile: the in-memory event
 * buffer is checked first, then ClickHouse (limited to the last 30 minutes).
 * Returns null when nothing is found.
 */
export async function getLastScreenViewFromProfileId({
  profileId,
  projectId,
}: {
  profileId: string;
  projectId: string;
}) {
  // Fix: match the same conditions as the SQL fallback below — the original
  // buffer lookup compared only profile_id, so it could return an event from
  // another project or a non-screen_view event.
  // NOTE(review): find() returns the first (oldest) buffered match while the
  // SQL returns the newest — confirm whether the newest is required here.
  const eventInBuffer = await eventBuffer.find(
    (item) =>
      item.event.name === 'screen_view' &&
      item.event.profile_id === profileId &&
      item.event.project_id === projectId
  );
  if (eventInBuffer) {
    return eventInBuffer;
  }
  const [eventInDb] = profileId
    ? await getEvents(
        `SELECT * FROM events WHERE name = 'screen_view' AND profile_id = ${escape(profileId)} AND project_id = ${escape(projectId)} AND created_at >= now() - INTERVAL 30 MINUTE ORDER BY created_at DESC LIMIT 1`
      )
    : [];
  return eventInDb || null;
}

View File

@@ -1,9 +1,10 @@
import { escape } from 'sqlstring';
import { toDots, toObject } from '@openpanel/common';
import { toObject } from '@openpanel/common';
import type { IChartEventFilter } from '@openpanel/validation';
import { ch, chQuery } from '../clickhouse-client';
import { profileBuffer } from '../buffers';
import { chQuery, formatClickhouseDate } from '../clickhouse-client';
import { createSqlBuilder } from '../sql-builder';
export type IProfileMetrics = {
@@ -66,7 +67,10 @@ export async function getProfiles(ids: string[]) {
const data = await chQuery<IClickhouseProfile>(
`SELECT *
FROM profiles FINAL
WHERE id IN (${ids.map((id) => escape(id)).join(',')})
WHERE id IN (${ids
.map((id) => escape(id))
.filter(Boolean)
.join(',')})
`
);
@@ -172,31 +176,15 @@ export async function upsertProfile({
projectId,
isExternal,
}: IServiceUpsertProfile) {
const [profile] = await chQuery<IClickhouseProfile>(
`SELECT * FROM profiles WHERE id = ${escape(id)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1`
);
await ch.insert({
table: 'profiles',
format: 'JSONEachRow',
clickhouse_settings: {
date_time_input_format: 'best_effort',
},
values: [
{
id,
first_name: firstName ?? profile?.first_name ?? '',
last_name: lastName ?? profile?.last_name ?? '',
email: email ?? profile?.email ?? '',
avatar: avatar ?? profile?.avatar ?? '',
properties: toDots({
...(profile?.properties ?? {}),
...(properties ?? {}),
}),
project_id: projectId ?? profile?.project_id ?? '',
created_at: new Date(),
is_external: isExternal,
},
],
return profileBuffer.insert({
id,
first_name: firstName!,
last_name: lastName!,
email: email!,
avatar: avatar!,
properties: properties as Record<string, string | undefined>,
project_id: projectId,
created_at: formatClickhouseDate(new Date()),
is_external: isExternal,
});
}