feature(queue): use postgres instead of redis for buffer

* wip(buffer): initial implementation of psql buffer

* wip(buffer): add both profile and bots buffer
This commit is contained in:
Carl-Gerhard Lindesvärd
2025-01-29 15:17:54 +00:00
committed by Carl-Gerhard Lindesvärd
parent 2b5b8ce446
commit 71bf22af51
16 changed files with 19713 additions and 18747 deletions

View File

@@ -0,0 +1,147 @@
import { generateSecureId } from '@openpanel/common/server/id';
import { type ILogger, createLogger } from '@openpanel/logger';
import { getRedisCache, runEvery } from '@openpanel/redis';
import { Prisma } from '@prisma/client';
import { TABLE_NAMES, ch } from '../clickhouse-client';
import { db } from '../prisma-client';
import type { IClickhouseBotEvent } from '../services/event.service';
export class BotBuffer {
private name = 'bot';
private lockKey = `lock:${this.name}`;
private logger: ILogger;
private lockTimeout = 60;
private daysToKeep = 1;
private batchSize = 500;
constructor() {
this.logger = createLogger({ name: this.name });
}
async add(event: IClickhouseBotEvent) {
try {
await db.botEventBuffer.create({
data: {
projectId: event.project_id,
eventId: event.id,
payload: event,
},
});
// Check if we have enough unprocessed events to trigger a flush
const unprocessedCount = await db.botEventBuffer.count({
where: {
processedAt: null,
},
});
if (unprocessedCount >= this.batchSize) {
await this.tryFlush();
}
} catch (error) {
this.logger.error('Failed to add bot event', { error });
}
}
private async releaseLock(lockId: string): Promise<void> {
this.logger.debug('Releasing lock...');
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
await getRedisCache().eval(script, 1, this.lockKey, lockId);
}
async tryFlush() {
const lockId = generateSecureId('lock');
const acquired = await getRedisCache().set(
this.lockKey,
lockId,
'EX',
this.lockTimeout,
'NX',
);
if (acquired === 'OK') {
try {
this.logger.info('Acquired lock. Processing buffer...');
await this.processBuffer();
await this.tryCleanup();
} catch (error) {
this.logger.error('Failed to process buffer', { error });
} finally {
await this.releaseLock(lockId);
}
} else {
this.logger.warn('Failed to acquire lock. Skipping flush.');
}
}
async processBuffer() {
const eventsToProcess = await db.botEventBuffer.findMany({
where: {
processedAt: null,
},
orderBy: {
createdAt: 'asc',
},
take: this.batchSize,
});
if (eventsToProcess.length > 0) {
const toInsert = eventsToProcess.map((e) => e.payload);
await ch.insert({
table: TABLE_NAMES.events_bots,
values: toInsert,
format: 'JSONEachRow',
});
await db.botEventBuffer.updateMany({
where: {
id: {
in: eventsToProcess.map((e) => e.id),
},
},
data: {
processedAt: new Date(),
},
});
this.logger.info('Processed bot events', {
count: toInsert.length,
});
}
}
async tryCleanup() {
try {
await runEvery({
interval: 1000 * 60 * 60 * 24,
fn: this.cleanup.bind(this),
key: `${this.name}-cleanup`,
});
} catch (error) {
this.logger.error('Failed to run cleanup', { error });
}
}
async cleanup() {
const thirtyDaysAgo = new Date();
thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - this.daysToKeep);
const deleted = await db.botEventBuffer.deleteMany({
where: {
processedAt: {
lt: thirtyDaysAgo,
},
},
});
this.logger.info('Cleaned up old bot events', { deleted: deleted.count });
}
}

View File

@@ -0,0 +1,278 @@
import { setSuperJson } from '@openpanel/common';
import { generateSecureId } from '@openpanel/common/server/id';
import { type ILogger as Logger, createLogger } from '@openpanel/logger';
import { getRedisCache, getRedisPub, runEvery } from '@openpanel/redis';
import { Prisma } from '@prisma/client';
import { ch } from '../clickhouse-client';
import { db } from '../prisma-client';
import {
type IClickhouseEvent,
type IServiceEvent,
transformEvent,
} from '../services/event.service';
/**
 * Postgres-backed buffer for analytics events. Events accumulate in the
 * `eventBuffer` table; a Redis-locked flush computes screen_view durations
 * (time until the next event in the same session) and bulk-inserts into
 * ClickHouse. A screen_view is only flushed once its duration is known, i.e.
 * when a later event exists in its session or the session has ended.
 */
export class EventBuffer {
  private name = 'event';
  private logger: Logger;
  private lockKey = `lock:${this.name}`;
  /** Redis lock TTL in seconds; a flush must complete within this window. */
  private lockTimeout = 60;
  /** Processed rows older than this many days are removed by cleanup(). */
  private daysToKeep = 2;

  constructor() {
    this.logger = createLogger({ name: this.name });
  }

  /**
   * Buffer an incoming event and notify live subscribers. Duplicate inserts
   * (unique-constraint violation, Prisma P2002) are ignored with a warning.
   */
  async add(event: IClickhouseEvent) {
    try {
      await db.eventBuffer.create({
        data: {
          projectId: event.project_id,
          eventId: event.id,
          name: event.name,
          profileId: event.profile_id,
          sessionId: event.session_id,
          payload: event,
        },
      });
      // Fire-and-forget: publishEvent catches and logs its own failures.
      void this.publishEvent('event:received', event);
      if (event.profile_id) {
        // Mark the profile as "live" for 5 minutes. Fire-and-forget, but
        // attach a catch so a Redis failure cannot become an unhandled
        // promise rejection.
        getRedisCache()
          .set(
            `live:event:${event.project_id}:${event.profile_id}`,
            '',
            'EX',
            60 * 5,
          )
          .catch((error) => {
            this.logger.warn('Failed to set live event key', { error });
          });
      }
    } catch (error) {
      if (error instanceof Prisma.PrismaClientKnownRequestError) {
        // P2002 = unique-constraint violation: same event inserted twice.
        if (error.code === 'P2002') {
          this.logger.warn('Duplicate event ignored', { eventId: event.id });
          return;
        }
      }
      this.logger.error('Failed to add event', { error });
    }
  }

  /** Publish a transformed event on a Redis pub/sub channel; never throws. */
  private async publishEvent(channel: string, event: IClickhouseEvent) {
    try {
      await getRedisPub().publish(
        channel,
        setSuperJson(
          transformEvent(event) as unknown as Record<string, unknown>,
        ),
      );
    } catch (error) {
      this.logger.warn('Failed to publish event', { error });
    }
  }

  /**
   * Release the Redis flush lock, but only if we still own it (the stored
   * value matches our lockId). Uses a Lua script so check-and-delete is atomic.
   */
  private async releaseLock(lockId: string): Promise<void> {
    this.logger.debug('Releasing lock...');
    const script = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `;
    await getRedisCache().eval(script, 1, this.lockKey, lockId);
  }

  /**
   * Attempt to acquire the distributed flush lock; if acquired, process the
   * buffer and run periodic cleanup, then release the lock. If another worker
   * holds the lock, skip silently (it will flush instead).
   */
  async tryFlush() {
    const lockId = generateSecureId('lock');
    // SET NX EX: acquire only if the key does not exist, with auto-expiry so a
    // crashed worker cannot hold the lock forever.
    const acquired = await getRedisCache().set(
      this.lockKey,
      lockId,
      'EX',
      this.lockTimeout,
      'NX',
    );
    if (acquired === 'OK') {
      try {
        this.logger.info('Acquired lock. Processing buffer...');
        await this.processBuffer();
        await this.tryCleanup();
      } catch (error) {
        this.logger.error('Failed to process buffer', { error });
      } finally {
        await this.releaseLock(lockId);
      }
    } else {
      this.logger.warn('Failed to acquire lock. Skipping flush.');
    }
  }

  /**
   * Flush eligible events to ClickHouse:
   *  1. screen_views with a later event in the same session — flushed with a
   *     `duration` computed from the gap to that next event;
   *  2. the last screen_view of a session that has a session_end — flushed
   *     without a duration (no later event to measure against);
   *  3. all non-screen_view events.
   * The two screen_view sets are disjoint: (1) requires a following
   * screen_view, (2) requires that none exists.
   * Rows are marked processed only after a successful ClickHouse insert, so a
   * failure leaves them eligible for retry.
   */
  async processBuffer() {
    const eventsToProcess = await db.$transaction(async (trx) => {
      // (1) screen_views that have a next event in the same session; LEAD
      // gives the timestamp of the following screen_view for the duration.
      const processableViews = await trx.$queryRaw<
        Array<{
          id: string;
          payload: IClickhouseEvent;
          next_event_time: Date;
        }>
      >`
        WITH NextEvents AS (
          SELECT
            id,
            payload,
            LEAD("createdAt") OVER (
              PARTITION BY "sessionId"
              ORDER BY "createdAt"
            ) as next_event_time
          FROM event_buffer
          WHERE "name" = 'screen_view'
          AND "processedAt" IS NULL
        )
        SELECT *
        FROM NextEvents
        WHERE next_event_time IS NOT NULL
      `;
      // (2) screen_views that are last in their session AND whose session has
      // a later session_end event — the session is over, no duration possible.
      const lastViews = await trx.$queryRaw<
        Array<{
          id: string;
          payload: IClickhouseEvent;
        }>
      >`
        WITH LastViews AS (
          SELECT e.id, e.payload,
            EXISTS (
              SELECT 1
              FROM event_buffer se
              WHERE se."name" = 'session_end'
              AND se."sessionId" = e."sessionId"
              AND se."createdAt" > e."createdAt"
            ) as has_session_end
          FROM event_buffer e
          WHERE e."name" = 'screen_view'
          AND e."processedAt" IS NULL
          AND NOT EXISTS (
            SELECT 1
            FROM event_buffer next
            WHERE next."sessionId" = e."sessionId"
            AND next."name" = 'screen_view'
            AND next."createdAt" > e."createdAt"
          )
        )
        SELECT * FROM LastViews
        WHERE has_session_end = true
      `;
      // (3) everything that is not a screen_view is always flushable.
      const regularEvents = await trx.eventBuffer.findMany({
        where: {
          processedAt: null,
          name: { not: 'screen_view' },
        },
        orderBy: { createdAt: 'asc' },
      });
      return {
        processableViews,
        lastViews,
        regularEvents,
      };
    });

    const toInsert = [
      ...eventsToProcess.processableViews.map((view) => ({
        ...view.payload,
        // Duration in ms between this view and the next event in the session.
        duration:
          new Date(view.next_event_time).getTime() -
          new Date(view.payload.created_at).getTime(),
      })),
      ...eventsToProcess.lastViews.map((v) => v.payload),
      ...eventsToProcess.regularEvents.map((e) => e.payload),
    ];

    if (toInsert.length > 0) {
      // NOTE(review): table name is hardcoded here while other buffers use
      // TABLE_NAMES.* — consider importing TABLE_NAMES for consistency.
      await ch.insert({
        table: 'events',
        values: toInsert,
        format: 'JSONEachRow',
      });
      for (const event of toInsert) {
        // Fire-and-forget: publishEvent catches and logs its own failures.
        void this.publishEvent('event:saved', event);
      }
      await db.eventBuffer.updateMany({
        where: {
          id: {
            in: [
              ...eventsToProcess.processableViews.map((v) => v.id),
              ...eventsToProcess.lastViews.map((v) => v.id),
              ...eventsToProcess.regularEvents.map((e) => e.id),
            ],
          },
        },
        data: {
          processedAt: new Date(),
        },
      });
      this.logger.info('Processed events', {
        count: toInsert.length,
        screenViews:
          eventsToProcess.processableViews.length +
          eventsToProcess.lastViews.length,
        regularEvents: eventsToProcess.regularEvents.length,
      });
    }
  }

  /** Run cleanup() at most once per day across all workers (via runEvery). */
  async tryCleanup() {
    try {
      await runEvery({
        interval: 1000 * 60 * 60 * 24,
        fn: this.cleanup.bind(this),
        key: `${this.name}-cleanup`,
      });
    } catch (error) {
      this.logger.error('Failed to run cleanup', { error });
    }
  }

  /**
   * Delete buffer rows that were processed more than `daysToKeep` days ago.
   * Unprocessed rows (processedAt = null) are never deleted.
   */
  async cleanup() {
    const cutoff = new Date();
    cutoff.setDate(cutoff.getDate() - this.daysToKeep);
    const deleted = await db.eventBuffer.deleteMany({
      where: {
        processedAt: {
          lt: cutoff,
        },
      },
    });
    this.logger.info('Cleaned up old events', { deleted: deleted.count });
  }

  /**
   * Fetch the most recent buffered screen_view for a profile, or null.
   * NOTE(review): intentionally does not filter on processedAt, so it can
   * return an already-flushed view still present in the buffer — confirm
   * that is the desired behavior.
   */
  public async getLastScreenView({
    projectId,
    profileId,
  }: {
    projectId: string;
    profileId: string;
  }): Promise<IServiceEvent | null> {
    const event = await db.eventBuffer.findFirst({
      where: {
        projectId,
        profileId,
        name: 'screen_view',
      },
      orderBy: { createdAt: 'desc' },
    });
    if (event) {
      return transformEvent(event.payload);
    }
    return null;
  }
}

View File

@@ -1,6 +1,6 @@
import { BotBuffer } from './bot-buffer';
import { EventBuffer } from './event-buffer';
import { ProfileBuffer } from './profile-buffer';
import { BotBuffer } from './bot-buffer-psql';
import { EventBuffer } from './event-buffer-psql';
import { ProfileBuffer } from './profile-buffer-psql';
export const eventBuffer = new EventBuffer();
export const profileBuffer = new ProfileBuffer();

View File

@@ -0,0 +1,215 @@
import { createHash } from 'node:crypto';
import { generateSecureId } from '@openpanel/common/server/id';
import { type ILogger as Logger, createLogger } from '@openpanel/logger';
import { getRedisCache, runEvery } from '@openpanel/redis';
import { mergeDeepRight } from 'ramda';
import { TABLE_NAMES, ch, chQuery } from '../clickhouse-client';
import { db } from '../prisma-client';
import type { IClickhouseProfile } from '../services/profile.service';
/**
 * Postgres-backed buffer for profile upserts. Incoming profiles are
 * deduplicated via a content checksum, merged with the latest known state
 * (from the buffer or ClickHouse), and flushed to ClickHouse in batches under
 * a Redis lock.
 */
export class ProfileBuffer {
  private name = 'profile';
  private logger: Logger;
  private lockKey = `lock:${this.name}`;
  /** Redis lock TTL in seconds; a flush must complete within this window. */
  private lockTimeout = 60;
  /** Processed rows older than this many days are removed by cleanup(). */
  private daysToKeep = 30;

  constructor() {
    this.logger = createLogger({ name: this.name });
  }

  /**
   * Content checksum of a profile, excluding created_at so that re-sends of
   * identical data (with a fresh timestamp) are detected as duplicates.
   */
  private generateChecksum(profile: IClickhouseProfile): string {
    const { created_at, ...rest } = profile;
    return createHash('sha256').update(JSON.stringify(rest)).digest('hex');
  }

  /**
   * Buffer a profile update. Identical consecutive updates are dropped; new
   * data is deep-merged over the latest known state (buffer row if present,
   * otherwise the latest ClickHouse row). An unprocessed buffer row is updated
   * in place; otherwise a new row is created.
   */
  async add(profile: IClickhouseProfile) {
    try {
      const checksum = this.generateChecksum(profile);
      // Latest buffered state for this profile, if any.
      const existingProfile = await db.profileBuffer.findFirst({
        where: {
          projectId: profile.project_id,
          profileId: profile.id,
        },
        orderBy: {
          createdAt: 'desc',
        },
      });

      // Last item in buffer is identical to the new profile — nothing to do.
      if (existingProfile?.checksum === checksum) {
        this.logger.debug('Duplicate profile ignored', {
          profileId: profile.id,
        });
        return;
      }

      let mergedProfile = profile;
      if (!existingProfile) {
        this.logger.debug('No profile in buffer, checking Clickhouse', {
          profileId: profile.id,
        });
        // Not in buffer — merge over the latest ClickHouse state, if any.
        const clickhouseProfile = await this.fetchFromClickhouse(profile);
        if (clickhouseProfile) {
          this.logger.debug('Clickhouse profile found, merging', {
            profileId: profile.id,
          });
          mergedProfile = mergeDeepRight(clickhouseProfile, profile);
        }
      } else if (existingProfile.payload) {
        this.logger.debug('Profile in buffer is different, merging', {
          profileId: profile.id,
        });
        mergedProfile = mergeDeepRight(existingProfile.payload, profile);
      }

      // Update the existing row in place only while it is still unprocessed;
      // once flushed, append a new row instead.
      if (existingProfile && existingProfile.processedAt === null) {
        await db.profileBuffer.update({
          where: {
            id: existingProfile.id,
          },
          data: {
            payload: mergedProfile,
            updatedAt: new Date(),
          },
        });
      } else {
        await db.profileBuffer.create({
          data: {
            projectId: profile.project_id,
            profileId: profile.id,
            checksum,
            payload: mergedProfile,
          },
        });
      }
    } catch (error) {
      this.logger.error('Failed to add profile', { error });
    }
  }

  /**
   * Escape a value for safe inlining into a single-quoted ClickHouse string
   * literal (backslashes first, then quotes).
   */
  private escapeChString(value: string): string {
    return value.replace(/\\/g, '\\\\').replace(/'/g, "\\'");
  }

  /** Latest ClickHouse row for this profile, or null if none exists. */
  private async fetchFromClickhouse(
    profile: IClickhouseProfile,
  ): Promise<IClickhouseProfile | null> {
    // SECURITY: values are interpolated into the SQL string, so they must be
    // escaped — profile ids can originate from untrusted client input.
    // Prefer ClickHouse query parameters ({id:String}) if chQuery supports
    // them.
    const projectId = this.escapeChString(profile.project_id);
    const profileId = this.escapeChString(profile.id);
    const result = await chQuery<IClickhouseProfile>(
      `SELECT *
      FROM ${TABLE_NAMES.profiles}
      WHERE project_id = '${projectId}'
      AND id = '${profileId}'
      ORDER BY created_at DESC
      LIMIT 1`,
    );
    return result[0] || null;
  }

  /**
   * Release the Redis flush lock, but only if we still own it (the stored
   * value matches our lockId). Uses a Lua script so check-and-delete is atomic.
   */
  private async releaseLock(lockId: string): Promise<void> {
    this.logger.debug('Releasing lock...');
    const script = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `;
    await getRedisCache().eval(script, 1, this.lockKey, lockId);
  }

  /**
   * Attempt to acquire the distributed flush lock; if acquired, process the
   * buffer and run periodic cleanup, then release the lock. If another worker
   * holds the lock, skip silently (it will flush instead).
   */
  async tryFlush() {
    const lockId = generateSecureId('lock');
    // SET NX EX: acquire only if the key does not exist, with auto-expiry so a
    // crashed worker cannot hold the lock forever.
    const acquired = await getRedisCache().set(
      this.lockKey,
      lockId,
      'EX',
      this.lockTimeout,
      'NX',
    );
    if (acquired === 'OK') {
      try {
        this.logger.info('Acquired lock. Processing buffer...');
        await this.processBuffer();
        await this.tryCleanup();
      } catch (error) {
        this.logger.error('Failed to process buffer', { error });
      } finally {
        await this.releaseLock(lockId);
      }
    } else {
      this.logger.warn('Failed to acquire lock. Skipping flush.');
    }
  }

  /**
   * Insert all unprocessed profiles (oldest first) into ClickHouse, then mark
   * them processed. Rows are only marked after a successful insert, so a
   * ClickHouse failure leaves them eligible for retry.
   * NOTE(review): unlike BotBuffer, there is no `take` limit here — a very
   * large backlog is flushed in one insert; confirm that is acceptable.
   */
  async processBuffer() {
    const profilesToProcess = await db.profileBuffer.findMany({
      where: {
        processedAt: null,
      },
      orderBy: {
        createdAt: 'asc',
      },
    });

    if (profilesToProcess.length > 0) {
      const toInsert = profilesToProcess.map((p) => {
        const profile = p.payload;
        return profile;
      });
      await ch.insert({
        table: TABLE_NAMES.profiles,
        values: toInsert,
        format: 'JSONEachRow',
      });
      await db.profileBuffer.updateMany({
        where: {
          id: {
            in: profilesToProcess.map((p) => p.id),
          },
        },
        data: {
          processedAt: new Date(),
        },
      });
      this.logger.info('Processed profiles', {
        count: toInsert.length,
      });
    }
  }

  /** Run cleanup() at most once per day across all workers (via runEvery). */
  async tryCleanup() {
    try {
      await runEvery({
        interval: 1000 * 60 * 60 * 24,
        fn: this.cleanup.bind(this),
        key: `${this.name}-cleanup`,
      });
    } catch (error) {
      this.logger.error('Failed to run cleanup', { error });
    }
  }

  /**
   * Delete buffer rows that were processed more than `daysToKeep` days ago.
   * Unprocessed rows (processedAt = null) are never deleted.
   */
  async cleanup() {
    const cutoff = new Date();
    cutoff.setDate(cutoff.getDate() - this.daysToKeep);
    const deleted = await db.profileBuffer.deleteMany({
      where: {
        processedAt: {
          lt: cutoff,
        },
      },
    });
    this.logger.info('Cleaned up old profiles', { deleted: deleted.count });
  }
}

View File

@@ -3,7 +3,12 @@ import type {
INotificationRuleConfig,
IProjectFilters,
} from '@openpanel/validation';
import type {
IClickhouseBotEvent,
IClickhouseEvent,
} from './services/event.service';
import type { INotificationPayload } from './services/notification.service';
import type { IClickhouseProfile } from './services/profile.service';
declare global {
namespace PrismaJson {
@@ -11,5 +16,8 @@ declare global {
type IPrismaIntegrationConfig = IIntegrationConfig;
type IPrismaNotificationPayload = INotificationPayload;
type IPrismaProjectFilters = IProjectFilters[];
type IPrismaClickhouseEvent = IClickhouseEvent;
type IPrismaClickhouseProfile = IClickhouseProfile;
type IPrismaClickhouseBotEvent = IClickhouseBotEvent;
}
}