improve(buffer): prep postgres buffer

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-02-03 09:19:00 +01:00
parent a2b74a9b4d
commit ff2dca42f6
8 changed files with 214 additions and 191 deletions

View File

@@ -44,7 +44,7 @@ async function getCompleteSessionWithSessionStart({
sessionId: string;
logger: ILogger;
}): Promise<ReturnType<typeof getEvents>> {
const intervals = [6, 12, 24, 72];
const intervals = [1, 6, 12, 24, 72];
let intervalIndex = 0;
for (const hoursInterval of intervals) {
const events = await getCompleteSession({
@@ -76,6 +76,7 @@ export async function createSessionEnd(
const payload = job.data.payload;
// TODO: Get complete session from buffer to offload clickhouse
const [lastScreenView, eventsInDb] = await Promise.all([
eventBuffer.getLastScreenView({
projectId: payload.projectId,
@@ -96,19 +97,6 @@ export async function createSessionEnd(
new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(),
);
events.map((event, index) => {
job.log(
[
`Index: ${index}`,
`Event: ${event.name}`,
`Created: ${event.createdAt.toISOString()}`,
`DeviceId: ${event.deviceId}`,
`Profile: ${event.profileId}`,
`Path: ${event.path}`,
].join('\n'),
);
});
const sessionDuration = events.reduce((acc, event) => {
return acc + event.duration;
}, 0);

View File

@@ -70,7 +70,7 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
projectId,
properties: omit(GLOBAL_PROPERTIES, {
...properties,
user_agent: userAgent,
__user_agent: userAgent,
__hash: hash,
__query: query,
}),

View File

@@ -0,0 +1,74 @@
import { generateSecureId } from '@openpanel/common/server/id';
import { type ILogger, createLogger } from '@openpanel/logger';
import { getRedisCache } from '@openpanel/redis';
/**
 * Shared base for the Postgres-backed buffers (event/profile/bot).
 * Provides chunking of Clickhouse inserts and a best-effort distributed
 * Redis lock so only one worker flushes a given buffer at a time.
 */
export class BaseBuffer {
  name: string;
  logger: ILogger;
  lockKey: string;
  // Redis lock TTL in seconds. A flush that outlives this window loses the
  // lock and another worker may start flushing concurrently, so subclasses
  // should keep onFlush well under this budget.
  lockTimeout = 60;
  // Fix: was declared `() => void`, but the constructor requires a
  // promise-returning callback and tryFlush awaits it — declare the real type.
  onFlush: () => Promise<void>;

  constructor(options: {
    name: string;
    onFlush: () => Promise<void>;
  }) {
    this.logger = createLogger({ name: options.name });
    this.name = options.name;
    this.lockKey = `lock:${this.name}`;
    this.onFlush = options.onFlush;
  }

  /**
   * Split `items` into consecutive chunks of at most `size` elements.
   * The last chunk may be shorter; an empty input yields no chunks.
   * @throws RangeError when `size` < 1 (previously this looped forever).
   */
  protected chunks<T>(items: T[], size: number): T[][] {
    if (size < 1) {
      throw new RangeError(`chunk size must be >= 1, got ${size}`);
    }
    const chunks: T[][] = [];
    for (let i = 0; i < items.length; i += size) {
      chunks.push(items.slice(i, i + size));
    }
    return chunks;
  }

  /**
   * Release the distributed lock, but only if we still own it (stored value
   * matches our lockId). The compare-and-delete runs as a Lua script so it
   * is atomic on the Redis side.
   */
  private async releaseLock(lockId: string): Promise<void> {
    this.logger.debug('Releasing lock...');
    const script = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `;
    await getRedisCache().eval(script, 1, this.lockKey, lockId);
  }

  /**
   * Run `onFlush` under a best-effort distributed lock (SET NX EX).
   * If another worker already holds the lock the flush is skipped — callers
   * are expected to retry on their next scheduled tick. Errors from onFlush
   * are logged, not rethrown, and the lock is always released.
   */
  async tryFlush() {
    const now = performance.now();
    const lockId = generateSecureId('lock');
    const acquired = await getRedisCache().set(
      this.lockKey,
      lockId,
      'EX',
      this.lockTimeout,
      'NX',
    );
    if (acquired === 'OK') {
      try {
        this.logger.info('Acquired lock. Processing buffer...', {
          lockId,
        });
        await this.onFlush();
      } catch (error) {
        this.logger.error('Failed to process buffer', {
          error,
          lockId,
        });
      } finally {
        // Release before logging timing so the lock is held as briefly as possible.
        await this.releaseLock(lockId);
        this.logger.info('Flush completed', {
          elapsed: performance.now() - now,
          lockId,
        });
      }
    } else {
      this.logger.warn('Failed to acquire lock. Skipping flush.', { lockId });
    }
  }
}

View File

@@ -6,17 +6,20 @@ import { Prisma } from '@prisma/client';
import { TABLE_NAMES, ch } from '../clickhouse-client';
import { db } from '../prisma-client';
import type { IClickhouseBotEvent } from '../services/event.service';
import { BaseBuffer } from './base-buffer';
export class BotBuffer {
private name = 'bot';
private lockKey = `lock:${this.name}`;
private logger: ILogger;
private lockTimeout = 60;
export class BotBuffer extends BaseBuffer {
private daysToKeep = 1;
private batchSize = 500;
constructor() {
this.logger = createLogger({ name: this.name });
super({
name: 'bot',
onFlush: async () => {
await this.processBuffer();
await this.tryCleanup();
},
});
}
async add(event: IClickhouseBotEvent) {
@@ -44,43 +47,6 @@ export class BotBuffer {
}
}
private async releaseLock(lockId: string): Promise<void> {
this.logger.debug('Releasing lock...');
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
await getRedisCache().eval(script, 1, this.lockKey, lockId);
}
async tryFlush() {
const lockId = generateSecureId('lock');
const acquired = await getRedisCache().set(
this.lockKey,
lockId,
'EX',
this.lockTimeout,
'NX',
);
if (acquired === 'OK') {
try {
this.logger.info('Acquired lock. Processing buffer...');
await this.processBuffer();
await this.tryCleanup();
} catch (error) {
this.logger.error('Failed to process buffer', { error });
} finally {
await this.releaseLock(lockId);
}
} else {
this.logger.warn('Failed to acquire lock. Skipping flush.');
}
}
async processBuffer() {
const eventsToProcess = await db.botEventBuffer.findMany({
where: {

View File

@@ -10,17 +10,25 @@ import {
type IServiceEvent,
transformEvent,
} from '../services/event.service';
import { BaseBuffer } from './base-buffer';
export class EventBuffer {
private name = 'event';
private logger: Logger;
private lockKey = `lock:${this.name}`;
private lockTimeout = 60;
private daysToKeep = 2;
private batchSize = 1000;
export class EventBuffer extends BaseBuffer {
private daysToKeep = 3;
private batchSize = process.env.EVENT_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10)
: 2000;
private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10)
: 1000;
constructor() {
this.logger = createLogger({ name: this.name });
super({
name: 'event',
onFlush: async () => {
await this.processBuffer();
await this.cleanup();
},
});
}
async add(event: IClickhouseEvent) {
@@ -30,22 +38,23 @@ export class EventBuffer {
projectId: event.project_id,
eventId: event.id,
name: event.name,
profileId: event.profile_id,
sessionId: event.session_id,
profileId: event.profile_id || null,
sessionId: event.session_id || null,
payload: event,
},
});
// TODO: UNCOMMENT THIS!!!
// this.publishEvent('event:received', event);
// if (event.profile_id) {
// getRedisCache().set(
// `live:event:${event.project_id}:${event.profile_id}`,
// '',
// 'EX',
// 60 * 5,
// );
// }
if (!process.env.TEST_NEW_BUFFER) {
this.publishEvent('event:received', event);
if (event.profile_id) {
getRedisCache().set(
`live:event:${event.project_id}:${event.profile_id}`,
'',
'EX',
60 * 5,
);
}
}
} catch (error) {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
if (error.code === 'P2002') {
@@ -70,49 +79,19 @@ export class EventBuffer {
}
}
private async releaseLock(lockId: string): Promise<void> {
this.logger.debug('Releasing lock...');
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
await getRedisCache().eval(script, 1, this.lockKey, lockId);
}
async tryFlush() {
const lockId = generateSecureId('lock');
const acquired = await getRedisCache().set(
this.lockKey,
lockId,
'EX',
this.lockTimeout,
'NX',
);
if (acquired === 'OK') {
try {
this.logger.info('Acquired lock. Processing buffer...');
await this.processBuffer();
await this.tryCleanup();
} catch (error) {
this.logger.error('Failed to process buffer', { error });
} finally {
await this.releaseLock(lockId);
}
} else {
this.logger.warn('Failed to acquire lock. Skipping flush.');
}
}
async processBuffer() {
let now = performance.now();
const timer: Record<string, number | undefined> = {
fetchUnprocessedEvents: undefined,
transformEvents: undefined,
insertToClickhouse: undefined,
markAsProcessed: undefined,
};
const eventsToProcess = await db.$queryRaw<IPrismaEventBuffer[]>`
WITH has_2_special AS (
WITH has_more_than_2_events AS (
SELECT "sessionId"
FROM event_buffer
WHERE "processedAt" IS NULL
AND name IN ('screen_view', 'session_start', 'session_end')
GROUP BY "sessionId"
HAVING COUNT(*) >= 2
)
@@ -120,30 +99,32 @@ export class EventBuffer {
FROM event_buffer e
WHERE e."processedAt" IS NULL
AND (
-- 1) if the event name is NOT in the special set
e.name NOT IN ('screen_view', 'session_start', 'session_end')
-- 1) all events except screen_view
e.name != 'screen_view'
OR
-- 2) if the event name IS in the special set AND
-- the session has >= 2 such unprocessed events
(
e.name IN ('screen_view', 'session_start', 'session_end')
AND e."sessionId" IN (SELECT "sessionId" FROM has_2_special)
)
-- 2) if the session has >= 2 such unprocessed events
e."sessionId" IN (SELECT "sessionId" FROM has_more_than_2_events)
)
ORDER BY e."createdAt" ASC -- or e.id, whichever "oldest first" logic you use
ORDER BY e."createdAt" ASC
LIMIT ${this.batchSize}
`;
timer.fetchUnprocessedEvents = performance.now() - now;
now = performance.now();
const toInsert = eventsToProcess.reduce<IPrismaEventBuffer[]>(
(acc, event, index, list) => {
// SCREEN VIEW
if (event.name === 'screen_view') {
const nextScreenView = list.find(
(e, eIndex) =>
(e.name === 'screen_view' || e.name === 'session_end') &&
e.sessionId === event.sessionId &&
eIndex > index,
);
const nextScreenView = list
.slice(index + 1)
.find(
(e) =>
(e.name === 'screen_view' || e.name === 'session_end') &&
e.sessionId === event.sessionId,
);
// Calculate duration
if (nextScreenView && nextScreenView.name === 'screen_view') {
event.payload.duration =
new Date(nextScreenView.createdAt).getTime() -
@@ -155,6 +136,20 @@ export class EventBuffer {
if (!nextScreenView) {
return acc;
}
} else {
// OTHER EVENTS
const currentScreenView = list
.slice(0, index)
.findLast(
(e) =>
e.name === 'screen_view' && e.sessionId === event.sessionId,
);
if (currentScreenView) {
// Get path related info from the current screen view
event.payload.path = currentScreenView.payload.path;
event.payload.origin = currentScreenView.payload.origin;
}
}
acc.push(event);
@@ -164,17 +159,29 @@ export class EventBuffer {
[],
);
timer.transformEvents = performance.now() - now;
now = performance.now();
if (toInsert.length > 0) {
await ch.insert({
table: 'events',
values: toInsert.map((e) => e.payload),
format: 'JSONEachRow',
});
const events = toInsert.map((e) => e.payload);
for (const chunk of this.chunks(events, this.chunkSize)) {
await ch.insert({
table: 'events',
values: chunk,
format: 'JSONEachRow',
});
}
timer.insertToClickhouse = performance.now() - now;
now = performance.now();
for (const event of toInsert) {
this.publishEvent('event:saved', event.payload);
}
timer.markAsProcessed = performance.now() - now;
now = performance.now();
await db.eventBuffer.updateMany({
where: {
id: {
@@ -186,8 +193,11 @@ export class EventBuffer {
},
});
timer.markAsProcessed = performance.now() - now;
this.logger.info('Processed events', {
count: toInsert.length,
timer,
});
}
}

View File

@@ -1,7 +1,16 @@
import { BotBuffer } from './bot-buffer';
import { BotBuffer as NewBotBuffer } from './bot-buffer-psql';
import { EventBuffer } from './event-buffer';
import { EventBuffer as NewEventBuffer } from './event-buffer-psql';
import { ProfileBuffer } from './profile-buffer';
import { ProfileBuffer as NewProfileBuffer } from './profile-buffer-psql';

// Feature flag for the Postgres-backed buffer rollout: when USE_NEW_BUFFER is
// set, every buffer singleton is the *-psql implementation; otherwise the
// legacy (Redis-backed) implementation is used. The flag is read once at
// module load, so toggling it requires a process restart.
export const eventBuffer = process.env.USE_NEW_BUFFER
  ? new NewEventBuffer()
  : new EventBuffer();
export const profileBuffer = process.env.USE_NEW_BUFFER
  ? new NewProfileBuffer()
  : new ProfileBuffer();
export const botBuffer = process.env.USE_NEW_BUFFER
  ? new NewBotBuffer()
  : new BotBuffer();

View File

@@ -7,16 +7,25 @@ import { mergeDeepRight } from 'ramda';
import { TABLE_NAMES, ch, chQuery } from '../clickhouse-client';
import { db } from '../prisma-client';
import type { IClickhouseProfile } from '../services/profile.service';
import { BaseBuffer } from './base-buffer';
export class ProfileBuffer {
private name = 'profile';
private logger: Logger;
private lockKey = `lock:${this.name}`;
private lockTimeout = 60;
export class ProfileBuffer extends BaseBuffer {
private daysToKeep = 30;
private batchSize = process.env.EVENT_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10)
: 2000;
private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10)
: 1000;
constructor() {
this.logger = createLogger({ name: this.name });
super({
name: 'profile',
onFlush: async () => {
await this.processBuffer();
await this.tryCleanup();
},
});
}
private generateChecksum(profile: IClickhouseProfile): string {
@@ -27,7 +36,6 @@ export class ProfileBuffer {
async add(profile: IClickhouseProfile) {
try {
const checksum = this.generateChecksum(profile);
// Check if we have this exact profile in buffer
const existingProfile = await db.profileBuffer.findFirst({
where: {
@@ -75,8 +83,10 @@ export class ProfileBuffer {
id: existingProfile.id,
},
data: {
checksum: this.generateChecksum(mergedProfile),
payload: mergedProfile,
updatedAt: new Date(),
processedAt: null, // unsure this will get processed (race condition)
},
});
} else {
@@ -110,43 +120,6 @@ export class ProfileBuffer {
return result[0] || null;
}
private async releaseLock(lockId: string): Promise<void> {
this.logger.debug('Releasing lock...');
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
await getRedisCache().eval(script, 1, this.lockKey, lockId);
}
async tryFlush() {
const lockId = generateSecureId('lock');
const acquired = await getRedisCache().set(
this.lockKey,
lockId,
'EX',
this.lockTimeout,
'NX',
);
if (acquired === 'OK') {
try {
this.logger.info('Acquired lock. Processing buffer...');
await this.processBuffer();
await this.tryCleanup();
} catch (error) {
this.logger.error('Failed to process buffer', { error });
} finally {
await this.releaseLock(lockId);
}
} else {
this.logger.warn('Failed to acquire lock. Skipping flush.');
}
}
async processBuffer() {
const profilesToProcess = await db.profileBuffer.findMany({
where: {
@@ -155,6 +128,7 @@ export class ProfileBuffer {
orderBy: {
createdAt: 'asc',
},
take: this.batchSize,
});
if (profilesToProcess.length > 0) {
@@ -163,11 +137,13 @@ export class ProfileBuffer {
return profile;
});
await ch.insert({
table: TABLE_NAMES.profiles,
values: toInsert,
format: 'JSONEachRow',
});
for (const chunk of this.chunks(profilesToProcess, this.chunkSize)) {
await ch.insert({
table: TABLE_NAMES.profiles,
values: chunk,
format: 'JSONEachRow',
});
}
await db.profileBuffer.updateMany({
where: {

View File

@@ -273,11 +273,11 @@ export async function getEvents(
}
export async function createEvent(payload: IServiceCreateEventPayload) {
if (!payload.profileId) {
if (!payload.profileId && payload.deviceId) {
payload.profileId = payload.deviceId;
}
if (payload.profileId !== '') {
if (payload.profileId) {
await upsertProfile({
id: String(payload.profileId),
isExternal: payload.profileId !== payload.deviceId,
@@ -310,7 +310,7 @@ export async function createEvent(payload: IServiceCreateEventPayload) {
profile_id: payload.profileId ? String(payload.profileId) : '',
project_id: payload.projectId,
session_id: payload.sessionId,
properties: toDots(omit(['_path'], payload.properties)),
properties: toDots(payload.properties),
path: payload.path ?? '',
origin: payload.origin ?? '',
created_at: formatClickhouseDate(payload.createdAt),