chore(root): clean up unused stuff
This commit is contained in:
@@ -49,34 +49,31 @@ export async function postEvent(
|
||||
'NX',
|
||||
);
|
||||
|
||||
// TODO: remove this
|
||||
if (process.env.DISABLE_ADD_JOBS === undefined) {
|
||||
eventsQueue.add(
|
||||
'event',
|
||||
{
|
||||
type: 'incomingEvent',
|
||||
payload: {
|
||||
projectId,
|
||||
headers: getStringHeaders(request.headers),
|
||||
event: {
|
||||
...request.body,
|
||||
timestamp: timestamp.timestamp,
|
||||
isTimestampFromThePast: timestamp.isTimestampFromThePast,
|
||||
},
|
||||
geo,
|
||||
currentDeviceId,
|
||||
previousDeviceId,
|
||||
priority: locked === 'OK',
|
||||
eventsQueue.add(
|
||||
'event',
|
||||
{
|
||||
type: 'incomingEvent',
|
||||
payload: {
|
||||
projectId,
|
||||
headers: getStringHeaders(request.headers),
|
||||
event: {
|
||||
...request.body,
|
||||
timestamp: timestamp.timestamp,
|
||||
isTimestampFromThePast: timestamp.isTimestampFromThePast,
|
||||
},
|
||||
geo,
|
||||
currentDeviceId,
|
||||
previousDeviceId,
|
||||
priority: locked === 'OK',
|
||||
},
|
||||
{
|
||||
// Prioritize 'screen_view' events by setting no delay
|
||||
// This ensures that session starts are created from 'screen_view' events
|
||||
// rather than other events, maintaining accurate session tracking
|
||||
delay: request.body.name === 'screen_view' ? undefined : 1000,
|
||||
},
|
||||
);
|
||||
}
|
||||
},
|
||||
{
|
||||
// Prioritize 'screen_view' events by setting no delay
|
||||
// This ensures that session starts are created from 'screen_view' events
|
||||
// rather than other events, maintaining accurate session tracking
|
||||
delay: request.body.name === 'screen_view' ? undefined : 1000,
|
||||
},
|
||||
);
|
||||
|
||||
reply.status(202).send('ok');
|
||||
}
|
||||
|
||||
@@ -247,34 +247,31 @@ async function track({
|
||||
'NX',
|
||||
);
|
||||
|
||||
// TODO: remove this
|
||||
if (process.env.DISABLE_ADD_JOBS === undefined) {
|
||||
eventsQueue.add(
|
||||
'event',
|
||||
{
|
||||
type: 'incomingEvent',
|
||||
payload: {
|
||||
projectId,
|
||||
headers,
|
||||
event: {
|
||||
...payload,
|
||||
timestamp,
|
||||
isTimestampFromThePast,
|
||||
},
|
||||
geo,
|
||||
currentDeviceId,
|
||||
previousDeviceId,
|
||||
priority: locked === 'OK',
|
||||
eventsQueue.add(
|
||||
'event',
|
||||
{
|
||||
type: 'incomingEvent',
|
||||
payload: {
|
||||
projectId,
|
||||
headers,
|
||||
event: {
|
||||
...payload,
|
||||
timestamp,
|
||||
isTimestampFromThePast,
|
||||
},
|
||||
geo,
|
||||
currentDeviceId,
|
||||
previousDeviceId,
|
||||
priority: locked === 'OK',
|
||||
},
|
||||
{
|
||||
// Prioritize 'screen_view' events by setting no delay
|
||||
// This ensures that session starts are created from 'screen_view' events
|
||||
// rather than other events, maintaining accurate session tracking
|
||||
delay: isScreenView ? undefined : 1000,
|
||||
},
|
||||
);
|
||||
}
|
||||
},
|
||||
{
|
||||
// Prioritize 'screen_view' events by setting no delay
|
||||
// This ensures that session starts are created from 'screen_view' events
|
||||
// rather than other events, maintaining accurate session tracking
|
||||
delay: isScreenView ? undefined : 1000,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
async function identify({
|
||||
|
||||
@@ -76,7 +76,6 @@ export async function createSessionEnd(
|
||||
|
||||
const payload = job.data.payload;
|
||||
|
||||
// TODO: Get complete session from buffer to offload clickhouse
|
||||
const [lastScreenView, eventsInDb] = await Promise.all([
|
||||
eventBuffer.getLastScreenView({
|
||||
projectId: payload.projectId,
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
/*
|
||||
Warnings:
|
||||
|
||||
- You are about to drop the column `cors` on the `clients` table. All the data in the column will be lost.
|
||||
- You are about to drop the column `crossDomain` on the `clients` table. All the data in the column will be lost.
|
||||
- You are about to drop the `bot_event_buffer` table. If the table is not empty, all the data it contains will be lost.
|
||||
- You are about to drop the `event_buffer` table. If the table is not empty, all the data it contains will be lost.
|
||||
- You are about to drop the `events` table. If the table is not empty, all the data it contains will be lost.
|
||||
- You are about to drop the `profile_buffer` table. If the table is not empty, all the data it contains will be lost.
|
||||
- You are about to drop the `profiles` table. If the table is not empty, all the data it contains will be lost.
|
||||
- You are about to drop the `waitlist` table. If the table is not empty, all the data it contains will be lost.
|
||||
|
||||
*/
|
||||
-- DropForeignKey
|
||||
ALTER TABLE "events" DROP CONSTRAINT "events_projectId_fkey";
|
||||
|
||||
-- DropForeignKey
|
||||
ALTER TABLE "profiles" DROP CONSTRAINT "profiles_projectId_fkey";
|
||||
|
||||
-- AlterTable
|
||||
ALTER TABLE "clients" DROP COLUMN "cors",
|
||||
DROP COLUMN "crossDomain";
|
||||
|
||||
-- DropTable
|
||||
DROP TABLE "bot_event_buffer";
|
||||
|
||||
-- DropTable
|
||||
DROP TABLE "event_buffer";
|
||||
|
||||
-- DropTable
|
||||
DROP TABLE "events";
|
||||
|
||||
-- DropTable
|
||||
DROP TABLE "profile_buffer";
|
||||
|
||||
-- DropTable
|
||||
DROP TABLE "profiles";
|
||||
|
||||
-- DropTable
|
||||
DROP TABLE "waitlist";
|
||||
@@ -146,8 +146,6 @@ model Project {
|
||||
/// [IPrismaProjectFilters]
|
||||
filters Json @default("[]")
|
||||
|
||||
events Event[]
|
||||
profiles Profile[]
|
||||
clients Client[]
|
||||
reports Report[]
|
||||
dashboards Dashboard[]
|
||||
@@ -186,21 +184,6 @@ model ProjectAccess {
|
||||
@@map("project_access")
|
||||
}
|
||||
|
||||
model Event {
|
||||
id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
|
||||
name String
|
||||
properties Json
|
||||
projectId String
|
||||
project Project @relation(fields: [projectId], references: [id])
|
||||
|
||||
profileId String?
|
||||
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @default(now()) @updatedAt
|
||||
|
||||
@@map("events")
|
||||
}
|
||||
|
||||
model Salt {
|
||||
salt String @id
|
||||
createdAt DateTime @default(now())
|
||||
@@ -209,22 +192,6 @@ model Salt {
|
||||
@@map("salts")
|
||||
}
|
||||
|
||||
model Profile {
|
||||
id String @id
|
||||
externalId String?
|
||||
firstName String?
|
||||
lastName String?
|
||||
email String?
|
||||
avatar String?
|
||||
properties Json
|
||||
projectId String
|
||||
project Project @relation(fields: [projectId], references: [id])
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @default(now()) @updatedAt
|
||||
|
||||
@@map("profiles")
|
||||
}
|
||||
|
||||
enum ClientType {
|
||||
read
|
||||
write
|
||||
@@ -240,8 +207,6 @@ model Client {
|
||||
project Project? @relation(fields: [projectId], references: [id])
|
||||
organization Organization @relation(fields: [organizationId], references: [id])
|
||||
organizationId String
|
||||
cors String?
|
||||
crossDomain Boolean @default(false)
|
||||
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @default(now()) @updatedAt
|
||||
@@ -319,16 +284,6 @@ model Report {
|
||||
@@map("reports")
|
||||
}
|
||||
|
||||
model Waitlist {
|
||||
id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
|
||||
email String @unique
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @default(now()) @updatedAt
|
||||
accepted Boolean @default(false)
|
||||
|
||||
@@map("waitlist")
|
||||
}
|
||||
|
||||
model ShareOverview {
|
||||
id String @unique
|
||||
projectId String @unique
|
||||
@@ -443,52 +398,3 @@ model ResetPassword {
|
||||
|
||||
@@map("reset_password")
|
||||
}
|
||||
|
||||
model EventBuffer {
|
||||
id String @id @default(cuid())
|
||||
projectId String
|
||||
eventId String @unique
|
||||
name String
|
||||
profileId String?
|
||||
sessionId String?
|
||||
/// [IPrismaClickhouseEvent]
|
||||
payload Json
|
||||
processedAt DateTime?
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @default(now()) @updatedAt
|
||||
|
||||
@@index([projectId, processedAt, createdAt])
|
||||
@@index([projectId, profileId, sessionId, createdAt])
|
||||
@@map("event_buffer")
|
||||
}
|
||||
|
||||
model ProfileBuffer {
|
||||
id String @id @default(cuid())
|
||||
projectId String
|
||||
profileId String
|
||||
checksum String
|
||||
/// [IPrismaClickhouseProfile]
|
||||
payload Json
|
||||
processedAt DateTime?
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @default(now()) @updatedAt
|
||||
|
||||
@@index([projectId, profileId])
|
||||
@@index([projectId, processedAt])
|
||||
@@index([checksum])
|
||||
@@map("profile_buffer")
|
||||
}
|
||||
|
||||
model BotEventBuffer {
|
||||
id String @id @default(cuid())
|
||||
projectId String
|
||||
eventId String
|
||||
/// [IPrismaClickhouseBotEvent]
|
||||
payload Json
|
||||
createdAt DateTime @default(now())
|
||||
processedAt DateTime?
|
||||
|
||||
@@index([processedAt])
|
||||
@@index([projectId, eventId])
|
||||
@@map("bot_event_buffer")
|
||||
}
|
||||
|
||||
@@ -1,116 +0,0 @@
|
||||
import { runEvery } from '@openpanel/redis';
|
||||
|
||||
import { TABLE_NAMES, ch } from '../clickhouse-client';
|
||||
import { db } from '../prisma-client';
|
||||
import type { IClickhouseBotEvent } from '../services/event.service';
|
||||
import { BaseBuffer } from './base-buffer';
|
||||
|
||||
/**
 * Postgres-backed staging buffer for bot events.
 *
 * `add` stores each event as a row via `db.botEventBuffer`. A flush copies
 * the oldest unprocessed rows into the ClickHouse `events_bots` table, stamps
 * them with `processedAt`, and then schedules deletion of processed rows.
 */
export class BotBuffer extends BaseBuffer {
  // Maximum number of rows handled per flush. Overridable through
  // BOT_BUFFER_BATCH_SIZE; defaults to 1000.
  private batchSize = process.env.BOT_BUFFER_BATCH_SIZE
    ? Number.parseInt(process.env.BOT_BUFFER_BATCH_SIZE, 10)
    : 1000;

  constructor() {
    super({
      name: 'bot',
      // A flush first drains the buffer, then attempts the throttled cleanup.
      onFlush: async () => {
        await this.processBuffer();
        await this.tryCleanup();
      },
    });
  }

  /**
   * Persists one bot event and triggers a flush once the number of
   * unprocessed rows reaches `batchSize` (unless TEST_NEW_BUFFER is set).
   * Failures are logged and never propagated to the caller.
   */
  async add(event: IClickhouseBotEvent) {
    try {
      const { project_id: projectId, id: eventId } = event;

      await db.botEventBuffer.create({
        data: { projectId, eventId, payload: event },
      });

      // Check if we have enough unprocessed events to trigger a flush
      const pending = await db.botEventBuffer.count({
        where: { processedAt: null },
      });

      const shouldFlush =
        pending >= this.batchSize && !process.env.TEST_NEW_BUFFER;
      if (shouldFlush) {
        await this.tryFlush();
      }
    } catch (error) {
      this.logger.error('Failed to add bot event', { error });
    }
  }

  /**
   * Moves up to `batchSize` unprocessed rows (oldest first) into ClickHouse
   * and marks them as processed. Does nothing when the buffer is empty.
   */
  async processBuffer() {
    const rows = await db.botEventBuffer.findMany({
      where: { processedAt: null },
      orderBy: { createdAt: 'asc' },
      take: this.batchSize,
    });

    if (rows.length === 0) {
      return;
    }

    const payloads = rows.map(({ payload }) => payload);
    const ids = rows.map(({ id }) => id);

    await ch.insert({
      table: TABLE_NAMES.events_bots,
      values: payloads,
      format: 'JSONEachRow',
    });

    // Only stamp the rows after the ClickHouse insert has resolved.
    await db.botEventBuffer.updateMany({
      where: { id: { in: ids } },
      data: { processedAt: new Date() },
    });

    this.logger.info('Processed bot events', {
      count: payloads.length,
    });
  }

  /**
   * Runs `cleanup` through `runEvery` with a 5 minute interval, keyed per
   * buffer name. Errors are logged and swallowed.
   */
  async tryCleanup() {
    try {
      await runEvery({
        interval: 60 * 5, // 5 minutes
        fn: () => this.cleanup(),
        key: `${this.name}-cleanup`,
      });
    } catch (error) {
      this.logger.error('Failed to run cleanup', { error });
    }
  }

  /** Deletes every row that already carries a `processedAt` timestamp. */
  async cleanup() {
    const { count } = await db.botEventBuffer.deleteMany({
      where: { processedAt: { not: null } },
    });

    this.logger.info('Cleaned up old bot events', { deleted: count });
  }

  /** Returns the number of rows that have not been processed yet. */
  public async getBufferSize() {
    return db.botEventBuffer.count({
      where: { processedAt: null },
    });
  }
}
|
||||
@@ -1,307 +0,0 @@
|
||||
import { getSafeJson, setSuperJson } from '@openpanel/common';
|
||||
import { getRedisCache, getRedisPub, runEvery } from '@openpanel/redis';
|
||||
import { Prisma } from '@prisma/client';
|
||||
import { ch } from '../clickhouse-client';
|
||||
import { type EventBuffer as IPrismaEventBuffer, db } from '../prisma-client';
|
||||
import {
|
||||
type IClickhouseEvent,
|
||||
type IServiceEvent,
|
||||
transformEvent,
|
||||
} from '../services/event.service';
|
||||
import { BaseBuffer } from './base-buffer';
|
||||
|
||||
/**
 * Postgres-backed staging buffer for analytics events.
 *
 * `add` stores each event as a row in `event_buffer`, caches the latest
 * `screen_view` per project/profile in Redis and publishes the event on a
 * Redis channel. A flush selects unprocessed rows, enriches them (screen view
 * duration, path/origin inheritance), inserts them into the ClickHouse
 * `events` table and marks them processed.
 */
export class EventBuffer extends BaseBuffer {
  // Rows older than this many days are deleted by `cleanup` regardless of
  // their state. Overridable through EVENT_BUFFER_DAYS_TO_KEEP; default 3.
  private daysToKeep = process.env.EVENT_BUFFER_DAYS_TO_KEEP
    ? Number.parseInt(process.env.EVENT_BUFFER_DAYS_TO_KEEP, 10)
    : 3;
  // Maximum rows fetched per flush. EVENT_BUFFER_BATCH_SIZE; default 2000.
  private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE
    ? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10)
    : 2000;
  // Rows per ClickHouse insert call. EVENT_BUFFER_CHUNK_SIZE; default 1000.
  private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE
    ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10)
    : 1000;

  constructor() {
    super({
      name: 'event',
      // A flush first drains the buffer, then attempts the throttled cleanup.
      onFlush: async () => {
        await this.processBuffer();
        await this.tryCleanup();
      },
    });
  }

  /**
   * Persists one event in the buffer.
   *
   * - `screen_view` events are additionally cached in Redis for 31 minutes
   *   under the key from `getLastEventKey`.
   * - Unless TEST_NEW_BUFFER is set, the event is published on
   *   `event:received` and a 5 minute `live:event:*` marker is written for
   *   events that carry a profile id. Both are fire-and-forget.
   *
   * A Prisma `P2002` (unique constraint) error is logged as a duplicate and
   * ignored; any other failure is logged. Nothing is thrown to the caller.
   */
  async add(event: IClickhouseEvent) {
    try {
      await db.eventBuffer.create({
        data: {
          projectId: event.project_id,
          eventId: event.id,
          name: event.name,
          profileId: event.profile_id || null,
          sessionId: event.session_id || null,
          payload: event,
        },
      });

      if (event.name === 'screen_view') {
        await getRedisCache().set(
          this.getLastEventKey({
            projectId: event.project_id,
            profileId: event.profile_id,
          }),
          JSON.stringify(event),
          'EX',
          60 * 31,
        );
      }

      if (!process.env.TEST_NEW_BUFFER) {
        // Intentionally not awaited; publishEvent handles its own errors.
        void this.publishEvent('event:received', event);
        if (event.profile_id) {
          // Intentionally not awaited, but a rejection must not go
          // unhandled, so it is logged here.
          getRedisCache()
            .set(
              `live:event:${event.project_id}:${event.profile_id}`,
              '',
              'EX',
              60 * 5,
            )
            .catch((error) => {
              this.logger.warn('Failed to set live event marker', { error });
            });
        }
      }
    } catch (error) {
      if (error instanceof Prisma.PrismaClientKnownRequestError) {
        if (error.code === 'P2002') {
          this.logger.warn('Duplicate event ignored', { eventId: event.id });
          return;
        }
      }
      this.logger.error('Failed to add event', { error });
    }
  }

  /**
   * Publishes the transformed event on the given Redis channel.
   * Failures are logged as warnings and never thrown.
   */
  private async publishEvent(channel: string, event: IClickhouseEvent) {
    try {
      await getRedisPub().publish(
        channel,
        setSuperJson(
          transformEvent(event) as unknown as Record<string, unknown>,
        ),
      );
    } catch (error) {
      this.logger.warn('Failed to publish event', { error });
    }
  }

  /**
   * Decides which fetched rows are ready for ClickHouse and enriches them.
   *
   * - A `screen_view` is only kept when a later `screen_view` or
   *   `session_end` of the same session exists in the batch; when the
   *   follower is a `screen_view`, the time between the two rows becomes
   *   the event's `duration`.
   * - Any other event inherits `path` and `origin` from the closest
   *   preceding `screen_view` of the same session, if there is one.
   *
   * Note: payloads of the passed rows are mutated in place.
   */
  private prepareForInsert(rows: IPrismaEventBuffer[]): IPrismaEventBuffer[] {
    const ready: IPrismaEventBuffer[] = [];

    rows.forEach((event, index) => {
      if (event.name === 'screen_view') {
        const follower = rows
          .slice(index + 1)
          .find(
            (e) =>
              (e.name === 'screen_view' || e.name === 'session_end') &&
              e.sessionId === event.sessionId,
          );

        // if there is no more screen views nor session_end,
        // we don't want to insert this event into clickhouse
        if (!follower) {
          return;
        }

        if (follower.name === 'screen_view') {
          event.payload.duration =
            new Date(follower.createdAt).getTime() -
            new Date(event.createdAt).getTime();
        }
      } else {
        const currentScreenView = rows
          .slice(0, index)
          .findLast(
            (e) => e.name === 'screen_view' && e.sessionId === event.sessionId,
          );

        if (currentScreenView) {
          // Get path related info from the current screen view
          event.payload.path = currentScreenView.payload.path;
          event.payload.origin = currentScreenView.payload.origin;
        }
      }

      ready.push(event);
    });

    return ready;
  }

  /**
   * Flushes one batch: fetches unprocessed rows, enriches them, inserts the
   * payloads into ClickHouse in chunks, publishes `event:saved` for each and
   * marks the rows processed. Per-phase durations (ms) are logged.
   */
  async processBuffer() {
    let now = performance.now();
    const timer: Record<string, number | undefined> = {
      fetchUnprocessedEvents: undefined,
      transformEvents: undefined,
      insertToClickhouse: undefined,
      publishEvents: undefined,
      markAsProcessed: undefined,
    };
    const eventsToProcess = await db.$queryRaw<IPrismaEventBuffer[]>`
      WITH has_more_than_2_events AS (
        SELECT "sessionId"
        FROM event_buffer
        WHERE "processedAt" IS NULL
        GROUP BY "sessionId"
        HAVING COUNT(*) >= 2
      )
      SELECT *
      FROM event_buffer e
      WHERE e."processedAt" IS NULL
        AND (
          -- 1) all events except screen_view
          e.name != 'screen_view'
          OR
          -- 2) if the session has >= 2 such unprocessed events
          e."sessionId" IN (SELECT "sessionId" FROM has_more_than_2_events)
        )
      ORDER BY e."createdAt" ASC
      LIMIT ${this.batchSize}
    `;

    timer.fetchUnprocessedEvents = performance.now() - now;
    now = performance.now();

    const toInsert = this.prepareForInsert(eventsToProcess);

    timer.transformEvents = performance.now() - now;
    now = performance.now();

    if (toInsert.length === 0) {
      return;
    }

    const events = toInsert.map((e) => e.payload);
    for (const chunk of this.chunks(events, this.chunkSize)) {
      await ch.insert({
        table: 'events',
        values: chunk,
        format: 'JSONEachRow',
      });
    }

    timer.insertToClickhouse = performance.now() - now;
    now = performance.now();

    // Fire-and-forget; publishEvent handles its own errors.
    for (const event of toInsert) {
      void this.publishEvent('event:saved', event.payload);
    }

    // Recorded under its own key so it is not overwritten by the
    // mark-as-processed measurement below.
    timer.publishEvents = performance.now() - now;
    now = performance.now();

    await db.eventBuffer.updateMany({
      where: {
        id: {
          in: toInsert.map((e) => e.id),
        },
      },
      data: {
        processedAt: new Date(),
      },
    });

    timer.markAsProcessed = performance.now() - now;

    this.logger.info('Processed events', {
      inserted: toInsert.length,
      processed: eventsToProcess.length,
      timer,
    });
  }

  /**
   * Runs `cleanup` through `runEvery` with a 5 minute interval, keyed per
   * buffer name. Errors are logged and swallowed.
   */
  async tryCleanup() {
    try {
      await runEvery({
        interval: 60 * 5, // 5 minutes
        fn: this.cleanup.bind(this),
        key: `${this.name}-cleanup`,
      });
    } catch (error) {
      this.logger.error('Failed to run cleanup', { error });
    }
  }

  /**
   * Deletes processed rows whose session has a `session_end` row (or that
   * have no session at all), plus any row older than `daysToKeep` days.
   */
  async cleanup() {
    const olderThan = new Date();
    olderThan.setDate(olderThan.getDate() - this.daysToKeep);

    const deleted = await db.$executeRaw`
      DELETE FROM event_buffer
      WHERE
        -- 1) if the event has been processed
        -- and session is completed or has no session
        (
          "processedAt" IS NOT NULL
          AND (
            "sessionId" IN (SELECT "sessionId" FROM event_buffer WHERE name = 'session_end')
            OR "sessionId" IS NULL
          )
        )
        -- 2) if the event is stalled for X days
        OR "createdAt" < ${olderThan}
    `;

    this.logger.info('Cleaned up old events', { deleted });
  }

  /**
   * Returns the most recent `screen_view` cached in Redis for the given
   * project/profile, transformed to a service event, or `null` when the key
   * is missing or its content cannot be parsed.
   */
  public async getLastScreenView({
    projectId,
    profileId,
  }: {
    projectId: string;
    profileId: string;
  }): Promise<IServiceEvent | null> {
    const event = await getRedisCache().get(
      this.getLastEventKey({ projectId, profileId }),
    );

    if (event) {
      const parsed = getSafeJson<IClickhouseEvent>(event);
      if (parsed) {
        return transformEvent(parsed);
      }
    }
    return null;
  }

  /** Builds the Redis key under which the last screen view is cached. */
  getLastEventKey({
    projectId,
    profileId,
  }: {
    projectId: string;
    profileId: string;
  }) {
    return `session:last_screen_view:${projectId}:${profileId}`;
  }

  /** Returns the number of rows that have not been processed yet. */
  async getBufferSize() {
    return db.eventBuffer.count({
      where: {
        processedAt: null,
      },
    });
  }
}
|
||||
@@ -5,12 +5,6 @@ import { EventBuffer as EventBufferRedis } from './event-buffer-redis';
|
||||
import { ProfileBuffer as ProfileBufferPsql } from './profile-buffer-psql';
|
||||
import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer-redis';
|
||||
|
||||
export const eventBuffer = process.env.USE_NEW_BUFFER
|
||||
? new EventBufferRedis()
|
||||
: new EventBufferPsql();
|
||||
export const profileBuffer = process.env.USE_NEW_BUFFER
|
||||
? new ProfileBufferRedis()
|
||||
: new ProfileBufferPsql();
|
||||
export const botBuffer = process.env.USE_NEW_BUFFER
|
||||
? new BotBufferRedis()
|
||||
: new BotBufferPsql();
|
||||
export const eventBuffer = new EventBufferRedis();
|
||||
export const profileBuffer = new ProfileBufferRedis();
|
||||
export const botBuffer = new BotBufferRedis();
|
||||
|
||||
@@ -1,291 +0,0 @@
|
||||
import { createHash } from 'node:crypto';
|
||||
import { runEvery } from '@openpanel/redis';
|
||||
import { assocPath, dissocPath, mergeDeepRight } from 'ramda';
|
||||
|
||||
import { TABLE_NAMES, ch, chQuery } from '../clickhouse-client';
|
||||
import { db } from '../prisma-client';
|
||||
import type { IClickhouseProfile } from '../services/profile.service';
|
||||
import { BaseBuffer } from './base-buffer';
|
||||
|
||||
/**
 * Postgres-backed staging buffer for profiles.
 *
 * `add` merges an incoming profile with the latest buffered version (or,
 * when none is buffered, with the latest version found in ClickHouse) and
 * stores the result in `profile_buffer`. A flush inserts the buffered
 * payloads into the ClickHouse profiles table and marks the rows processed.
 */
export class ProfileBuffer extends BaseBuffer {
  // Paths removed from a profile before its checksum is computed, so that
  // changes to these fields alone do not make a profile count as different.
  private static readonly checksumIgnoredPaths: string[][] = [
    ['created_at'],
    ['properties', 'brand'],
    ['properties', 'browser_version'],
    ['properties', 'browserVersion'],
    ['properties', 'browser'],
    ['properties', 'city'],
    ['properties', 'country'],
    ['properties', 'device'],
    ['properties', 'latitude'],
    ['properties', 'longitude'],
    ['properties', 'model'],
    ['properties', 'os_version'],
    ['properties', 'osVersion'],
    ['properties', 'os'],
    ['properties', 'path'],
    ['properties', 'referrer_name'],
    ['properties', 'referrerName'],
    ['properties', 'referrer_type'],
    ['properties', 'referrerType'],
    ['properties', 'referrer'],
    ['properties', 'region'],
  ];

  // Processed rows older than this many days are deleted by `cleanup`.
  // Overridable through PROFILE_BUFFER_DAYS_TO_KEEP; default 7.
  private daysToKeep = process.env.PROFILE_BUFFER_DAYS_TO_KEEP
    ? Number.parseInt(process.env.PROFILE_BUFFER_DAYS_TO_KEEP, 10)
    : 7;
  // Maximum rows fetched per flush. PROFILE_BUFFER_BATCH_SIZE; default 2000.
  private batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE
    ? Number.parseInt(process.env.PROFILE_BUFFER_BATCH_SIZE, 10)
    : 2000;
  // Rows per ClickHouse insert call. PROFILE_BUFFER_CHUNK_SIZE; default 1000.
  private chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE
    ? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10)
    : 1000;

  constructor() {
    super({
      name: 'profile',
      // A flush first drains the buffer, then attempts the throttled cleanup.
      onFlush: async () => {
        await this.processBuffer();
        await this.tryCleanup();
      },
    });
  }

  /**
   * Produces a canonical copy of `obj` for hashing: object keys are sorted
   * and primitive values are converted with `String`. Arrays that contain no
   * objects are returned unchanged.
   */
  private sortObjectKeys(obj: any): any {
    if (obj === null || typeof obj !== 'object') {
      return String(obj);
    }

    if (Array.isArray(obj)) {
      // Only map if contains objects
      return obj.some((item) => item && typeof item === 'object')
        ? obj.map((item) => this.sortObjectKeys(item))
        : obj;
    }

    const result: any = {};
    for (const key of Object.keys(obj).sort()) {
      const value = obj[key];
      result[key] =
        value && typeof value === 'object'
          ? this.sortObjectKeys(value)
          : String(value);
    }

    return result;
  }

  /** Returns a copy of `profile` with every path in `exclude` removed. */
  private excludeKeys(
    profile: IClickhouseProfile,
    exclude: string[][],
  ): IClickhouseProfile {
    let filtered = profile;
    for (const path of exclude) {
      filtered = dissocPath(path, filtered);
    }
    return filtered;
  }

  /** Serializes the profile in canonical form, without the ignored paths. */
  private stringify(profile: IClickhouseProfile): string {
    const excluded = this.excludeKeys(
      profile,
      ProfileBuffer.checksumIgnoredPaths,
    );
    const sorted = this.sortObjectKeys(excluded);
    return JSON.stringify(sorted);
  }

  /** SHA-256 hex digest of the canonical profile serialization. */
  private generateChecksum(profile: IClickhouseProfile): string {
    const json = this.stringify(profile);
    return createHash('sha256').update(json).digest('hex');
  }

  /**
   * Escapes a value for use inside a single-quoted SQL string literal by
   * backslash-escaping backslashes and single quotes.
   */
  private escapeSqlString(value: string): string {
    return String(value).replace(/\\/g, '\\\\').replace(/'/g, "\\'");
  }

  /**
   * Adds or updates a profile in the buffer.
   *
   * - If the newest buffered row for the profile has the same checksum as
   *   the incoming profile, nothing is written.
   * - If a buffered row exists, its payload is deep-merged with the incoming
   *   profile (incoming wins) and the row is updated and marked unprocessed.
   * - Otherwise the latest ClickHouse version (if any) is merged with the
   *   incoming profile and a new row is created.
   *
   * Failures are logged and never propagated to the caller.
   */
  async add(profile: IClickhouseProfile) {
    try {
      const checksum = this.generateChecksum(profile);
      // Check if we have this exact profile in buffer
      const existingProfile = await db.profileBuffer.findFirst({
        where: {
          projectId: profile.project_id,
          profileId: profile.id,
        },
        orderBy: {
          createdAt: 'desc',
        },
        select: {
          checksum: true,
          payload: true,
          id: true,
        },
      });

      // Last item in buffer is the same as the new profile
      if (existingProfile?.checksum === checksum) {
        this.logger.debug('Duplicate profile ignored', {
          profileId: profile.id,
        });
        return;
      }

      let mergedProfile = profile;

      if (!existingProfile) {
        this.logger.debug('No profile in buffer, checking Clickhouse', {
          profileId: profile.id,
        });
        // If not in buffer, check Clickhouse
        const clickhouseProfile = await this.fetchFromClickhouse(profile);
        if (clickhouseProfile) {
          this.logger.debug('Clickhouse profile found, merging', {
            profileId: profile.id,
          });
          mergedProfile = mergeDeepRight(clickhouseProfile, profile);
        }
      } else if (existingProfile.payload) {
        this.logger.debug('Profile in buffer is different, merging', {
          profileId: profile.id,
          existingProfile: existingProfile.payload,
          existingProfileChecksum: existingProfile.checksum,
          incomingProfile: profile,
          incomingProfileChecksum: checksum,
        });
        mergedProfile = mergeDeepRight(existingProfile.payload, profile);
      }

      if (existingProfile) {
        // Update the buffered row and reset processedAt so the merged
        // version is picked up by the next flush.
        await db.profileBuffer.update({
          where: {
            id: existingProfile.id,
          },
          data: {
            checksum: this.generateChecksum(mergedProfile),
            payload: mergedProfile,
            updatedAt: new Date(),
            processedAt: null,
          },
        });
      } else {
        // Create new profile
        // NOTE(review): this stores the checksum of the incoming profile,
        // while the update path stores the checksum of the merged profile.
        // Left unchanged; confirm which one the duplicate check expects.
        await db.profileBuffer.create({
          data: {
            projectId: profile.project_id,
            profileId: profile.id,
            checksum,
            payload: mergedProfile,
          },
        });
      }
    } catch (error) {
      this.logger.error('Failed to add profile', { error });
    }
  }

  /**
   * Loads the newest stored version of the profile from ClickHouse, or
   * `null` when there is none. For profiles with `is_external === false`
   * the lookup is limited to rows created within the last 2 days.
   */
  private async fetchFromClickhouse(
    profile: IClickhouseProfile,
  ): Promise<IClickhouseProfile | null> {
    // Ids are interpolated into the query text, so they are escaped first.
    const projectId = this.escapeSqlString(profile.project_id);
    const profileId = this.escapeSqlString(profile.id);
    // If profile is not external, we know its not older than 2 day
    const recencyFilter =
      profile.is_external === false
        ? 'AND created_at > now() - INTERVAL 2 DAY'
        : '';

    const result = await chQuery<IClickhouseProfile>(
      `SELECT *
       FROM ${TABLE_NAMES.profiles}
       WHERE project_id = '${projectId}'
         AND id = '${profileId}'
         ${recencyFilter}
       ORDER BY created_at DESC
       LIMIT 1`,
    );

    return result[0] || null;
  }

  /**
   * Flushes one batch: inserts the payloads of up to `batchSize` unprocessed
   * rows (oldest first) into ClickHouse in chunks and marks the rows
   * processed. Does nothing when the buffer is empty.
   */
  async processBuffer() {
    const profilesToProcess = await db.profileBuffer.findMany({
      where: {
        processedAt: null,
      },
      orderBy: {
        createdAt: 'asc',
      },
      take: this.batchSize,
    });

    if (profilesToProcess.length === 0) {
      return;
    }

    const toInsert = profilesToProcess.map((p) => p.payload);

    // Insert the profile payloads, not the buffer rows that wrap them.
    for (const chunk of this.chunks(toInsert, this.chunkSize)) {
      await ch.insert({
        table: TABLE_NAMES.profiles,
        values: chunk,
        format: 'JSONEachRow',
      });
    }

    await db.profileBuffer.updateMany({
      where: {
        id: {
          in: profilesToProcess.map((p) => p.id),
        },
      },
      data: {
        processedAt: new Date(),
      },
    });

    this.logger.info('Processed profiles', {
      count: toInsert.length,
    });
  }

  /**
   * Runs `cleanup` through `runEvery` with a 1 hour interval, keyed per
   * buffer name. Errors are logged and swallowed.
   */
  async tryCleanup() {
    try {
      await runEvery({
        interval: 60 * 60, // 1 hour
        fn: this.cleanup.bind(this),
        key: `${this.name}-cleanup`,
      });
    } catch (error) {
      this.logger.error('Failed to run cleanup', { error });
    }
  }

  /** Deletes rows that were processed more than `daysToKeep` days ago. */
  async cleanup() {
    const olderThan = new Date();
    olderThan.setDate(olderThan.getDate() - this.daysToKeep);

    const deleted = await db.profileBuffer.deleteMany({
      where: {
        processedAt: {
          lt: olderThan,
        },
      },
    });

    this.logger.info('Cleaned up old profiles', { deleted: deleted.count });
  }

  /** Returns the number of rows that have not been processed yet. */
  async getBufferSize() {
    return db.profileBuffer.count({
      where: {
        processedAt: null,
      },
    });
  }
}
|
||||
Reference in New Issue
Block a user