improve(buffer): better cleanup

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-02-03 21:50:35 +01:00
parent de1a4faf28
commit be83b484bc
5 changed files with 129 additions and 60 deletions

View File

@@ -1,7 +1,7 @@
import client from 'prom-client';
import { botBuffer, db, eventBuffer, profileBuffer } from '@openpanel/db';
import { cronQueue, eventsQueue, sessionsQueue } from '@openpanel/queue';
import { getRedisCache } from '@openpanel/redis';
const Registry = client.Registry;
@@ -66,31 +66,47 @@ queues.forEach((queue) => {
);
});
// Buffer
const buffers = ['events_v2', 'profiles', 'events_bots'];
register.registerMetric(
new client.Gauge({
name: `buffer_${eventBuffer.name}_count`,
help: 'Number of unprocessed events',
async collect() {
const metric = await db.eventBuffer.count({
where: {
processedAt: null,
},
});
this.set(metric);
},
}),
);
buffers.forEach((buffer) => {
register.registerMetric(
new client.Gauge({
name: `buffer_${buffer}_count`,
help: 'Number of users in the users array',
async collect() {
const metric = await getRedisCache().llen(`op:buffer:${buffer}`);
this.set(metric);
},
}),
);
register.registerMetric(
new client.Gauge({
name: `buffer_${profileBuffer.name}_count`,
help: 'Number of unprocessed profiles',
async collect() {
const metric = await db.profileBuffer.count({
where: {
processedAt: null,
},
});
this.set(metric);
},
}),
);
register.registerMetric(
new client.Gauge({
name: `buffer_${buffer}_stalled_count`,
help: 'Number of users in the users array',
async collect() {
const metric = await getRedisCache().llen(
`op:buffer:${buffer}:stalled`,
);
this.set(metric);
},
}),
);
});
register.registerMetric(
new client.Gauge({
name: `buffer_${botBuffer.name}_count`,
help: 'Number of unprocessed bot events',
async collect() {
const metric = await db.botEventBuffer.count({
where: {
processedAt: null,
},
});
this.set(metric);
},
}),
);

View File

@@ -9,8 +9,9 @@ import type { IClickhouseBotEvent } from '../services/event.service';
import { BaseBuffer } from './base-buffer';
export class BotBuffer extends BaseBuffer {
private daysToKeep = 1;
private batchSize = 500;
private batchSize = process.env.BOT_BUFFER_BATCH_SIZE
? Number.parseInt(process.env.BOT_BUFFER_BATCH_SIZE, 10)
: 1000;
constructor() {
super({
@@ -87,7 +88,7 @@ export class BotBuffer extends BaseBuffer {
async tryCleanup() {
try {
await runEvery({
interval: 1000 * 60 * 60 * 24,
interval: 60 * 5, // 5 minutes — NOTE(review): the previous value (1000 * 60 * 60 * 24) was in milliseconds; 60 * 5 is 300 ms unless runEvery now takes seconds — confirm units
fn: this.cleanup.bind(this),
key: `${this.name}-cleanup`,
});
@@ -97,13 +98,10 @@ export class BotBuffer extends BaseBuffer {
}
async cleanup() {
const thirtyDaysAgo = new Date();
thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - this.daysToKeep);
const deleted = await db.botEventBuffer.deleteMany({
where: {
processedAt: {
lt: thirtyDaysAgo,
not: null,
},
},
});

View File

@@ -13,6 +13,7 @@ export type FindMany<T, R = unknown> = (
) => Promise<R[]>;
export class RedisBuffer<T> {
public name: string;
protected prefix = 'op:buffer';
protected bufferKey: string;
private lockKey: string;
@@ -20,6 +21,7 @@ export class RedisBuffer<T> {
protected logger: ILogger;
constructor(bufferName: string, maxBufferSize: number | null) {
this.name = bufferName;
this.bufferKey = bufferName;
this.lockKey = `lock:${bufferName}`;
this.maxBufferSize = maxBufferSize;

View File

@@ -13,7 +13,9 @@ import {
import { BaseBuffer } from './base-buffer';
export class EventBuffer extends BaseBuffer {
private daysToKeep = 3;
private daysToKeep = process.env.EVENT_BUFFER_DAYS_TO_KEEP
? Number.parseInt(process.env.EVENT_BUFFER_DAYS_TO_KEEP, 10)
: 3;
private batchSize = process.env.EVENT_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10)
: 2000;
@@ -26,7 +28,7 @@ export class EventBuffer extends BaseBuffer {
name: 'event',
onFlush: async () => {
await this.processBuffer();
await this.cleanup();
await this.tryCleanup();
},
});
}
@@ -205,7 +207,7 @@ export class EventBuffer extends BaseBuffer {
async tryCleanup() {
try {
await runEvery({
interval: 1000 * 60 * 60 * 24,
interval: 60 * 5, // 5 minutes — NOTE(review): the previous value (1000 * 60 * 60 * 24) was in milliseconds; 60 * 5 is 300 ms unless runEvery now takes seconds — confirm units
fn: this.cleanup.bind(this),
key: `${this.name}-cleanup`,
});
@@ -215,18 +217,26 @@ export class EventBuffer extends BaseBuffer {
}
async cleanup() {
const thirtyDaysAgo = new Date();
thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - this.daysToKeep);
const olderThan = new Date();
olderThan.setDate(olderThan.getDate() - this.daysToKeep);
const deleted = await db.eventBuffer.deleteMany({
where: {
processedAt: {
lt: thirtyDaysAgo,
},
},
});
const deleted = await db.$executeRaw`
DELETE FROM event_buffer
WHERE
-- 1) if the event has been processed
-- and session is completed or has no session
(
"processedAt" IS NOT NULL
AND (
"sessionId" IN (SELECT "sessionId" FROM event_buffer WHERE name = 'session_end')
OR "sessionId" IS NULL
)
)
-- 2) if the event is stalled for X days
OR "createdAt" < ${olderThan}
`;
this.logger.info('Cleaned up old events', { deleted: deleted.count });
this.logger.info('Cleaned up old events', { deleted });
}
public async getLastScreenView({

View File

@@ -10,12 +10,14 @@ import type { IClickhouseProfile } from '../services/profile.service';
import { BaseBuffer } from './base-buffer';
export class ProfileBuffer extends BaseBuffer {
private daysToKeep = 30;
private batchSize = process.env.EVENT_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10)
private daysToKeep = process.env.PROFILE_BUFFER_DAYS_TO_KEEP
? Number.parseInt(process.env.PROFILE_BUFFER_DAYS_TO_KEEP, 10)
: 7;
private batchSize = process.env.PROFILE_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10)
: 2000;
private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10)
private chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10)
: 1000;
constructor() {
@@ -28,9 +30,50 @@ export class ProfileBuffer extends BaseBuffer {
});
}
private generateChecksum(profile: IClickhouseProfile): string {
private sortObjectKeys(obj: any): any {
// Cache typeof check result
const type = typeof obj;
// Fast-path for primitives
if (obj === null || type !== 'object') {
return obj;
}
// Fast-path for arrays - process values only
if (Array.isArray(obj)) {
// Only map if contains objects
return obj.some((item) => item && typeof item === 'object')
? obj.map((item) => this.sortObjectKeys(item))
: obj;
}
// Get and sort keys once
const sortedKeys = Object.keys(obj).sort();
const len = sortedKeys.length;
// Pre-allocate result object
const result: any = {};
// Single loop with cached length
for (let i = 0; i < len; i++) {
const key = sortedKeys[i]!;
const value = obj[key];
result[key] =
value && typeof value === 'object' ? this.sortObjectKeys(value) : value;
}
return result;
}
private stringify(profile: IClickhouseProfile): string {
const { created_at, ...rest } = profile;
return createHash('sha256').update(JSON.stringify(rest)).digest('hex');
const sorted = this.sortObjectKeys(rest);
return JSON.stringify(sorted);
}
private generateChecksum(profile: IClickhouseProfile): string {
const json = this.stringify(profile);
return createHash('sha256').update(json).digest('hex');
}
async add(profile: IClickhouseProfile) {
@@ -77,7 +120,7 @@ export class ProfileBuffer extends BaseBuffer {
}
// Update existing profile if its not processed yet
if (existingProfile && existingProfile.processedAt === null) {
if (existingProfile) {
await db.profileBuffer.update({
where: {
id: existingProfile.id,
@@ -86,7 +129,7 @@ export class ProfileBuffer extends BaseBuffer {
checksum: this.generateChecksum(mergedProfile),
payload: mergedProfile,
updatedAt: new Date(),
processedAt: null, // unsure this will get processed (race condition)
processedAt: null,
},
});
} else {
@@ -165,7 +208,7 @@ export class ProfileBuffer extends BaseBuffer {
async tryCleanup() {
try {
await runEvery({
interval: 1000 * 60 * 60 * 24,
interval: 60 * 60, // 1 hour — NOTE(review): the previous value (1000 * 60 * 60 * 24) was in milliseconds; 60 * 60 is 3.6 s unless runEvery now takes seconds — confirm units
fn: this.cleanup.bind(this),
key: `${this.name}-cleanup`,
});
@@ -175,13 +218,13 @@ export class ProfileBuffer extends BaseBuffer {
}
async cleanup() {
const thirtyDaysAgo = new Date();
thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - this.daysToKeep);
const olderThan = new Date();
olderThan.setDate(olderThan.getDate() - this.daysToKeep);
const deleted = await db.profileBuffer.deleteMany({
where: {
processedAt: {
lt: thirtyDaysAgo,
lt: olderThan,
},
},
});