test(buffer): testing new buffer (only inserts, no processing)

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-01-31 00:09:25 +00:00
parent 71bf22af51
commit a2b74a9b4d
5 changed files with 95 additions and 104 deletions

View File

@@ -1,13 +1,23 @@
import { TABLE_NAMES, ch } from '../clickhouse-client';
import type { IClickhouseBotEvent } from '../services/event.service';
import { BotBuffer as NewBotBuffer } from './bot-buffer-psql';
import { RedisBuffer } from './buffer';
const testNewBotBuffer = new NewBotBuffer();
type BufferType = IClickhouseBotEvent;
export class BotBuffer extends RedisBuffer<BufferType> {
constructor() {
super('events_bots', 500);
}
async add(event: BufferType) {
await super.add(event);
if (process.env.TEST_NEW_BUFFER) {
await testNewBotBuffer.add(event);
}
}
protected async insertIntoDB(items: BufferType[]): Promise<void> {
await ch.insert({
table: TABLE_NAMES.events_bots,

View File

@@ -4,7 +4,7 @@ import { type ILogger as Logger, createLogger } from '@openpanel/logger';
import { getRedisCache, getRedisPub, runEvery } from '@openpanel/redis';
import { Prisma } from '@prisma/client';
import { ch } from '../clickhouse-client';
import { db } from '../prisma-client';
import { type EventBuffer as IPrismaEventBuffer, db } from '../prisma-client';
import {
type IClickhouseEvent,
type IServiceEvent,
@@ -17,6 +17,7 @@ export class EventBuffer {
private lockKey = `lock:${this.name}`;
private lockTimeout = 60;
private daysToKeep = 2;
private batchSize = 1000;
constructor() {
this.logger = createLogger({ name: this.name });
@@ -35,16 +36,16 @@ export class EventBuffer {
},
});
this.publishEvent('event:received', event);
if (event.profile_id) {
getRedisCache().set(
`live:event:${event.project_id}:${event.profile_id}`,
'',
'EX',
60 * 5,
);
}
// TODO: UNCOMMENT THIS!!!
// this.publishEvent('event:received', event);
// if (event.profile_id) {
// getRedisCache().set(
// `live:event:${event.project_id}:${event.profile_id}`,
// '',
// 'EX',
// 60 * 5,
// );
// }
} catch (error) {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
if (error.code === 'P2002') {
@@ -106,109 +107,78 @@ export class EventBuffer {
}
async processBuffer() {
const eventsToProcess = await db.$transaction(async (trx) => {
// Process all screen_views that have a next event
const processableViews = await trx.$queryRaw<
Array<{
id: string;
payload: IClickhouseEvent;
next_event_time: Date;
}>
>`
WITH NextEvents AS (
SELECT
id,
payload,
LEAD("createdAt") OVER (
PARTITION BY "sessionId"
ORDER BY "createdAt"
) as next_event_time
FROM event_buffer
WHERE "name" = 'screen_view'
AND "processedAt" IS NULL
const eventsToProcess = await db.$queryRaw<IPrismaEventBuffer[]>`
WITH has_2_special AS (
SELECT "sessionId"
FROM event_buffer
WHERE "processedAt" IS NULL
AND name IN ('screen_view', 'session_start', 'session_end')
GROUP BY "sessionId"
HAVING COUNT(*) >= 2
)
SELECT *
FROM event_buffer e
WHERE e."processedAt" IS NULL
AND (
-- 1) if the event name is NOT in the special set
e.name NOT IN ('screen_view', 'session_start', 'session_end')
OR
-- 2) if the event name IS in the special set AND
-- the session has >= 2 such unprocessed events
(
e.name IN ('screen_view', 'session_start', 'session_end')
AND e."sessionId" IN (SELECT "sessionId" FROM has_2_special)
)
SELECT *
FROM NextEvents
WHERE next_event_time IS NOT NULL
`;
)
ORDER BY e."createdAt" ASC -- or e.id, whichever "oldest first" logic you use
LIMIT ${this.batchSize}
`;
// Find screen_views that are last in their session with session_end
const lastViews = await trx.$queryRaw<
Array<{
id: string;
payload: IClickhouseEvent;
}>
>`
WITH LastViews AS (
SELECT e.id, e.payload,
EXISTS (
SELECT 1
FROM event_buffer se
WHERE se."name" = 'session_end'
AND se."sessionId" = e."sessionId"
AND se."createdAt" > e."createdAt"
) as has_session_end
FROM event_buffer e
WHERE e."name" = 'screen_view'
AND e."processedAt" IS NULL
AND NOT EXISTS (
SELECT 1
FROM event_buffer next
WHERE next."sessionId" = e."sessionId"
AND next."name" = 'screen_view'
AND next."createdAt" > e."createdAt"
)
)
SELECT * FROM LastViews
WHERE has_session_end = true
`;
const toInsert = eventsToProcess.reduce<IPrismaEventBuffer[]>(
(acc, event, index, list) => {
if (event.name === 'screen_view') {
const nextScreenView = list.find(
(e, eIndex) =>
(e.name === 'screen_view' || e.name === 'session_end') &&
e.sessionId === event.sessionId &&
eIndex > index,
);
// Get all other events
const regularEvents = await trx.eventBuffer.findMany({
where: {
processedAt: null,
name: { not: 'screen_view' },
},
orderBy: { createdAt: 'asc' },
});
if (nextScreenView && nextScreenView.name === 'screen_view') {
event.payload.duration =
new Date(nextScreenView.createdAt).getTime() -
new Date(event.createdAt).getTime();
}
return {
processableViews,
lastViews,
regularEvents,
};
});
// if there is no more screen views nor session_end,
// we don't want to insert this event into clickhouse
if (!nextScreenView) {
return acc;
}
}
const toInsert = [
...eventsToProcess.processableViews.map((view) => ({
...view.payload,
duration:
new Date(view.next_event_time).getTime() -
new Date(view.payload.created_at).getTime(),
})),
...eventsToProcess.lastViews.map((v) => v.payload),
...eventsToProcess.regularEvents.map((e) => e.payload),
];
acc.push(event);
return acc;
},
[],
);
if (toInsert.length > 0) {
await ch.insert({
table: 'events',
values: toInsert,
values: toInsert.map((e) => e.payload),
format: 'JSONEachRow',
});
for (const event of toInsert) {
this.publishEvent('event:saved', event);
this.publishEvent('event:saved', event.payload);
}
await db.eventBuffer.updateMany({
where: {
id: {
in: [
...eventsToProcess.processableViews.map((v) => v.id),
...eventsToProcess.lastViews.map((v) => v.id),
...eventsToProcess.regularEvents.map((e) => e.id),
],
in: toInsert.map((e) => e.id),
},
},
data: {
@@ -218,10 +188,6 @@ export class EventBuffer {
this.logger.info('Processed events', {
count: toInsert.length,
screenViews:
eventsToProcess.processableViews.length +
eventsToProcess.lastViews.length,
regularEvents: eventsToProcess.regularEvents.length,
});
}
}

View File

@@ -16,9 +16,12 @@ import type {
} from '../services/event.service';
import type { Find, FindMany } from './buffer';
import { RedisBuffer } from './buffer';
import { EventBuffer as NewEventBuffer } from './event-buffer-psql';
const STALLED_QUEUE_TIMEOUT = 1000 * 60 * 60 * 24;
const testNewEventBuffer = new NewEventBuffer();
type BufferType = IClickhouseEvent;
export class EventBuffer extends RedisBuffer<BufferType> {
constructor() {
@@ -57,6 +60,9 @@ export class EventBuffer extends RedisBuffer<BufferType> {
public async add(event: BufferType) {
await super.add(event);
if (process.env.TEST_NEW_BUFFER) {
await testNewEventBuffer.add(event);
}
if (event.name === 'screen_view') {
await getRedisCache().set(
this.getLastEventKey({

View File

@@ -1,6 +1,6 @@
import { BotBuffer } from './bot-buffer-psql';
import { EventBuffer } from './event-buffer-psql';
import { ProfileBuffer } from './profile-buffer-psql';
import { BotBuffer } from './bot-buffer';
import { EventBuffer } from './event-buffer';
import { ProfileBuffer } from './profile-buffer';
export const eventBuffer = new EventBuffer();
export const profileBuffer = new ProfileBuffer();

View File

@@ -17,17 +17,26 @@ import type {
} from '../services/profile.service';
import type { Find, FindMany } from './buffer';
import { RedisBuffer } from './buffer';
import { ProfileBuffer as NewProfileBuffer } from './profile-buffer-psql';
const BATCH_SIZE = process.env.BATCH_SIZE_PROFILES
? Number.parseInt(process.env.BATCH_SIZE_PROFILES, 10)
: 50;
const testNewProfileBuffer = new NewProfileBuffer();
type BufferType = IClickhouseProfile;
export class ProfileBuffer extends RedisBuffer<BufferType> {
constructor() {
super('profiles', BATCH_SIZE);
}
async add(profile: BufferType) {
await super.add(profile);
if (process.env.TEST_NEW_BUFFER) {
await testNewProfileBuffer.add(profile);
}
}
// this will do a couple of things:
// - we slice the queue to maxBufferSize since this queries have a limit on character count
// - check redis cache for profiles