From 5e225b7ae6f1d6669f5ae7fadc8cd8b279be3570 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Wed, 17 Jul 2024 17:13:07 +0200 Subject: [PATCH] batching events --- apps/api/Dockerfile | 38 +-- apps/api/package.json | 1 + apps/api/scripts/get-organizations.ts | 189 +++++++++++ apps/api/scripts/migrate-origins.ts | 65 ++++ apps/api/src/controllers/event.controller.ts | 34 +- apps/api/src/controllers/live.controller.ts | 42 ++- .../api/src/controllers/profile.controller.ts | 4 +- .../api/src/controllers/webhook.controller.ts | 152 +++++++++ apps/api/src/index.ts | 68 ++-- apps/api/src/routes/live.router.ts | 7 +- apps/api/src/routes/webhook.router.ts | 13 + apps/api/src/utils/auth.ts | 2 - apps/api/src/utils/parseUserAgent.ts | 87 ++++- apps/dashboard/Dockerfile | 49 +-- apps/dashboard/entrypoint.sh | 32 ++ apps/dashboard/next.config.mjs | 2 +- apps/dashboard/package.json | 6 +- .../events/event-list/event-listener.tsx | 19 +- .../realtime/realtime-live-events/index.tsx | 2 +- .../realtime-live-events/live-events.tsx | 4 +- .../realtime/realtime-reloader.tsx | 21 +- .../src/app/api/clerk/webhook/route.ts | 2 + apps/dashboard/src/app/providers.tsx | 3 +- apps/dashboard/src/hooks/useDebounceVal.ts | 30 ++ apps/dashboard/src/hooks/useWS.ts | 22 +- apps/public/Dockerfile | 33 +- apps/worker/Dockerfile | 24 +- apps/worker/package.json | 7 +- apps/worker/scripts/debug.ts | 139 ++++++++ apps/worker/src/index.ts | 121 ++++++- apps/worker/src/jobs/cron.salt.ts | 4 +- apps/worker/src/jobs/cron.ts | 7 + .../src/jobs/events.create-session-end.ts | 17 +- apps/worker/src/jobs/events.incoming-event.ts | 314 ++++++++---------- apps/worker/src/jobs/events.ts | 18 +- apps/worker/src/jobs/sessions.ts | 9 + apps/worker/src/utils/parse-user-agent.ts | 87 ++++- package.json | 9 +- packages/common/src/object.ts | 50 ++- packages/db/clickhouse_init.sql | 73 ++++ packages/db/index.ts | 1 + packages/db/package.json | 2 +- packages/db/src/buffers/buffer.ts | 137 ++++++++ packages/db/src/buffers/event-buffer.ts | 212 ++++++++++++ packages/db/src/buffers/index.ts | 5 + packages/db/src/buffers/profile-buffer.ts | 114 +++++++ packages/db/src/clickhouse-client.ts | 49 ++- packages/db/src/services/event.service.ts | 57 ++-- packages/db/src/services/profile.service.ts | 46 +-- packages/logger/index.ts | 2 +- packages/queue/package.json | 4 +- packages/queue/src/queues.ts | 35 +- packages/queue/src/utils.ts | 2 +- packages/redis/cachable.ts | 17 +- packages/redis/package.json | 4 +- packages/redis/redis.ts | 5 +- patches/@bull-board__api@5.21.0.patch | 40 +++ pnpm-lock.yaml | 249 ++++++++++---- 58 files changed, 2204 insertions(+), 583 deletions(-) create mode 100644 apps/api/scripts/get-organizations.ts create mode 100644 apps/api/scripts/migrate-origins.ts create mode 100644 apps/api/src/controllers/webhook.controller.ts create mode 100644 apps/api/src/routes/webhook.router.ts create mode 100644 apps/dashboard/entrypoint.sh create mode 100644 apps/dashboard/src/hooks/useDebounceVal.ts create mode 100644 apps/worker/scripts/debug.ts create mode 100644 apps/worker/src/jobs/sessions.ts create mode 100644 packages/db/clickhouse_init.sql create mode 100644 packages/db/src/buffers/buffer.ts create mode 100644 packages/db/src/buffers/event-buffer.ts create mode 100644 packages/db/src/buffers/index.ts create mode 100644 packages/db/src/buffers/profile-buffer.ts create mode 100644 patches/@bull-board__api@5.21.0.patch diff --git a/apps/api/Dockerfile b/apps/api/Dockerfile index 9b95438a..bcc86eca 100644 --- a/apps/api/Dockerfile +++ b/apps/api/Dockerfile @@ -1,48 +1,16 @@ # Dockerfile that builds the web app only - -FROM --platform=linux/amd64 node:20-slim AS base +ARG NODE_VERSION=20 +FROM --platform=linux/amd64 node:${NODE_VERSION}-slim AS base ARG DATABASE_URL ENV DATABASE_URL=$DATABASE_URL -ARG CLICKHOUSE_DB -ENV CLICKHOUSE_DB=$CLICKHOUSE_DB - -ARG CLICKHOUSE_PASSWORD -ENV CLICKHOUSE_PASSWORD=$CLICKHOUSE_PASSWORD - -ARG CLICKHOUSE_URL -ENV CLICKHOUSE_URL=$CLICKHOUSE_URL - -ARG CLICKHOUSE_USER -ENV CLICKHOUSE_USER=$CLICKHOUSE_USER - -ARG REDIS_URL -ENV REDIS_URL=$REDIS_URL - -ARG NEXT_PUBLIC_DASHBOARD_URL -ENV NEXT_PUBLIC_DASHBOARD_URL=$NEXT_PUBLIC_DASHBOARD_URL - -ARG NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY -ENV NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY=$NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY - -ARG CLERK_SECRET_KEY -ENV CLERK_SECRET_KEY=$CLERK_SECRET_KEY - -ARG CLERK_PUBLIC_PEM_KEY -ENV CLERK_PUBLIC_PEM_KEY=$CLERK_PUBLIC_PEM_KEY - -ARG SEVENTY_SEVEN_API_KEY -ENV SEVENTY_SEVEN_API_KEY=$SEVENTY_SEVEN_API_KEY - ENV PNPM_HOME="/pnpm" ENV PATH="$PNPM_HOME:$PATH" RUN corepack enable -ARG NODE_VERSION=20 - RUN apt update \ && apt install -y curl python3 make g++ \ && curl -L https://raw.githubusercontent.com/tj/n/master/bin/n -o n \ @@ -73,7 +41,7 @@ WORKDIR /app/apps/api RUN pnpm install --frozen-lockfile WORKDIR /app -COPY apps apps +COPY apps/api apps/api COPY packages packages COPY tooling tooling RUN pnpm db:codegen diff --git a/apps/api/package.json b/apps/api/package.json index c8612c7b..7f5b1e59 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -35,6 +35,7 @@ "sharp": "^0.33.2", "sqlstring": "^2.3.3", "superjson": "^1.13.3", + "svix": "^1.24.0", "ua-parser-js": "^1.0.37", "url-metadata": "^4.1.0", "zod": "^3.22.4" diff --git a/apps/api/scripts/get-organizations.ts b/apps/api/scripts/get-organizations.ts new file mode 100644 index 00000000..427c4a4e --- /dev/null +++ b/apps/api/scripts/get-organizations.ts @@ -0,0 +1,189 @@ +// import { clerkClient } from '@clerk/fastify'; + +import { db } from '@openpanel/db'; + +// import { db } from '@openpanel/db'; + +// type Fn = (args: { limit: number; offset: number }) => Promise<{ +// data: T[]; +// totalCount: number; +// }>; + +// function getAllDataByPagination( +// cb: T +// ): Promise>['data']> { +// const data: Awaited>['data'] = []; +// async function getData(page = 0) { +// console.log(`getData with offset ${page * 100}`); +// const response = await cb({ +// limit: 100, +// offset: page * 100, +// }); +// if (response.data.length !== 0) { +// data.push(...response.data); +// await getData(page + 1); +// } +// await new Promise((resolve) => setTimeout(resolve, 100)); +// } + +// return getData().then(() => data); +// } + +// async function main() { +// const organizations = await getAllDataByPagination( +// clerkClient.organizations.getOrganizationList.bind( +// clerkClient.organizations +// ) +// ); +// const users = await getAllDataByPagination( +// clerkClient.users.getUserList.bind(clerkClient.users) +// ); + +// console.log(`Found ${organizations.length} organizations`); +// console.log(`Found ${users.length} users`); + +// for (const user of users.slice(-10)) { +// const email = user.primaryEmailAddress?.emailAddress; +// console.log('Check', email); + +// try { +// if (email) { +// const exists = await db.user.findUnique({ +// where: { +// id: user.id, +// }, +// }); + +// if (exists) { +// console.log('already exists'); +// } else { +// await db.user.create({ +// data: { +// id: user.id, +// email: email, +// firstName: user.firstName, +// lastName: user.lastName, +// }, +// }); +// } +// } else { +// console.log('No email?', user); +// } +// } catch (e) { +// console.log('ERROR'); +// console.log(''); +// console.log(''); +// console.dir(user, { depth: null }); + +// console.log(''); +// console.log(''); +// console.log(''); +// } +// } + +// for (const org of organizations.slice(-20)) { +// try { +// if (org.slug) { +// const exists = await db.organization.findUnique({ +// where: { +// id: org.slug, +// }, +// }); + +// if (exists) { +// console.log('already exists org'); +// } else { +// const clerkOrgMembers = +// await clerkClient.organizations.getOrganizationMembershipList({ +// organizationId: org.id, +// }); + +// const members = clerkOrgMembers.data.map((member) => { +// const user = users.find( +// (u) => u.id === member.publicUserData?.userId +// ); +// return { +// userId: member.publicUserData?.userId, +// role: member.role, +// email: user!.primaryEmailAddress!.emailAddress, +// }; +// }); + +// await db.organization.create({ +// data: { +// id: org.slug, +// name: org.name, +// createdBy: { +// connect: { +// id: org.createdBy, +// }, +// }, +// members: { +// create: members, +// }, +// }, +// }); + +// const invites = +// await clerkClient.organizations.getOrganizationInvitationList({ +// organizationId: org.id, +// status: ['pending'], +// }); + +// for (const invite of invites.data) { +// await db.member.create({ +// data: { +// email: invite.emailAddress, +// organizationId: org.slug, +// role: invite.role, +// userId: null, +// meta: { +// access: invite.publicMetadata?.access as string[], +// invitationId: invite.id, +// }, +// }, +// }); +// } +// } +// } else { +// console.log('org does not have any slug', org); +// } +// } catch (e) { +// console.log('ERROR'); +// console.log(''); +// console.log(''); +// console.dir(org, { depth: null }); +// console.log(''); +// console.log(''); +// console.log(''); +// } +// } + +// process.exit(0); +// } + +// main(); + +async function main() { + const organization = await db.organization.findUnique({ + where: { + id: 'openpanel-dev', + members: { + some: { + userId: 'user_2cEoI8b1SuEFbZERGEAyVvC676F', + }, + }, + }, + include: { + members: { + select: { + role: true, + user: true, + }, + }, + }, + }); + + console.dir(organization, { depth: null }); +} +main(); diff --git a/apps/api/scripts/migrate-origins.ts b/apps/api/scripts/migrate-origins.ts new file mode 100644 index 00000000..7f365e85 --- /dev/null +++ b/apps/api/scripts/migrate-origins.ts @@ -0,0 +1,65 @@ +import { ch, chQuery } from '@openpanel/db'; + +async function main() { + const projects = await chQuery( + `SELECT distinct project_id FROM events ORDER BY project_id` + ); + const withOrigin = []; + + for (const project of projects) { + try { + const [eventWithOrigin, eventWithoutOrigin] = await Promise.all([ + await chQuery( + `SELECT * FROM events WHERE origin != '' AND project_id = '${project.project_id}' ORDER BY created_at DESC LIMIT 1` + ), + await chQuery( + `SELECT * FROM events WHERE origin = '' AND project_id = '${project.project_id}' AND path != '' ORDER BY created_at DESC LIMIT 1` + ), + ]); + + if (eventWithOrigin[0] && eventWithoutOrigin[0]) { + console.log(`Project ${project.project_id} as events without origin`); + console.log(`- Origin: ${eventWithOrigin[0].origin}`); + withOrigin.push(project.project_id); + const events = await chQuery( + `SELECT count(*) as count FROM events WHERE project_id = '${project.project_id}' AND path != '' AND origin = ''` + ); + console.log(`🤠🤠🤠🤠 Will update ${events[0]?.count} events`); + await ch.command({ + query: `ALTER TABLE events UPDATE origin = '${eventWithOrigin[0].origin}' WHERE project_id = '${project.project_id}' AND path != '' AND origin = '';`, + clickhouse_settings: { + wait_end_of_query: 1, + }, + }); + } + + if (!eventWithOrigin[0] && eventWithoutOrigin[0]) { + console.log( + `😧 Project ${project.project_id} has no events with origin (last event ${eventWithoutOrigin[0].created_at})` + ); + console.log('- NO ORIGIN'); + } + + if (!eventWithOrigin[0] && !eventWithoutOrigin[0]) { + console.log( + `🔥 WARNING: Project ${project.project_id} has no events at all?!?!?!` + ); + } + + if (eventWithOrigin[0] && !eventWithoutOrigin[0]) { + console.log( + `✅ Project ${project.project_id} has all events with origin!!!` + ); + } + console.log(''); + console.log(''); + + await new Promise((resolve) => setTimeout(resolve, 500)); + } catch (e) { + console.log('🥵 ERROR ORRROR'); + console.log('Error for project', project.project_id); + } + } + process.exit(0); +} +main(); diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index e473da14..5e0736ff 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -5,6 +5,7 @@ import type { FastifyReply, FastifyRequest } from 'fastify'; import { generateDeviceId } from '@openpanel/common'; import { getSalts } from '@openpanel/db'; import { eventsQueue } from '@openpanel/queue'; +import { redis } from '@openpanel/redis'; import type { PostEventPayload } from '@openpanel/sdk'; export async function postEvent( @@ -63,7 +64,6 @@ export async function postEvent( } const ip = getClientIp(request)!; const ua = request.headers['user-agent']!; - const origin = request.headers.origin!; const projectId = request.client?.projectId; if (!projectId) { @@ -84,19 +84,15 @@ export async function postEvent( ip, ua, }); - // TODO: Remove after 2024-09-26 - const currentDeviceIdDeprecated = generateDeviceId({ - salt: salts.current, - origin, - ip, - ua, - }); - const previousDeviceIdDeprecated = generateDeviceId({ - salt: salts.previous, - origin, - ip, - ua, - }); + + // this will ensure that we don't have multiple events creating sessions + const locked = await redis.set( + `request:priority:${currentDeviceId}-${previousDeviceId}`, + 'locked', + 'EX', + 10, + 'NX' + ); eventsQueue.add('event', { type: 'incomingEvent', @@ -105,13 +101,15 @@ export async function postEvent( headers: { ua, }, - event: request.body, + event: { + ...request.body, + // Dont rely on the client for the timestamp + timestamp: new Date().toISOString(), + }, geo, currentDeviceId, previousDeviceId, - // TODO: Remove after 2024-09-26 - currentDeviceIdDeprecated, - previousDeviceIdDeprecated, + priority: locked === 'OK', }, }); diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index d6419c61..a9f8e118 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -19,7 +19,7 @@ export function getLiveEventInfo(key: string) { return key.split(':').slice(2) as [string, string]; } -export async function test( +export async function testVisitors( req: FastifyRequest<{ Params: { projectId: string; @@ -28,13 +28,15 @@ export async function test( reply: FastifyReply ) { const events = await getEvents( - `SELECT * FROM events WHERE project_id = ${escape(req.params.projectId)} AND name = 'screen_view' LIMIT 500` + `SELECT * FROM events LIMIT 500` + // `SELECT * FROM events WHERE name = 'screen_view' LIMIT 500` ); const event = events[Math.floor(Math.random() * events.length)]; if (!event) { return reply.status(404).send('No event found'); } - redisPub.publish('event', superjson.stringify(event)); + event.projectId = req.params.projectId; + redisPub.publish('event:received', superjson.stringify(event)); redis.set( `live:event:${event.projectId}:${Math.random() * 1000}`, '', @@ -44,6 +46,26 @@ export async function test( reply.status(202).send(event); } +export async function testEvents( + req: FastifyRequest<{ + Params: { + projectId: string; + }; + }>, + reply: FastifyReply +) { + const events = await getEvents( + `SELECT * FROM events LIMIT 500` + // `SELECT * FROM events WHERE name = 'screen_view' LIMIT 500` + ); + const event = events[Math.floor(Math.random() * events.length)]; + if (!event) { + return reply.status(404).send('No event found'); + } + redisPub.publish('event:saved', superjson.stringify(event)); + reply.status(202).send(event); +} + export function wsVisitors( connection: { socket: WebSocket; @@ -56,11 +78,11 @@ export function wsVisitors( ) { const { params } = req; - redisSub.subscribe('event'); + redisSub.subscribe('event:received'); redisSub.psubscribe('__key*:expired'); const message = (channel: string, message: string) => { - if (channel === 'event') { + if (channel === 'event:received') { const event = getSuperJson(message); if (event?.projectId === params.projectId) { getLiveVisitors(params.projectId).then((count) => { @@ -82,7 +104,7 @@ export function wsVisitors( redisSub.on('pmessage', pmessage); connection.socket.on('close', () => { - redisSub.unsubscribe('event'); + redisSub.unsubscribe('event:saved'); redisSub.punsubscribe('__key*:expired'); redisSub.off('message', message); redisSub.off('pmessage', pmessage); @@ -90,7 +112,7 @@ export function wsVisitors( } export function wsEvents(connection: { socket: WebSocket }) { - redisSub.subscribe('event'); + redisSub.subscribe('event:saved'); const message = (channel: string, message: string) => { const event = getSuperJson(message); @@ -102,7 +124,7 @@ export function wsEvents(connection: { socket: WebSocket }) { redisSub.on('message', message); connection.socket.on('close', () => { - redisSub.unsubscribe('event'); + redisSub.unsubscribe('event:saved'); redisSub.off('message', message); }); } @@ -129,7 +151,7 @@ export async function wsProjectEvents( projectId: params.projectId, }); - redisSub.subscribe('event'); + redisSub.subscribe('event:saved'); const message = async (channel: string, message: string) => { const event = getSuperJson(message); @@ -151,7 +173,7 @@ export async function wsProjectEvents( redisSub.on('message', message as any); connection.socket.on('close', () => { - redisSub.unsubscribe('event'); + redisSub.unsubscribe('event:saved'); redisSub.off('message', message as any); }); } diff --git a/apps/api/src/controllers/profile.controller.ts b/apps/api/src/controllers/profile.controller.ts index 1971b2a4..dd7eadcb 100644 --- a/apps/api/src/controllers/profile.controller.ts +++ b/apps/api/src/controllers/profile.controller.ts @@ -1,5 +1,5 @@ import { getClientIp, parseIp } from '@/utils/parseIp'; -import { isUserAgentSet, parseUserAgent } from '@/utils/parseUserAgent'; +import { parseUserAgent } from '@/utils/parseUserAgent'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { assocPath, pathOr } from 'ramda'; @@ -29,7 +29,7 @@ export async function updateProfile( properties: { ...(properties ?? {}), ...(ip ? geo : {}), - ...(isUserAgentSet(ua) ? uaInfo : {}), + ...uaInfo, }, ...rest, }); diff --git a/apps/api/src/controllers/webhook.controller.ts b/apps/api/src/controllers/webhook.controller.ts new file mode 100644 index 00000000..941fbf75 --- /dev/null +++ b/apps/api/src/controllers/webhook.controller.ts @@ -0,0 +1,152 @@ +import type { WebhookEvent } from '@clerk/fastify'; +import type { FastifyReply, FastifyRequest } from 'fastify'; +import { pathOr } from 'ramda'; +import { Webhook } from 'svix'; + +import { AccessLevel, db } from '@openpanel/db'; + +if (!process.env.CLERK_SIGNING_SECRET) { + throw new Error('CLERK_SIGNING_SECRET is required'); +} + +const wh = new Webhook(process.env.CLERK_SIGNING_SECRET); + +function verify(body: any, headers: FastifyRequest['headers']) { + try { + const svix_id = headers['svix-id'] as string; + const svix_timestamp = headers['svix-timestamp'] as string; + const svix_signature = headers['svix-signature'] as string; + + wh.verify(JSON.stringify(body), { + 'svix-id': svix_id, + 'svix-timestamp': svix_timestamp, + 'svix-signature': svix_signature, + }); + + return true; + } catch (error) { + return false; + } +} + +export async function clerkWebhook( + request: FastifyRequest<{ + Body: WebhookEvent; + }>, + reply: FastifyReply +) { + const payload = request.body; + const verified = verify(payload, request.headers); + + if (!verified) { + return reply.send({ message: 'Invalid signature' }); + } + + if (payload.type === 'user.created') { + const email = payload.data.email_addresses[0]?.email_address; + + if (!email) { + return Response.json( + { message: 'No email address found' }, + { status: 400 } + ); + } + + const user = await db.user.create({ + data: { + id: payload.data.id, + email, + firstName: payload.data.first_name, + lastName: payload.data.last_name, + }, + }); + + const memberships = await db.member.findMany({ + where: { + email, + userId: null, + }, + }); + + for (const membership of memberships) { + const access = pathOr([], ['meta', 'access'], membership); + db.$transaction([ + // Update the member to link it to the user + // This will remove the item from invitations + db.member.update({ + where: { + id: membership.id, + }, + data: { + userId: user.id, + }, + }), + db.projectAccess.createMany({ + data: access + .filter((a) => typeof a === 'string') + .map((projectId) => ({ + organizationSlug: membership.organizationId, + organizationId: membership.organizationId, + projectId: projectId, + userId: user.id, + level: AccessLevel.read, + })), + }), + ]); + } + } + + if (payload.type === 'organizationMembership.created') { + const access = payload.data.public_metadata.access; + if (Array.isArray(access)) { + await db.projectAccess.createMany({ + data: access + .filter((a): a is string => typeof a === 'string') + .map((projectId) => ({ + organizationSlug: payload.data.organization.slug, + organizationId: payload.data.organization.slug, + projectId: projectId, + userId: payload.data.public_user_data.user_id, + level: AccessLevel.read, + })), + }); + } + } + + if (payload.type === 'user.deleted') { + await db.$transaction([ + db.user.update({ + where: { + id: payload.data.id, + }, + data: { + deletedAt: new Date(), + firstName: null, + lastName: null, + email: `deleted+${payload.data.id}@openpanel.dev`, + }, + }), + db.projectAccess.deleteMany({ + where: { + userId: payload.data.id, + }, + }), + db.member.deleteMany({ + where: { + userId: payload.data.id, + }, + }), + ]); + } + + if (payload.type === 'organizationMembership.deleted') { + await db.projectAccess.deleteMany({ + where: { + organizationSlug: payload.data.organization.slug, + userId: payload.data.public_user_data.user_id, + }, + }); + } + + reply.send({ success: true }); +} diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 12a042ff..ded242fd 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -19,6 +19,7 @@ import exportRouter from './routes/export.router'; import liveRouter from './routes/live.router'; import miscRouter from './routes/misc.router'; import profileRouter from './routes/profile.router'; +import webhookRouter from './routes/webhook.router'; import { logger, logInfo } from './utils/logger'; declare module 'fastify' { @@ -82,6 +83,7 @@ const startServer = async () => { fastify.register(profileRouter, { prefix: '/profile' }); fastify.register(miscRouter, { prefix: '/misc' }); fastify.register(exportRouter, { prefix: '/export' }); + fastify.register(webhookRouter, { prefix: '/webhook' }); fastify.setErrorHandler((error) => { logger.error(error, 'Error in request'); }); @@ -89,35 +91,45 @@ const startServer = async () => { reply.send({ name: 'openpanel sdk api' }); }); fastify.get('/healthcheck', async (request, reply) => { - try { - const redisRes = await withTimings(redis.keys('*')); - const dbRes = await withTimings(db.project.findFirst()); - const queueRes = await withTimings(eventsQueue.getCompleted()); - const chRes = await withTimings( - chQuery('SELECT * FROM events LIMIT 1') - ); + const redisRes = await withTimings(redis.keys('*')).catch( + (e: Error) => e + ); + const dbRes = await withTimings(db.project.findFirst()).catch( + (e: Error) => e + ); + const queueRes = await withTimings(eventsQueue.getCompleted()).catch( + (e: Error) => e + ); + const chRes = await withTimings( + chQuery('SELECT * FROM events LIMIT 1') + ).catch((e: Error) => e); - reply.send({ - redis: { - ok: redisRes[1].length ? true : false, - time: `${redisRes[0]}ms`, - }, - db: { - ok: dbRes[1] ? true : false, - time: `${dbRes[0]}ms`, - }, - queue: { - ok: queueRes[1].length ? true : false, - time: `${queueRes[0]}ms`, - }, - ch: { - ok: chRes[1].length ? true : false, - time: `${chRes[0]}ms`, - }, - }); - } catch (e) { - reply.status(500).send(); - } + reply.send({ + redis: Array.isArray(redisRes) + ? { + ok: redisRes[1].length ? true : false, + time: `${redisRes[0]}ms`, + } + : redisRes, + db: Array.isArray(dbRes) + ? { + ok: dbRes[1] ? true : false, + time: `${dbRes[0]}ms`, + } + : dbRes, + queue: Array.isArray(queueRes) + ? { + ok: queueRes[1].length ? true : false, + time: `${queueRes[0]}ms`, + } + : queueRes, + ch: Array.isArray(chRes) + ? { + ok: chRes[1].length ? true : false, + time: `${chRes[0]}ms`, + } + : chRes, + }); }); if (process.env.NODE_ENV === 'production') { for (const signal of ['SIGINT', 'SIGTERM']) { diff --git a/apps/api/src/routes/live.router.ts b/apps/api/src/routes/live.router.ts index a9956c5c..763fbb85 100644 --- a/apps/api/src/routes/live.router.ts +++ b/apps/api/src/routes/live.router.ts @@ -3,10 +3,15 @@ import fastifyWS from '@fastify/websocket'; import type { FastifyPluginCallback } from 'fastify'; const liveRouter: FastifyPluginCallback = (fastify, opts, done) => { + fastify.route({ + method: 'GET', + url: '/visitors/test/:projectId', + handler: controller.testVisitors, + }); fastify.route({ method: 'GET', url: '/events/test/:projectId', - handler: controller.test, + handler: controller.testEvents, }); fastify.register(fastifyWS); diff --git a/apps/api/src/routes/webhook.router.ts b/apps/api/src/routes/webhook.router.ts new file mode 100644 index 00000000..2f9901d5 --- /dev/null +++ b/apps/api/src/routes/webhook.router.ts @@ -0,0 +1,13 @@ +import * as controller from '@/controllers/webhook.controller'; +import type { FastifyPluginCallback } from 'fastify'; + +const webhookRouter: FastifyPluginCallback = (fastify, opts, done) => { + fastify.route({ + method: 'POST', + url: '/clerk', + handler: controller.clerkWebhook, + }); + done(); +}; + +export default webhookRouter; diff --git a/apps/api/src/utils/auth.ts b/apps/api/src/utils/auth.ts index 470edf6f..83c2574f 100644 --- a/apps/api/src/utils/auth.ts +++ b/apps/api/src/utils/auth.ts @@ -5,8 +5,6 @@ import { verifyPassword } from '@openpanel/common'; import type { Client, IServiceClient } from '@openpanel/db'; import { ClientType, db } from '@openpanel/db'; -import { logger } from './logger'; - const cleanDomain = (domain: string) => domain .replace('www.', '') diff --git a/apps/api/src/utils/parseUserAgent.ts b/apps/api/src/utils/parseUserAgent.ts index 4a8842a9..c8671e73 100644 --- a/apps/api/src/utils/parseUserAgent.ts +++ b/apps/api/src/utils/parseUserAgent.ts @@ -1,11 +1,16 @@ import { UAParser } from 'ua-parser-js'; -export function isUserAgentSet(ua: string) { - return ua !== 'node' && ua !== 'undici' && !!ua; -} +const parsedServerUa = { + isServer: true, + device: 'server', +} as const; -export function parseUserAgent(ua: string) { +export function parseUserAgent(ua?: string | null) { + if (!ua) return parsedServerUa; const res = new UAParser(ua).getResult(); + + if (isServer(ua)) return parsedServerUa; + return { os: res.os.name, osVersion: res.os.version, @@ -14,20 +19,86 @@ export function parseUserAgent(ua: string) { device: res.device.type ?? getDevice(ua), brand: res.device.vendor, model: res.device.model, - }; + isServer: false, + } as const; +} + +const userAgentServerList: string[] = [ + // Node.js libraries + 'node', + 'node-fetch', + 'axios', + 'request', + 'superagent', + 'undici', + + // Python libraries + 'python-requests', + 'python-urllib', + + // Ruby libraries + 'Faraday', + 'Ruby', + 'http.rb', + + // Go libraries + 'Go-http-client', + 'Go-http-client', + + // Java libraries + 'Apache-HttpClient', + 'okhttp', + 'okhowtp', + + // PHP libraries + 'GuzzleHttp', + 'PHP-cURL', + + // Other + 'Dart', + 'RestSharp', // Popular .NET HTTP client library + 'HttpClientFactory', // .NET's typed client factory + 'Ktor', // A client for Kotlin + 'Ning', // Async HTTP client for Java + 'grpc-csharp', // gRPC for C# + 'Volley', // HTTP library used in Android apps for making network requests + 'Spring', + 'vert.x', + 'grpc-', +]; + +function isServer(userAgent: string) { + const match = userAgentServerList.some((server) => + userAgent.includes(server) + ); + if (match) { + return true; + } + + return !!userAgent.match(/^[^\s]+\/[\d.]+$/); } export function getDevice(ua: string) { - const t1 = + const mobile1 = /(android|bb\d+|meego).+mobile|avantgo|bada\/|blackberry|blazer|compal|elaine|fennec|hiptop|iemobile|ip(hone|od)|iris|kindle|lge |maemo|midp|mmp|mobile.+firefox|netfront|opera m(ob|in)i|palm( os)?|phone|p(ixi|re)\/|plucker|pocket|psp|series(4|6)0|symbian|treo|up\.(browser|link)|vodafone|wap|windows ce|xda|xiino/i.test( ua ); - const t2 = + const mobile2 = /1207|6310|6590|3gso|4thp|50[1-6]i|770s|802s|a wa|abac|ac(er|oo|s-)|ai(ko|rn)|al(av|ca|co)|amoi|an(ex|ny|yw)|aptu|ar(ch|go)|as(te|us)|attw|au(di|-m|r |s )|avan|be(ck|ll|nq)|bi(lb|rd)|bl(ac|az)|br(e|v)w|bumb|bw-(n|u)|c55\/|capi|ccwa|cdm-|cell|chtm|cldc|cmd-|co(mp|nd)|craw|da(it|ll|ng)|dbte|dc-s|devi|dica|dmob|do(c|p)o|ds(12|-d)|el(49|ai)|em(l2|ul)|er(ic|k0)|esl8|ez([4-7]0|os|wa|ze)|fetc|fly(-|_)|g1 u|g560|gene|gf-5|g-mo|go(\.w|od)|gr(ad|un)|haie|hcit|hd-(m|p|t)|hei-|hi(pt|ta)|hp( i|ip)|hs-c|ht(c(-| |_|a|g|p|s|t)|tp)|hu(aw|tc)|i-(20|go|ma)|i230|iac( |-|\/)|ibro|idea|ig01|ikom|im1k|inno|ipaq|iris|ja(t|v)a|jbro|jemu|jigs|kddi|keji|kgt( |\/)|klon|kpt |kwc-|kyo(c|k)|le(no|xi)|lg( g|\/(k|l|u)|50|54|-[a-w])|libw|lynx|m1-w|m3ga|m50\/|ma(te|ui|xo)|mc(01|21|ca)|m-cr|me(rc|ri)|mi(o8|oa|ts)|mmef|mo(01|02|bi|de|do|t(-| |o|v)|zz)|mt(50|p1|v )|mwbp|mywa|n10[0-2]|n20[2-3]|n30(0|2)|n50(0|2|5)|n7(0(0|1)|10)|ne((c|m)-|on|tf|wf|wg|wt)|nok(6|i)|nzph|o2im|op(ti|wv)|oran|owg1|p800|pan(a|d|t)|pdxg|pg(13|-([1-8]|c))|phil|pire|pl(ay|uc)|pn-2|po(ck|rt|se)|prox|psio|pt-g|qa-a|qc(07|12|21|32|60|-[2-7]|i-)|qtek|r380|r600|raks|rim9|ro(ve|zo)|s55\/|sa(ge|ma|mm|ms|ny|va)|sc(01|h-|oo|p-)|sdk\/|se(c(-|0|1)|47|mc|nd|ri)|sgh-|shar|sie(-|m)|sk-0|sl(45|id)|sm(al|ar|b3|it|t5)|so(ft|ny)|sp(01|h-|v-|v )|sy(01|mb)|t2(18|50)|t6(00|10|18)|ta(gt|lk)|tcl-|tdg-|tel(i|m)|tim-|t-mo|to(pl|sh)|ts(70|m-|m3|m5)|tx-9|up(\.b|g1|si)|utst|v400|v750|veri|vi(rg|te)|vk(40|5[0-3]|-v)|vm40|voda|vulc|vx(52|53|60|61|70|80|81|83|85|98)|w3c(-| )|webc|whit|wi(g |nc|nw)|wmlb|wonu|x700|yas-|your|zeto|zte-/i.test( ua.slice(0, 4) ); - if (t1 || t2) { + const tablet = + /tablet|ipad|android(?!.*mobile)|xoom|sch-i800|kindle|silk|playbook/i.test( + ua + ); + + if (mobile1 || mobile2) { return 'mobile'; } + + if (tablet) { + return 'tablet'; + } + return 'desktop'; } diff --git a/apps/dashboard/Dockerfile b/apps/dashboard/Dockerfile index f8d9c2c0..bafb1ca9 100644 --- a/apps/dashboard/Dockerfile +++ b/apps/dashboard/Dockerfile @@ -1,37 +1,14 @@ -FROM --platform=linux/amd64 node:20-slim AS base +ARG NODE_VERSION=20 -ARG NEXT_PUBLIC_DASHBOARD_URL -ENV NEXT_PUBLIC_DASHBOARD_URL=$NEXT_PUBLIC_DASHBOARD_URL +FROM --platform=linux/amd64 node:${NODE_VERSION}-slim AS base -ARG NEXT_PUBLIC_API_URL -ENV NEXT_PUBLIC_API_URL=$NEXT_PUBLIC_API_URL +ENV SKIP_ENV_VALIDATION="1" ARG DATABASE_URL ENV DATABASE_URL=$DATABASE_URL -ARG CLICKHOUSE_DB -ENV CLICKHOUSE_DB=$CLICKHOUSE_DB - -ARG CLICKHOUSE_PASSWORD -ENV CLICKHOUSE_PASSWORD=$CLICKHOUSE_PASSWORD - -ARG CLICKHOUSE_URL -ENV CLICKHOUSE_URL=$CLICKHOUSE_URL - -ARG CLICKHOUSE_USER -ENV CLICKHOUSE_USER=$CLICKHOUSE_USER - -ARG REDIS_URL -ENV REDIS_URL=$REDIS_URL - -ARG NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY -ENV NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY=$NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY - -ARG CLERK_SECRET_KEY -ENV CLERK_SECRET_KEY=$CLERK_SECRET_KEY - -ARG CLERK_SIGNING_SECRET -ENV CLERK_SIGNING_SECRET=$CLERK_SIGNING_SECRET +ARG ENABLE_INSTRUMENTATION_HOOK +ENV ENABLE_INSTRUMENTATION_HOOK=$ENABLE_INSTRUMENTATION_HOOK ENV PNPM_HOME="/pnpm" @@ -39,8 +16,6 @@ ENV PATH="$PNPM_HOME:$PATH" RUN corepack enable -ARG NODE_VERSION=20 - RUN apt update \ && apt install -y curl \ && curl -L https://raw.githubusercontent.com/tj/n/master/bin/n -o n \ @@ -72,12 +47,19 @@ WORKDIR /app/apps/dashboard RUN pnpm install --frozen-lockfile --ignore-scripts WORKDIR /app -COPY apps apps +COPY apps/dashboard apps/dashboard COPY packages packages COPY tooling tooling RUN pnpm db:codegen WORKDIR /app/apps/dashboard + +# Will be replaced on runtime +ENV NEXT_PUBLIC_DASHBOARD_URL="__NEXT_PUBLIC_DASHBOARD_URL__" +ENV NEXT_PUBLIC_API_URL="__NEXT_PUBLIC_API_URL__" +# Check entrypoint for this little fellow +ENV NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY="pk_test_eW9sby5jb20k" + RUN pnpm run build # PROD @@ -116,4 +98,7 @@ WORKDIR /app/apps/dashboard EXPOSE 3000 -CMD ["pnpm", "start"] \ No newline at end of file +# CMD ["pnpm", "start"] +COPY --from=build /app/apps/dashboard/entrypoint.sh /usr/bin/ +RUN chmod +x /usr/bin/entrypoint.sh +ENTRYPOINT ["entrypoint.sh", "pnpm", "start"] \ No newline at end of file diff --git a/apps/dashboard/entrypoint.sh b/apps/dashboard/entrypoint.sh new file mode 100644 index 00000000..7bfe81de --- /dev/null +++ b/apps/dashboard/entrypoint.sh @@ -0,0 +1,32 @@ +#!/bin/bash +set -e + +echo "> Replace env variable placeholders with runtime values..." +# Define an array of environment variables to check +variables_to_replace=( + "NEXT_PUBLIC_DASHBOARD_URL" + "NEXT_PUBLIC_API_URL" + "NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY" +) + +# Replace env variable placeholders with real values +for key in "${variables_to_replace[@]}"; do + value=$(printenv $key) + if [ ! -z "$value" ]; then + echo " - Searching for $key with value $value..." + # Use a custom placeholder for 'NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY' or use the actual key otherwise + if [ "$key" = "NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY" ]; then + placeholder="pk_test_eW9sby5jb20k" + else + placeholder="__${key}__" + fi + # Run the replacement + find /app/apps/dashboard/.next/ -type f \( -name "*.js" -o -name "*.html" \) -exec sed -i "s|$placeholder|$value|g" {} \; + + else + echo " - Skipping $key as it has no value set." + fi +done + +# Execute the container's main process (CMD in Dockerfile) +exec "$@" \ No newline at end of file diff --git a/apps/dashboard/next.config.mjs b/apps/dashboard/next.config.mjs index 1aa63767..dd91bdc0 100644 --- a/apps/dashboard/next.config.mjs +++ b/apps/dashboard/next.config.mjs @@ -30,7 +30,7 @@ const config = { experimental: { // Avoid "Critical dependency: the request of a dependency is an expression" serverComponentsExternalPackages: ['bullmq', 'ioredis'], - instrumentationHook: true, + instrumentationHook: !!process.env.ENABLE_INSTRUMENTATION_HOOK, }, /** * If you are using `appDir` then you must comment the below `i18n` config out. diff --git a/apps/dashboard/package.json b/apps/dashboard/package.json index d002a0fb..845dab3f 100644 --- a/apps/dashboard/package.json +++ b/apps/dashboard/package.json @@ -15,7 +15,7 @@ "dependencies": { "@baselime/node-opentelemetry": "^0.5.8", "@clerk/nextjs": "^5.0.12", - "@clickhouse/client": "^0.2.9", + "@clickhouse/client": "^1.2.0", "@hookform/resolvers": "^3.3.4", "@openpanel/common": "workspace:^", "@openpanel/constants": "workspace:^", @@ -24,6 +24,7 @@ "@openpanel/queue": "workspace:^", "@openpanel/sdk-info": "workspace:^", "@openpanel/validation": "workspace:^", + "@prisma/nextjs-monorepo-workaround-plugin": "^5.12.1", "@radix-ui/react-accordion": "^1.1.2", "@radix-ui/react-alert-dialog": "^1.0.5", "@radix-ui/react-aspect-ratio": "^1.0.3", @@ -112,7 +113,6 @@ "@openpanel/prettier-config": "workspace:*", "@openpanel/trpc": "workspace:*", "@openpanel/tsconfig": "workspace:*", - "@prisma/nextjs-monorepo-workaround-plugin": "^5.12.1", "@types/bcrypt": "^5.0.2", "@types/lodash.debounce": "^4.0.9", "@types/lodash.throttle": "^4.1.9", @@ -145,4 +145,4 @@ ] }, "prettier": "@openpanel/prettier-config" -} +} \ No newline at end of file diff --git a/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/events/event-list/event-listener.tsx b/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/events/event-list/event-listener.tsx index a446403f..feb0bd0a 100644 --- a/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/events/event-list/event-listener.tsx +++ b/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/events/event-list/event-listener.tsx @@ -1,12 +1,12 @@ 'use client'; -import { useState } from 'react'; import { Tooltip, TooltipContent, TooltipTrigger, } from '@/components/ui/tooltip'; import { useAppParams } from '@/hooks/useAppParams'; +import { useDebounceVal } from '@/hooks/useDebounceVal'; import useWS from '@/hooks/useWS'; import { cn } from '@/utils/cn'; import dynamic from 'next/dynamic'; @@ -22,11 +22,13 @@ const AnimatedNumbers = dynamic(() => import('react-animated-numbers'), { export default function EventListener() { const router = useRouter(); const { projectId } = useAppParams(); - const [counter, setCounter] = useState(0); + const counter = useDebounceVal(0, 1000, { + maxWait: 5000, + }); useWS(`/live/events/${projectId}`, (event) => { if (event?.name) { - setCounter((prev) => prev + 1); + counter.set((prev) => prev + 1); } }); @@ -35,7 +37,7 @@ export default function EventListener() { - {counter === 0 ? 'Listening to new events' : 'Click to refresh'} + {counter.debounced === 0 + ? 'Listening to new events' + : 'Click to refresh'} ); diff --git a/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-live-events/index.tsx b/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-live-events/index.tsx index 60d5b803..3421ec39 100644 --- a/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-live-events/index.tsx +++ b/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-live-events/index.tsx @@ -5,7 +5,7 @@ import { getEvents } from '@openpanel/db'; import LiveEvents from './live-events'; type Props = { - projectId?: string; + projectId: string; limit?: number; }; const RealtimeLiveEventsServer = async ({ projectId, limit = 30 }: Props) => { diff --git a/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-live-events/live-events.tsx b/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-live-events/live-events.tsx index 6f9b464d..8d7f5145 100644 --- a/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-live-events/live-events.tsx +++ b/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-live-events/live-events.tsx @@ -12,14 +12,14 @@ import type { type Props = { events: (IServiceEventMinimal | IServiceCreateEventPayload)[]; - projectId?: string; + projectId: string; limit: number; }; const RealtimeLiveEvents = ({ events, projectId, limit }: Props) => { const [state, setState] = useState(events ?? []); useWS( - projectId ? `/live/events/${projectId}` : '/live/events', + `/live/events/${projectId}`, (event) => { setState((p) => [event, ...p].slice(0, limit)); } diff --git a/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-reloader.tsx b/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-reloader.tsx index 775b3f8b..5a989666 100644 --- a/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-reloader.tsx +++ b/apps/dashboard/src/app/(app)/[organizationSlug]/[projectId]/realtime/realtime-reloader.tsx @@ -12,12 +12,21 @@ const RealtimeReloader = ({ projectId }: Props) => { const client = useQueryClient(); const router = useRouter(); - useWS(`/live/visitors/${projectId}`, (value) => { - router.refresh(); - client.refetchQueries({ - type: 'active', - }); - }); + useWS( + `/live/events/${projectId}`, + () => { + router.refresh(); + client.refetchQueries({ + type: 'active', + }); + }, + { + debounce: { + maxWait: 15000, + delay: 15000, + }, + } + ); return null; }; diff --git a/apps/dashboard/src/app/api/clerk/webhook/route.ts b/apps/dashboard/src/app/api/clerk/webhook/route.ts index 7a8a18b9..878c0b13 100644 --- a/apps/dashboard/src/app/api/clerk/webhook/route.ts +++ b/apps/dashboard/src/app/api/clerk/webhook/route.ts @@ -3,6 +3,8 @@ import { pathOr } from 'ramda'; import { AccessLevel, db } from '@openpanel/db'; +export const dynamic = 'force-dynamic'; + export async function POST(request: Request) { const payload: WebhookEvent = await request.json(); diff --git a/apps/dashboard/src/app/providers.tsx b/apps/dashboard/src/app/providers.tsx index 8192da89..dcc8dc75 100644 --- a/apps/dashboard/src/app/providers.tsx +++ b/apps/dashboard/src/app/providers.tsx @@ -63,7 +63,8 @@ function AllProviders({ children }: { children: React.ReactNode }) { disableTransitionOnChange > { + value: T; + debounced: T; + set: React.Dispatch>; +} + +export function useDebounceVal( + initialValue: T, + delay = 500, + options?: Parameters[2] +): DebouncedState { + const [value, setValue] = useState(initialValue); + const [debouncedValue, _setDebouncedValue] = useState(initialValue); + const setDebouncedValue = useMemo( + () => debounce(_setDebouncedValue, delay, options), + [] + ); + useEffect(() => { + setDebouncedValue(value); + }, [value]); + + return { + value, + debounced: debouncedValue, + set: setValue, + }; +} diff --git a/apps/dashboard/src/hooks/useWS.ts b/apps/dashboard/src/hooks/useWS.ts index bfd04ca2..8756fd0e 100644 --- a/apps/dashboard/src/hooks/useWS.ts +++ b/apps/dashboard/src/hooks/useWS.ts @@ -2,11 +2,22 @@ import { use, useEffect, useMemo, useState } from 'react'; import { useAuth } from '@clerk/nextjs'; +import debounce from 'lodash.debounce'; import useWebSocket from 'react-use-websocket'; import { getSuperJson } from '@openpanel/common'; -export default function useWS(path: string, onMessage: (event: T) => void) { +type UseWSOptions = { + debounce?: { + delay: number; + } & Parameters[2]; +}; + +export default function useWS( + path: string, + onMessage: (event: T) => void, + options?: UseWSOptions +) { const auth = useAuth(); const ws = String(process.env.NEXT_PUBLIC_API_URL) .replace(/^https/, 'wss') @@ -18,6 +29,13 @@ export default function useWS(path: string, onMessage: (event: T) => void) { [baseUrl, token] ); + const debouncedOnMessage = useMemo(() => { + if (options?.debounce) { + return debounce(onMessage, options.debounce.delay, options.debounce); + } + return onMessage; + }, [options?.debounce?.delay]); + useEffect(() => { if (auth.isSignedIn) { auth.getToken().then(setToken); @@ -35,7 +53,7 @@ export default function useWS(path: string, onMessage: (event: T) => void) { try { const data = getSuperJson(event.data); if (data) { - onMessage(data); + debouncedOnMessage(data); } } catch (error) { console.error('Error parsing message', error); diff --git a/apps/public/Dockerfile b/apps/public/Dockerfile index 6e19f33b..6ee3fa7f 100644 --- a/apps/public/Dockerfile +++ b/apps/public/Dockerfile @@ -1,43 +1,16 @@ -FROM --platform=linux/amd64 node:20-slim AS base +ARG NODE_VERSION=20 -ARG NEXT_PUBLIC_DASHBOARD_URL -ENV NEXT_PUBLIC_DASHBOARD_URL=$NEXT_PUBLIC_DASHBOARD_URL - -ARG NEXT_PUBLIC_API_URL -ENV NEXT_PUBLIC_API_URL=$NEXT_PUBLIC_API_URL +FROM --platform=linux/amd64 node:${NODE_VERSION}-slim AS base ARG DATABASE_URL ENV DATABASE_URL=$DATABASE_URL -ARG CLICKHOUSE_DB -ENV CLICKHOUSE_DB=$CLICKHOUSE_DB - -ARG CLICKHOUSE_PASSWORD -ENV CLICKHOUSE_PASSWORD=$CLICKHOUSE_PASSWORD - -ARG CLICKHOUSE_URL -ENV CLICKHOUSE_URL=$CLICKHOUSE_URL - -ARG CLICKHOUSE_USER -ENV CLICKHOUSE_USER=$CLICKHOUSE_USER - -ARG REDIS_URL -ENV REDIS_URL=$REDIS_URL - -ARG NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY -ENV NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY=$NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY - -ARG CLERK_SECRET_KEY -ENV CLERK_SECRET_KEY=$CLERK_SECRET_KEY - ENV PNPM_HOME="/pnpm" ENV PATH="$PNPM_HOME:$PATH" RUN corepack enable -ARG NODE_VERSION=20 - RUN apt update \ && apt install -y curl \ && curl -L https://raw.githubusercontent.com/tj/n/master/bin/n -o n \ @@ -66,7 +39,7 @@ WORKDIR /app/apps/public RUN pnpm install --frozen-lockfile --ignore-scripts WORKDIR /app -COPY apps apps +COPY apps/public apps/public COPY packages packages COPY tooling tooling RUN pnpm db:codegen diff --git a/apps/worker/Dockerfile b/apps/worker/Dockerfile index 7ff60a1b..4377b42e 100644 --- a/apps/worker/Dockerfile +++ b/apps/worker/Dockerfile @@ -1,33 +1,16 @@ -# Dockerfile that builds the web app only +ARG NODE_VERSION=20 -FROM --platform=linux/amd64 node:20-slim AS base +FROM --platform=linux/amd64 node:${NODE_VERSION}-slim AS base ARG DATABASE_URL ENV DATABASE_URL=$DATABASE_URL -ARG CLICKHOUSE_DB -ENV CLICKHOUSE_DB=$CLICKHOUSE_DB - -ARG CLICKHOUSE_PASSWORD -ENV CLICKHOUSE_PASSWORD=$CLICKHOUSE_PASSWORD - -ARG CLICKHOUSE_URL -ENV CLICKHOUSE_URL=$CLICKHOUSE_URL - -ARG CLICKHOUSE_USER -ENV CLICKHOUSE_USER=$CLICKHOUSE_USER - -ARG REDIS_URL -ENV REDIS_URL=$REDIS_URL - ENV PNPM_HOME="/pnpm" ENV PATH="$PNPM_HOME:$PATH" RUN corepack enable -ARG NODE_VERSION=20 - RUN apt update \ && apt install -y curl \ && curl -L https://raw.githubusercontent.com/tj/n/master/bin/n -o n \ @@ -49,6 +32,7 @@ COPY packages/common/package.json packages/common/package.json COPY packages/constants/package.json packages/constants/package.json COPY packages/validation/package.json packages/validation/package.json COPY packages/sdks/sdk/package.json packages/sdks/sdk/package.json +COPY patches patches # BUILD FROM base AS build @@ -57,7 +41,7 @@ WORKDIR /app/apps/worker RUN pnpm install --frozen-lockfile --ignore-scripts WORKDIR /app -COPY apps apps +COPY apps/worker apps/worker COPY packages packages COPY tooling tooling RUN pnpm db:codegen diff --git a/apps/worker/package.json b/apps/worker/package.json index aaccfa37..d86b3950 100644 --- a/apps/worker/package.json +++ b/apps/worker/package.json @@ -12,16 +12,17 @@ }, "dependencies": { "@baselime/pino-transport": "^0.1.5", - "@bull-board/api": "^5.13.0", - "@bull-board/express": "^5.13.0", + "@bull-board/api": "^5.21.0", + "@bull-board/express": "^5.21.0", "@openpanel/common": "workspace:*", "@openpanel/db": "workspace:*", "@openpanel/logger": "workspace:*", "@openpanel/queue": "workspace:*", "@openpanel/redis": "workspace:*", - "bullmq": "^5.1.1", + "bullmq": "^5.8.7", "express": "^4.18.2", "pino": "^8.17.2", + "pino-pretty": "^10.3.1", "ramda": "^0.29.1", "sqlstring": "^2.3.3", "ua-parser-js": "^1.0.37", diff --git a/apps/worker/scripts/debug.ts b/apps/worker/scripts/debug.ts new file mode 100644 index 00000000..da0a81ed --- /dev/null +++ b/apps/worker/scripts/debug.ts @@ -0,0 +1,139 @@ +import { escape } from 'sqlstring'; + +import type { IClickhouseEvent } from '@openpanel/db'; +import { chQuery, eventBuffer } from '@openpanel/db'; +import { sessionsQueue } from '@openpanel/queue/src/queues'; +import { redis } from '@openpanel/redis'; + +async function debugStalledEvents() { + const keys = await redis.keys('bull:sessions:sessionEnd*'); + const delayedZRange = await redis.zrange( + 'bull:sessions:delayed', + 0, + -1, + 'WITHSCORES' + ); + const delayedValues = delayedZRange.reduce( + (acc, item, index, array) => { + if (index % 2 === 0) { + acc[item] = Number(array[index + 1]) / 0x1000; + } + return acc; + }, + [] as Record + ); + const opKeys = await redis.keys('op:*'); + const stalledEvents = await redis.lrange('op:buffer:events:stalled', 0, -1); + // keys.forEach((key) => { + // console.log(key); + // }); + // console.log('--------------------'); + + const queue = await eventBuffer.getQueue(-1); + + queue + .filter((item) => item.event.name === 'screen_view') + .forEach((item) => { + const date = new Date(item.event.created_at.replace(' ', 'T') + 'Z'); + const match = keys.find((key) => { + return item.event.device_id && key.includes(item.event.device_id); + }); + if (match) { + // console.log( + // date.toISOString(), + // item.event.name, + // item.event.device, + // item.event.session_id, + // item.event.profile_id, + // item.event.device_id, + // match + // ); + } else { + console.log( + 'NO MATCH FOUND!', + date.toISOString(), + item.event.name, + '[SID]', + item.event.session_id, + '[PID]', + item.event.profile_id, + '[DID]', + item.event.device_id + ); + console.log(item.event); + console.log(''); + + // console.log('Not in queue!'); + // log§ + } + }); + + if (stalledEvents.length > 0) { + const res = await chQuery( + `SELECT * FROM events WHERE id IN (${stalledEvents.map((item) => escape(JSON.parse(item).id)).join(',')})` + ); + + stalledEvents.forEach((item) => { + const event = JSON.parse(item) as IClickhouseEvent; + const date = new Date(event.created_at.replace(' ', 'T') + 'Z'); + console.log( + 'STALLED!', + date.toISOString(), + event.name, + '[IN_DB]', + res.find((item) => item.id === event.id) ? 'YES' : 'NO', + '[ID]', + event.id, + '[SID]', + event.session_id, + '[PID]', + event.profile_id, + '[DID]', + event.device_id + ); + // console.log(event); + }); + } + + console.log('OP Keys', opKeys); + + console.log('Queue', queue.length); + console.log('Session Ends', keys.length); + console.log('Stalled Events', stalledEvents.length); + + // keys.forEach((key) => { + // if (key.includes('e1b233e69bcd2132ec7bf343004d4b01')) { + // console.log(key); + // } + // }); + + const delayedJobs = await sessionsQueue.getDelayed(); + console.log('delayedJobs', delayedJobs.length); + delayedJobs.sort((a, b) => a.timestamp + a.delay - (b.timestamp + b.delay)); + let delayedJobsCount = 0; + delayedJobs.forEach((job) => { + const date = new Date(delayedValues[job.id]); + // if date is in the past + // if (date.getTime() - 1000 * 60 * 5 < Date.now()) { + if (date.getTime() < Date.now()) { + delayedJobsCount++; + console.log( + date.toLocaleString('sv-SE'), + 'https://op.coderax.se/worker/queue/sessions/' + + encodeURIComponent(job.id) + ); + } + }); + + console.log('delayedJobsCount', delayedJobsCount); +} + +async function main() { + if (process.argv[2] === 'stalled') { + await debugStalledEvents(); + } + + process.exit(0); +} + +main(); diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index 422a5588..8f3e2199 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -6,28 +6,42 @@ import { Worker } from 'bullmq'; import express from 'express'; import { connection, eventsQueue } from '@openpanel/queue'; -import { cronQueue } from '@openpanel/queue/src/queues'; +import { cronQueue, sessionsQueue } from '@openpanel/queue/src/queues'; import { cronJob } from './jobs/cron'; import { eventsJob } from './jobs/events'; +import { sessionsJob } from './jobs/sessions'; const PORT = parseInt(process.env.WORKER_PORT || '3000', 10); const serverAdapter = new ExpressAdapter(); -serverAdapter.setBasePath('/'); +serverAdapter.setBasePath(process.env.SELF_HOSTED ? '/worker' : '/'); const app = express(); const workerOptions: WorkerOptions = { - connection, + connection: { + ...connection, + enableReadyCheck: false, + maxRetriesPerRequest: null, + enableOfflineQueue: true, + }, concurrency: parseInt(process.env.CONCURRENCY || '1', 10), }; async function start() { - new Worker(eventsQueue.name, eventsJob, workerOptions); - - new Worker(cronQueue.name, cronJob, workerOptions); + const eventsWorker = new Worker(eventsQueue.name, eventsJob, workerOptions); + const sessionsWorker = new Worker( + sessionsQueue.name, + sessionsJob, + workerOptions + ); + const cronWorker = new Worker(cronQueue.name, cronJob, workerOptions); createBullBoard({ - queues: [new BullMQAdapter(eventsQueue), new BullMQAdapter(cronQueue)], + queues: [ + new BullMQAdapter(eventsQueue), + new BullMQAdapter(sessionsQueue), + new BullMQAdapter(cronQueue), + ], serverAdapter: serverAdapter, }); @@ -37,9 +51,61 @@ async function start() { console.log(`For the UI, open http://localhost:${PORT}/`); }); - const repeatableJobs = await cronQueue.getRepeatableJobs(); + function workerLogger(worker: string, type: string, err?: Error) { + console.log(`Worker ${worker} -> ${type}`); + if (err) { + console.error(err); + } + } - console.log(repeatableJobs); + const workers = [sessionsWorker, eventsWorker, cronWorker]; + workers.forEach((worker) => { + worker.on('error', (err) => { + workerLogger(worker.name, 'error', err); + }); + + worker.on('closed', () => { + workerLogger(worker.name, 'closed'); + }); + + worker.on('ready', () => { + workerLogger(worker.name, 'ready'); + }); + }); + + async function exitHandler(evtOrExitCodeOrError: number | string | Error) { + try { + await eventsWorker.close(); + await sessionsWorker.close(); + await cronWorker.close(); + } catch (e) { + console.error('EXIT HANDLER ERROR', e); + } + + process.exit(isNaN(+evtOrExitCodeOrError) ? 1 : +evtOrExitCodeOrError); + } + + [ + 'beforeExit', + 'uncaughtException', + 'unhandledRejection', + 'SIGHUP', + 'SIGINT', + 'SIGQUIT', + 'SIGILL', + 'SIGTRAP', + 'SIGABRT', + 'SIGBUS', + 'SIGFPE', + 'SIGUSR1', + 'SIGSEGV', + 'SIGUSR2', + 'SIGTERM', + ].forEach((evt) => + process.on(evt, (evt) => { + exitHandler(evt); + }) + ); await cronQueue.add( 'salt', @@ -55,6 +121,43 @@ async function start() { }, } ); + + await cronQueue.add( + 'flush', + { + type: 'flushEvents', + payload: undefined, + }, + { + jobId: 'flushEvents', + repeat: { + every: process.env.BATCH_INTERVAL + ? parseInt(process.env.BATCH_INTERVAL, 10) + : 1000 * 10, + }, + } + ); + + await cronQueue.add( + 'flush', + { + type: 'flushProfiles', + payload: undefined, + }, + { + jobId: 'flushProfiles', + repeat: { + every: process.env.BATCH_INTERVAL + ? parseInt(process.env.BATCH_INTERVAL, 10) + : 1000 * 10, + }, + } + ); + + const repeatableJobs = await cronQueue.getRepeatableJobs(); + + console.log('Repeatable jobs:'); + console.log(repeatableJobs); } start(); diff --git a/apps/worker/src/jobs/cron.salt.ts b/apps/worker/src/jobs/cron.salt.ts index 9792040e..07beb3a5 100644 --- a/apps/worker/src/jobs/cron.salt.ts +++ b/apps/worker/src/jobs/cron.salt.ts @@ -2,7 +2,7 @@ import { generateSalt } from '@openpanel/common'; import { db, getCurrentSalt } from '@openpanel/db'; export async function salt() { - const oldSalt = await getCurrentSalt(); + const oldSalt = await getCurrentSalt().catch(() => null); const newSalt = await db.salt.create({ data: { salt: generateSalt(), @@ -13,7 +13,7 @@ export async function salt() { await db.salt.deleteMany({ where: { salt: { - notIn: [newSalt.salt, oldSalt], + notIn: oldSalt ? [newSalt.salt, oldSalt] : [newSalt.salt], }, }, }); diff --git a/apps/worker/src/jobs/cron.ts b/apps/worker/src/jobs/cron.ts index 87e5d588..382c24bc 100644 --- a/apps/worker/src/jobs/cron.ts +++ b/apps/worker/src/jobs/cron.ts @@ -1,5 +1,6 @@ import type { Job } from 'bullmq'; +import { eventBuffer, profileBuffer } from '@openpanel/db'; import type { CronQueuePayload } from '@openpanel/queue/src/queues'; import { salt } from './cron.salt'; @@ -9,5 +10,11 @@ export async function cronJob(job: Job) { case 'salt': { return await salt(); } + case 'flushEvents': { + return await eventBuffer.flush(); + } + case 'flushProfiles': { + return await profileBuffer.flush(); + } } } diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index ee7b2a22..33d24a20 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -1,23 +1,26 @@ import type { Job } from 'bullmq'; import { getTime } from '@openpanel/common'; -import { createEvent, getEvents } from '@openpanel/db'; +import { createEvent, eventBuffer, getEvents } from '@openpanel/db'; import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue/src/queues'; export async function createSessionEnd( job: Job ) { const payload = job.data.payload; + const eventsInBuffer = await eventBuffer.findMany( + (item) => item.event.session_id === payload.sessionId + ); const sql = ` SELECT * FROM events WHERE - device_id = '${payload.deviceId}' + session_id = '${payload.sessionId}' AND created_at >= ( SELECT created_at FROM events WHERE - device_id = '${payload.deviceId}' + session_id = '${payload.sessionId}' AND name = 'session_start' ORDER BY created_at DESC LIMIT 1 @@ -25,7 +28,11 @@ export async function createSessionEnd( ORDER BY created_at DESC `; job.log(sql); - const events = await getEvents(sql); + const eventsInDb = await getEvents(sql); + // sort last inserted first + const events = [...eventsInBuffer, ...eventsInDb].sort( + (a, b) => new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime() + ); events.map((event, index) => { job.log( @@ -64,7 +71,7 @@ export async function createSessionEnd( }, name: 'session_end', duration: sessionDuration, - path: lastEvent.path, + path: screenViews[0]?.path ?? '', createdAt: new Date(getTime(lastEvent?.createdAt) + 100), }); } diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 67460d02..b77ea406 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -1,20 +1,30 @@ -import { logger } from '@/utils/logger'; import { getReferrerWithQuery, parseReferrer } from '@/utils/parse-referrer'; -import { isUserAgentSet, parseUserAgent } from '@/utils/parse-user-agent'; +import { parseUserAgent } from '@/utils/parse-user-agent'; import { isSameDomain, parsePath } from '@/utils/url'; -import type { Job, JobsOptions } from 'bullmq'; +import type { Job } from 'bullmq'; import { omit } from 'ramda'; -import { escape } from 'sqlstring'; import { v4 as uuid } from 'uuid'; import { getTime, toISOString } from '@openpanel/common'; import type { IServiceCreateEventPayload } from '@openpanel/db'; -import { createEvent, getEvents } from '@openpanel/db'; +import { createEvent } from '@openpanel/db'; +import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service'; import { findJobByPrefix } from '@openpanel/queue'; -import { eventsQueue } from '@openpanel/queue/src/queues'; -import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue/src/queues'; +import { eventsQueue, sessionsQueue } from '@openpanel/queue/src/queues'; +import type { + EventsQueuePayloadCreateSessionEnd, + EventsQueuePayloadIncomingEvent, +} from '@openpanel/queue/src/queues'; import { redis } from '@openpanel/redis'; +function noDateInFuture(eventDate: Date): Date { + if (eventDate > new Date()) { + return new Date(); + } else { + return eventDate; + } +} + const GLOBAL_PROPERTIES = ['__path', '__referrer']; const SESSION_TIMEOUT = 1000 * 60 * 30; const SESSION_END_TIMEOUT = SESSION_TIMEOUT + 1000; @@ -27,12 +37,8 @@ export async function incomingEvent(job: Job) { projectId, currentDeviceId, previousDeviceId, - // TODO: Remove after 2024-09-26 - currentDeviceIdDeprecated, - previousDeviceIdDeprecated, + priority, } = job.data.payload; - let deviceId: string | null = null; - const properties = body.properties ?? {}; const getProperty = (name: string): string | undefined => { // replace thing is just for older sdks when we didn't have `__` @@ -44,22 +50,22 @@ export async function incomingEvent(job: Job) { | undefined) ?? undefined ); }; - const { ua } = headers; - const profileId = body.profileId ?? ''; - const createdAt = new Date(body.timestamp); + + const profileId = body.profileId ? String(body.profileId) : ''; + const createdAt = noDateInFuture(new Date(body.timestamp)); const url = getProperty('__path'); const { path, hash, query, origin } = parsePath(url); const referrer = isSameDomain(getProperty('__referrer'), url) ? null : parseReferrer(getProperty('__referrer')); const utmReferrer = getReferrerWithQuery(query); - const uaInfo = ua ? parseUserAgent(ua) : null; - const isServerEvent = ua ? !isUserAgentSet(ua) : true; + const uaInfo = parseUserAgent(headers.ua); - if (isServerEvent) { - const [event] = await getEvents( - `SELECT * FROM events WHERE name = 'screen_view' AND profile_id = ${escape(profileId)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1` - ); + if (uaInfo.isServer) { + const event = await getLastScreenViewFromProfileId({ + profileId, + projectId, + }); const payload: Omit = { name: body.name, @@ -67,7 +73,10 @@ export async function incomingEvent(job: Job) { sessionId: event?.sessionId || '', profileId, projectId, - properties: Object.assign({}, omit(GLOBAL_PROPERTIES, properties)), + properties: { + ...omit(GLOBAL_PROPERTIES, properties), + user_agent: headers.ua, + }, createdAt, country: event?.country || geo.country || '', city: event?.city || geo.city || '', @@ -78,7 +87,7 @@ export async function incomingEvent(job: Job) { osVersion: event?.osVersion ?? '', browser: event?.browser ?? '', browserVersion: event?.browserVersion ?? '', - device: event?.device ?? '', + device: event?.device ?? uaInfo.device ?? '', brand: event?.brand ?? '', model: event?.model ?? '', duration: 0, @@ -94,82 +103,50 @@ export async function incomingEvent(job: Job) { return createEvent(payload); } - const [sessionEndKeys, eventsKeys] = await Promise.all([ - redis.keys(`bull:events:sessionEnd:${projectId}:*`), - redis.keys(`bull:events:event:${projectId}:*`), - ]); + const sessionEnd = await getSessionEndWithPriority(priority)({ + projectId, + currentDeviceId, + previousDeviceId, + }); - const sessionEndJobCurrentDeviceId = await findJobByPrefix( - eventsQueue, - sessionEndKeys, - `sessionEnd:${projectId}:${currentDeviceId}:` - ); - const sessionEndJobPreviousDeviceId = await findJobByPrefix( - eventsQueue, - sessionEndKeys, - `sessionEnd:${projectId}:${previousDeviceId}:` - ); - // TODO: Remove after 2024-09-26 - const sessionEndJobCurrentDeviceIdDeprecated = await findJobByPrefix( - eventsQueue, - sessionEndKeys, - `sessionEnd:${projectId}:${currentDeviceIdDeprecated}:` - ); - const sessionEndJobPreviousDeviceIdDeprecated = await findJobByPrefix( - eventsQueue, - sessionEndKeys, - `sessionEnd:${projectId}:${previousDeviceIdDeprecated}:` - ); + const sessionEndPayload = (sessionEnd?.job?.data + ?.payload as EventsQueuePayloadCreateSessionEnd['payload']) || { + sessionId: uuid(), + deviceId: currentDeviceId, + profileId, + }; - let createSessionStart = false; + const sessionEndJobId = + sessionEnd?.job.id ?? + `sessionEnd:${projectId}:${sessionEndPayload.deviceId}:${Date.now()}`; - if (sessionEndJobCurrentDeviceId) { - deviceId = currentDeviceId; - sessionEndJobCurrentDeviceId.changeDelay(SESSION_END_TIMEOUT); - } else if (sessionEndJobPreviousDeviceId) { - deviceId = previousDeviceId; - sessionEndJobPreviousDeviceId.changeDelay(SESSION_END_TIMEOUT); - } else if (sessionEndJobCurrentDeviceIdDeprecated) { - deviceId = currentDeviceIdDeprecated; - sessionEndJobCurrentDeviceIdDeprecated.changeDelay(SESSION_END_TIMEOUT); - } else if (sessionEndJobPreviousDeviceIdDeprecated) { - deviceId = previousDeviceIdDeprecated; - sessionEndJobPreviousDeviceIdDeprecated.changeDelay(SESSION_END_TIMEOUT); + if (sessionEnd) { + // If for some reason we have a session end job that is not a createSessionEnd job + if (sessionEnd.job.data.type !== 'createSessionEnd') { + throw new Error('Invalid session end job'); + } + + await sessionEnd.job.changeDelay(SESSION_TIMEOUT); } else { - deviceId = currentDeviceId; - createSessionStart = true; - // Queue session end - eventsQueue.add( - 'event', + await sessionsQueue.add( + 'session', { type: 'createSessionEnd', - payload: { - deviceId, - }, + payload: sessionEndPayload, }, { delay: SESSION_END_TIMEOUT, - jobId: `sessionEnd:${projectId}:${deviceId}:${Date.now()}`, + jobId: sessionEndJobId, } ); } - const prevEventJob = await findJobByPrefix( - eventsQueue, - eventsKeys, - `event:${projectId}:${deviceId}:` - ); - - const [sessionStartEvent] = await getEvents( - `SELECT * FROM events WHERE name = 'session_start' AND device_id = ${escape(deviceId)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1` - ); - const payload: Omit = { name: body.name, - deviceId, + deviceId: sessionEndPayload.deviceId, + sessionId: sessionEndPayload.sessionId, profileId, projectId, - sessionId: createSessionStart ? uuid() : sessionStartEvent?.sessionId ?? '', properties: Object.assign({}, omit(GLOBAL_PROPERTIES, properties), { __hash: hash, __query: query, @@ -189,7 +166,7 @@ export async function incomingEvent(job: Job) { model: uaInfo?.model ?? '', duration: 0, path: path, - origin: origin || sessionStartEvent?.origin || '', + origin: origin, referrer: referrer?.url, referrerName: referrer?.name || utmReferrer?.name || '', referrerType: referrer?.type || utmReferrer?.type || '', @@ -197,76 +174,7 @@ export async function incomingEvent(job: Job) { meta: undefined, }; - const isDelayed = prevEventJob ? await prevEventJob?.isDelayed() : false; - - if (isDelayed && prevEventJob && prevEventJob.data.type === 'createEvent') { - const prevEvent = prevEventJob.data.payload; - const duration = getTime(payload.createdAt) - getTime(prevEvent.createdAt); - job.log(`prevEvent ${JSON.stringify(prevEvent, null, 2)}`); - - // Set path from prev screen_view event if current event is not a screen_view - if (payload.name != 'screen_view') { - payload.path = prevEvent.path; - } - - if (payload.name === 'screen_view') { - if (duration < 0) { - logger.info({ prevEvent, payload }, 'Duration is negative'); - } else { - try { - // Skip update duration if it's wrong - // Seems like request is not in right order - await prevEventJob.updateData({ - type: 'createEvent', - payload: { - ...prevEvent, - duration, - }, - }); - } catch (error) { - logger.error( - { - error, - prevEventJobStatus: await prevEventJob - .getState() - .catch(() => 'unknown'), - }, - `Failed update delayed job` - ); - } - } - - try { - await prevEventJob.promote(); - } catch (error) { - logger.error( - { - error, - prevEventJobStatus: await prevEventJob - .getState() - .catch(() => 'unknown'), - prevEvent, - currEvent: payload, - }, - `Failed to promote job` - ); - } - } - } else if (payload.name !== 'screen_view') { - job.log( - `no previous job ${JSON.stringify( - { - prevEventJob, - payload, - }, - null, - 2 - )}` - ); - } - - if (createSessionStart) { - // We do not need to queue session_start + if (!sessionEnd) { await createEvent({ ...payload, name: 'session_start', @@ -275,40 +183,78 @@ export async function incomingEvent(job: Job) { }); } - const options: JobsOptions = {}; - if (payload.name === 'screen_view') { - options.delay = SESSION_TIMEOUT; - options.jobId = `event:${projectId}:${deviceId}:${Date.now()}`; + return createEvent(payload); +} + +function getSessionEndWithPriority( + priority: boolean, + count = 0 +): typeof getSessionEnd { + return async (args) => { + const res = await getSessionEnd(args); + + if (count > 5) { + throw new Error('Failed to get session end'); + } + + // if we get simultaneous requests we want to avoid race conditions with getting the session end + // one of the events will get priority and the other will wait for the first to finish + if (res === null && priority === false) { + await new Promise((resolve) => setTimeout(resolve, 50)); + return getSessionEndWithPriority(priority, count + 1)(args); + } + + return res; + }; +} + +async function getSessionEnd({ + projectId, + currentDeviceId, + previousDeviceId, +}: { + projectId: string; + currentDeviceId: string; + previousDeviceId: string; +}) { + const sessionEndKeys = await redis.keys(`*:sessionEnd:${projectId}:*`); + + const sessionEndJobCurrentDeviceId = await findJobByPrefix( + sessionsQueue, + sessionEndKeys, + `sessionEnd:${projectId}:${currentDeviceId}:` + ); + if (sessionEndJobCurrentDeviceId) { + return { deviceId: currentDeviceId, job: sessionEndJobCurrentDeviceId }; } - job.log( - `event is queued ${JSON.stringify( - { - ua, - uaInfo, - referrer, - profileId, - projectId, - deviceId, - geo, - sessionStartEvent, - path, - payload, - }, - null, - 2 - )}` + const sessionEndJobCurrentDeviceId2 = await findJobByPrefix( + eventsQueue, + sessionEndKeys, + `sessionEnd:${projectId}:${currentDeviceId}:` ); + if (sessionEndJobCurrentDeviceId2) { + return { deviceId: currentDeviceId, job: sessionEndJobCurrentDeviceId2 }; + } - // Queue event instead of creating it, - // since we want to update duration if we get more events in the same session - // The event will only be delayed if it's a screen_view event - return eventsQueue.add( - 'event', - { - type: 'createEvent', - payload, - }, - options + const sessionEndJobPreviousDeviceId = await findJobByPrefix( + sessionsQueue, + sessionEndKeys, + `sessionEnd:${projectId}:${previousDeviceId}:` ); + if (sessionEndJobPreviousDeviceId) { + return { deviceId: previousDeviceId, job: sessionEndJobPreviousDeviceId }; + } + + const sessionEndJobPreviousDeviceId2 = await findJobByPrefix( + eventsQueue, + sessionEndKeys, + `sessionEnd:${projectId}:${previousDeviceId}:` + ); + if (sessionEndJobPreviousDeviceId2) { + return { deviceId: previousDeviceId, job: sessionEndJobPreviousDeviceId2 }; + } + + // Create session + return null; } diff --git a/apps/worker/src/jobs/events.ts b/apps/worker/src/jobs/events.ts index 44fa1f29..b833effd 100644 --- a/apps/worker/src/jobs/events.ts +++ b/apps/worker/src/jobs/events.ts @@ -1,7 +1,7 @@ import type { Job } from 'bullmq'; import { escape } from 'sqlstring'; -import { chQuery, createEvent, db } from '@openpanel/db'; +import { chQuery, db } from '@openpanel/db'; import type { EventsQueuePayload, EventsQueuePayloadCreateSessionEnd, @@ -16,22 +16,6 @@ export async function eventsJob(job: Job) { case 'incomingEvent': { return await incomingEvent(job as Job); } - case 'createEvent': { - if (job.attemptsStarted > 1 && job.data.payload.duration < 0) { - job.data.payload.duration = 0; - } - const createdEvent = await createEvent(job.data.payload); - try { - await updateEventsCount(job.data.payload.projectId); - } catch (e) { - if (e instanceof Error) { - job.log(`Failed to update events count: ${e.message}`); - } else { - job.log(`Failed to update events count: Unknown issue`); - } - } - return createdEvent; - } case 'createSessionEnd': { return await createSessionEnd( job as Job diff --git a/apps/worker/src/jobs/sessions.ts b/apps/worker/src/jobs/sessions.ts new file mode 100644 index 00000000..b773b8c3 --- /dev/null +++ b/apps/worker/src/jobs/sessions.ts @@ -0,0 +1,9 @@ +import type { Job } from 'bullmq'; + +import type { SessionsQueuePayload } from '@openpanel/queue/src/queues'; + +import { createSessionEnd } from './events.create-session-end'; + +export async function sessionsJob(job: Job) { + return await createSessionEnd(job); +} diff --git a/apps/worker/src/utils/parse-user-agent.ts b/apps/worker/src/utils/parse-user-agent.ts index 4a8842a9..c8671e73 100644 --- a/apps/worker/src/utils/parse-user-agent.ts +++ b/apps/worker/src/utils/parse-user-agent.ts @@ -1,11 +1,16 @@ import { UAParser } from 'ua-parser-js'; -export function isUserAgentSet(ua: string) { - return ua !== 'node' && ua !== 'undici' && !!ua; -} +const parsedServerUa = { + isServer: true, + device: 'server', +} as const; -export function parseUserAgent(ua: string) { +export function parseUserAgent(ua?: string | null) { + if (!ua) return parsedServerUa; const res = new UAParser(ua).getResult(); + + if (isServer(ua)) return parsedServerUa; + return { os: res.os.name, osVersion: res.os.version, @@ -14,20 +19,86 @@ export function parseUserAgent(ua: string) { device: res.device.type ?? getDevice(ua), brand: res.device.vendor, model: res.device.model, - }; + isServer: false, + } as const; +} + +const userAgentServerList: string[] = [ + // Node.js libraries + 'node', + 'node-fetch', + 'axios', + 'request', + 'superagent', + 'undici', + + // Python libraries + 'python-requests', + 'python-urllib', + + // Ruby libraries + 'Faraday', + 'Ruby', + 'http.rb', + + // Go libraries + 'Go-http-client', + 'Go-http-client', + + // Java libraries + 'Apache-HttpClient', + 'okhttp', + 'okhowtp', + + // PHP libraries + 'GuzzleHttp', + 'PHP-cURL', + + // Other + 'Dart', + 'RestSharp', // Popular .NET HTTP client library + 'HttpClientFactory', // .NET's typed client factory + 'Ktor', // A client for Kotlin + 'Ning', // Async HTTP client for Java + 'grpc-csharp', // gRPC for C# + 'Volley', // HTTP library used in Android apps for making network requests + 'Spring', + 'vert.x', + 'grpc-', +]; + +function isServer(userAgent: string) { + const match = userAgentServerList.some((server) => + userAgent.includes(server) + ); + if (match) { + return true; + } + + return !!userAgent.match(/^[^\s]+\/[\d.]+$/); } export function getDevice(ua: string) { - const t1 = + const mobile1 = /(android|bb\d+|meego).+mobile|avantgo|bada\/|blackberry|blazer|compal|elaine|fennec|hiptop|iemobile|ip(hone|od)|iris|kindle|lge |maemo|midp|mmp|mobile.+firefox|netfront|opera m(ob|in)i|palm( os)?|phone|p(ixi|re)\/|plucker|pocket|psp|series(4|6)0|symbian|treo|up\.(browser|link)|vodafone|wap|windows ce|xda|xiino/i.test( ua ); - const t2 = + const mobile2 = /1207|6310|6590|3gso|4thp|50[1-6]i|770s|802s|a wa|abac|ac(er|oo|s-)|ai(ko|rn)|al(av|ca|co)|amoi|an(ex|ny|yw)|aptu|ar(ch|go)|as(te|us)|attw|au(di|-m|r |s )|avan|be(ck|ll|nq)|bi(lb|rd)|bl(ac|az)|br(e|v)w|bumb|bw-(n|u)|c55\/|capi|ccwa|cdm-|cell|chtm|cldc|cmd-|co(mp|nd)|craw|da(it|ll|ng)|dbte|dc-s|devi|dica|dmob|do(c|p)o|ds(12|-d)|el(49|ai)|em(l2|ul)|er(ic|k0)|esl8|ez([4-7]0|os|wa|ze)|fetc|fly(-|_)|g1 u|g560|gene|gf-5|g-mo|go(\.w|od)|gr(ad|un)|haie|hcit|hd-(m|p|t)|hei-|hi(pt|ta)|hp( i|ip)|hs-c|ht(c(-| |_|a|g|p|s|t)|tp)|hu(aw|tc)|i-(20|go|ma)|i230|iac( |-|\/)|ibro|idea|ig01|ikom|im1k|inno|ipaq|iris|ja(t|v)a|jbro|jemu|jigs|kddi|keji|kgt( |\/)|klon|kpt |kwc-|kyo(c|k)|le(no|xi)|lg( g|\/(k|l|u)|50|54|-[a-w])|libw|lynx|m1-w|m3ga|m50\/|ma(te|ui|xo)|mc(01|21|ca)|m-cr|me(rc|ri)|mi(o8|oa|ts)|mmef|mo(01|02|bi|de|do|t(-| |o|v)|zz)|mt(50|p1|v )|mwbp|mywa|n10[0-2]|n20[2-3]|n30(0|2)|n50(0|2|5)|n7(0(0|1)|10)|ne((c|m)-|on|tf|wf|wg|wt)|nok(6|i)|nzph|o2im|op(ti|wv)|oran|owg1|p800|pan(a|d|t)|pdxg|pg(13|-([1-8]|c))|phil|pire|pl(ay|uc)|pn-2|po(ck|rt|se)|prox|psio|pt-g|qa-a|qc(07|12|21|32|60|-[2-7]|i-)|qtek|r380|r600|raks|rim9|ro(ve|zo)|s55\/|sa(ge|ma|mm|ms|ny|va)|sc(01|h-|oo|p-)|sdk\/|se(c(-|0|1)|47|mc|nd|ri)|sgh-|shar|sie(-|m)|sk-0|sl(45|id)|sm(al|ar|b3|it|t5)|so(ft|ny)|sp(01|h-|v-|v )|sy(01|mb)|t2(18|50)|t6(00|10|18)|ta(gt|lk)|tcl-|tdg-|tel(i|m)|tim-|t-mo|to(pl|sh)|ts(70|m-|m3|m5)|tx-9|up(\.b|g1|si)|utst|v400|v750|veri|vi(rg|te)|vk(40|5[0-3]|-v)|vm40|voda|vulc|vx(52|53|60|61|70|80|81|83|85|98)|w3c(-| )|webc|whit|wi(g |nc|nw)|wmlb|wonu|x700|yas-|your|zeto|zte-/i.test( ua.slice(0, 4) ); - if (t1 || t2) { + const tablet = + /tablet|ipad|android(?!.*mobile)|xoom|sch-i800|kindle|silk|playbook/i.test( + ua + ); + + if (mobile1 || mobile2) { return 'mobile'; } + + if (tablet) { + return 'tablet'; + } + return 'desktop'; } diff --git a/package.json b/package.json index 353191cd..f5e8b07a 100644 --- a/package.json +++ b/package.json @@ -26,5 +26,10 @@ "semver": "^7.5.4", "typescript": "^5.2.2" }, - "prettier": "@openpanel/prettier-config" -} + "prettier": "@openpanel/prettier-config", + "pnpm": { + "patchedDependencies": { + "@bull-board/api@5.21.0": "patches/@bull-board__api@5.21.0.patch" + } + } +} \ No newline at end of file diff --git a/packages/common/src/object.ts b/packages/common/src/object.ts index aa849557..e012ed1b 100644 --- a/packages/common/src/object.ts +++ b/packages/common/src/object.ts @@ -13,9 +13,14 @@ export function toDots( }; } + if (value === undefined || value === null) { + return acc; + } + return { ...acc, - [`${path}${key}`]: typeof value === 'string' ? value.trim() : value, + [`${path}${key}`]: + typeof value === 'string' ? value.trim() : String(value), }; }, {}); } @@ -52,3 +57,46 @@ export function getSuperJson(str: string): T | null { } return json; } + +type AnyObject = Record; + +export function deepMergeObjects(target: AnyObject, source: AnyObject): T { + const merged: AnyObject = {}; + // Include all keys from both objects + const allKeys = new Set([...Object.keys(target), ...Object.keys(source)]); + + allKeys.forEach((key) => { + const targetValue = target[key]; + const sourceValue = source[key]; + + if ( + (isNil(sourceValue) && !isNil(targetValue)) || + (sourceValue === '' && + typeof targetValue === 'string' && + targetValue !== '') + ) { + // Keep target value if source value is null or undefined + merged[key] = targetValue; + } else if ( + sourceValue !== undefined && + isObject(targetValue) && + isObject(sourceValue) + ) { + // Recursively merge objects + merged[key] = deepMergeObjects(targetValue, sourceValue); + } else if (sourceValue !== undefined) { + // Directly assign any non-undefined source values + merged[key] = sourceValue; + } else if (sourceValue === undefined && target[key] !== undefined) { + // Keep target value if source value is undefined + merged[key] = targetValue; + } + }); + + return merged as T; +} + +// Helper function to check if a value is an object (but not null or an array) +function isObject(value: any): boolean { + return value !== null && typeof value === 'object' && !Array.isArray(value); +} diff --git a/packages/db/clickhouse_init.sql b/packages/db/clickhouse_init.sql new file mode 100644 index 00000000..6c71cdb0 --- /dev/null +++ b/packages/db/clickhouse_init.sql @@ -0,0 +1,73 @@ +CREATE DATABASE IF NOT EXISTS openpanel; + +CREATE TABLE IF NOT EXISTS openpanel.events ( + `id` UUID DEFAULT generateUUIDv4(), + `name` String, + `device_id` String, + `profile_id` String, + `project_id` String, + `session_id` String, + `path` String, + `origin` String, + `referrer` String, + `referrer_name` String, + `referrer_type` String, + `duration` UInt64, + `properties` Map(String, String), + `created_at` DateTime64(3), + `country` String, + `city` String, + `region` String, + `longitude` Nullable(Float32), + `latitude` Nullable(Float32), + `os` String, + `os_version` String, + `browser` String, + `browser_version` String, + -- device: mobile/desktop/tablet + `device` String, + -- brand: (Samsung, OnePlus) + `brand` String, + -- model: (Samsung Galaxy, iPhone X) + `model` String +) ENGINE MergeTree +ORDER BY + (project_id, created_at, profile_id) SETTINGS index_granularity = 8192; + +CREATE TABLE IF NOT EXISTS openpanel.events_bots ( + `id` UUID DEFAULT generateUUIDv4(), + `project_id` String, + `name` String, + `type` String, + `path` String, + `created_at` DateTime64(3), +) ENGINE MergeTree +ORDER BY + (project_id, created_at) SETTINGS index_granularity = 8192; + +CREATE TABLE IF NOT EXISTS openpanel.profiles ( + `id` String, + `first_name` String, + `last_name` String, + `email` String, + `avatar` String, + `properties` Map(String, String), + `project_id` String, + `created_at` DateTime +) ENGINE = ReplacingMergeTree(created_at) +ORDER BY + (id) SETTINGS index_granularity = 8192; + +--- Materialized views (DAU) +CREATE MATERIALIZED VIEW IF NOT EXISTS dau_mv ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMMDD(date) +ORDER BY + (project_id, date) POPULATE AS +SELECT + toDate(created_at) as date, + uniqState(profile_id) as profile_id, + project_id +FROM + events +GROUP BY + date, + project_id; \ No newline at end of file diff --git a/packages/db/index.ts b/packages/db/index.ts index 137fdb6d..c6be61c3 100644 --- a/packages/db/index.ts +++ b/packages/db/index.ts @@ -15,3 +15,4 @@ export * from './src/services/user.service'; export * from './src/services/reference.service'; export * from './src/services/id.service'; export * from './src/services/retention.service'; +export * from './src/buffers'; diff --git a/packages/db/package.json b/packages/db/package.json index bc8086b9..adb79697 100644 --- a/packages/db/package.json +++ b/packages/db/package.json @@ -13,7 +13,7 @@ }, "dependencies": { "@clerk/nextjs": "^5.0.2", - "@clickhouse/client": "^1.0.1", + "@clickhouse/client": "^1.2.0", "@openpanel/common": "workspace:*", "@openpanel/constants": "workspace:*", "@openpanel/redis": "workspace:*", diff --git a/packages/db/src/buffers/buffer.ts b/packages/db/src/buffers/buffer.ts new file mode 100644 index 00000000..4d604dfd --- /dev/null +++ b/packages/db/src/buffers/buffer.ts @@ -0,0 +1,137 @@ +import type { Redis } from '@openpanel/redis'; + +export const DELETE = '__DELETE__'; + +export type QueueItem = { + event: T; + index: number; +}; + +export type OnInsert = (data: T) => unknown; + +export type OnCompleted = + | ((data: T[]) => Promise) + | ((data: T[]) => unknown[]); + +export type ProcessQueue = (data: QueueItem[]) => Promise; + +export type Find = ( + callback: (item: QueueItem) => boolean +) => Promise; + +export type FindMany = ( + callback: (item: QueueItem) => boolean +) => Promise; + +export abstract class RedisBuffer { + // constructor + public prefix = 'op:buffer'; + public table: string; + public batchSize?: number; + public redis: Redis; + + // abstract methods + public abstract onInsert?: OnInsert; + public abstract onCompleted?: OnCompleted; + public abstract processQueue: ProcessQueue; + public abstract find: Find; + public abstract findMany: FindMany; + + constructor(options: { table: string; redis: Redis; batchSize?: number }) { + this.table = options.table; + this.redis = options.redis; + this.batchSize = options.batchSize; + } + + public getKey(name?: string) { + const key = this.prefix + ':' + this.table; + if (name) { + return `${key}:${name}`; + } + return key; + } + + public async insert(value: T) { + this.onInsert?.(value); + await this.redis.rpush(this.getKey(), JSON.stringify(value)); + + const length = await this.redis.llen(this.getKey()); + if (this.batchSize && length >= this.batchSize) { + this.flush(); + } + } + + public async flush() { + try { + const queue = await this.getQueue(this.batchSize || -1); + + if (queue.length === 0) { + return { + count: 0, + data: [], + }; + } + + try { + const indexes = await this.processQueue(queue); + await this.deleteIndexes(indexes); + const data = indexes + .map((index) => queue[index]?.event) + .filter((event): event is T => event !== null); + + if (this.onCompleted) { + const res = await this.onCompleted(data); + return { + count: res.length, + data: res, + }; + } + + return { + count: indexes.length, + data: indexes, + }; + } catch (e) { + console.log( + `[${this.getKey()}] Failed to processQueue while flushing:`, + e + ); + const timestamp = new Date().getTime(); + await this.redis.hset(this.getKey(`failed:${timestamp}`), { + error: e instanceof Error ? e.message : 'Unknown error', + data: JSON.stringify(queue.map((item) => item.event)), + retries: 0, + }); + } + } catch (e) { + console.log(`[${this.getKey()}] Failed to getQueue while flushing:`, e); + } + } + + public async deleteIndexes(indexes: number[]) { + const multi = this.redis.multi(); + indexes.forEach((index) => { + multi.lset(this.getKey(), index, DELETE); + }); + multi.lrem(this.getKey(), 0, DELETE); + await multi.exec(); + } + + public async getQueue(limit: number): Promise[]> { + const queue = await this.redis.lrange(this.getKey(), 0, limit); + return queue + .map((item, index) => ({ + event: this.transformQueueItem(item), + index, + })) + .filter((item): item is QueueItem => item.event !== null); + } + + private transformQueueItem(item: string): T | null { + try { + return JSON.parse(item); + } catch (e) { + return null; + } + } +} diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts new file mode 100644 index 00000000..59808a66 --- /dev/null +++ b/packages/db/src/buffers/event-buffer.ts @@ -0,0 +1,212 @@ +import { groupBy } from 'ramda'; +import SuperJSON from 'superjson'; + +import { deepMergeObjects } from '@openpanel/common'; +import { redis, redisPub } from '@openpanel/redis'; + +import { ch } from '../clickhouse-client'; +import { transformEvent } from '../services/event.service'; +import type { + IClickhouseEvent, + IServiceCreateEventPayload, +} from '../services/event.service'; +import type { + Find, + FindMany, + OnCompleted, + OnInsert, + ProcessQueue, + QueueItem, +} from './buffer'; +import { RedisBuffer } from './buffer'; + +const sortOldestFirst = ( + a: QueueItem, + b: QueueItem +) => + new Date(a.event.created_at).getTime() - + new Date(b.event.created_at).getTime(); + +export class EventBuffer extends RedisBuffer { + constructor() { + super({ + table: 'events', + redis, + }); + } + + public onInsert?: OnInsert | undefined = (event) => { + redisPub.publish( + 'event:received', + SuperJSON.stringify(transformEvent(event)) + ); + this.redis.setex( + `live:event:${event.project_id}:${event.profile_id}`, + '', + 60 * 5 + ); + }; + + public onCompleted?: OnCompleted | undefined = ( + savedEvents + ) => { + for (const event of savedEvents) { + redisPub.publish( + 'event:saved', + SuperJSON.stringify(transformEvent(event)) + ); + } + + return savedEvents.map((event) => event.id); + }; + + public processQueue: ProcessQueue = async (queue) => { + const itemsToClickhouse = new Set>(); + const itemsToStalled = new Set>(); + + // Sort data by created_at + // oldest first + queue.sort(sortOldestFirst); + + // All events thats not a screen_view can be sent to clickhouse + // We only need screen_views since we want to calculate the duration of each screen + // To do this we need a minimum of 2 screen_views + queue + .filter( + (item) => + item.event.name !== 'screen_view' || item.event.device === 'server' + ) + .forEach((item) => { + // Find the last event with data and merge it with the current event + // We use profile_id here since this property can be set from backend as well + const lastEventWithData = queue + .slice(0, item.index) + .findLast((lastEvent) => { + return ( + lastEvent.event.project_id === item.event.project_id && + lastEvent.event.profile_id === item.event.profile_id && + lastEvent.event.path !== '' + ); + }); + + const event = deepMergeObjects( + lastEventWithData?.event || {}, + item.event + ); + + if (lastEventWithData) { + event.properties.__properties_from = lastEventWithData.event.id; + } + + return itemsToClickhouse.add({ + ...item, + event, + }); + }); + + // Group screen_view events by session_id + const grouped = groupBy( + (item) => item.event.session_id, + queue.filter( + (item) => + item.event.name === 'screen_view' && item.event.device !== 'server' + ) + ); + + // Iterate over each group + for (const [sessionId, screenViews] of Object.entries(grouped)) { + if (sessionId === '' || !sessionId) { + continue; + } + + // If there is only one screen_view event we can send it back to redis since we can't calculate the duration + const hasSessionEnd = queue.find( + (item) => + item.event.name === 'session_end' && + item.event.session_id === sessionId + ); + + screenViews + ?.slice() + .sort(sortOldestFirst) + .forEach((item, index) => { + const nextScreenView = screenViews[index + 1]; + // if nextScreenView does not exists we can't calculate the duration (last event in session) + if (nextScreenView) { + const duration = + new Date(nextScreenView.event.created_at).getTime() - + new Date(item.event.created_at).getTime(); + const event = { + ...item.event, + duration, + }; + event.properties.__duration_from = nextScreenView.event.id; + itemsToClickhouse.add({ + ...item, + event, + }); + // push last event in session if we have a session_end event + } else if (hasSessionEnd) { + itemsToClickhouse.add(item); + } + }); + } // for of end + + // Check if we have any events that has been in the queue for more than 24 hour + // This should not theoretically happen but if it does we should move them to stalled + queue.forEach((item) => { + if ( + !itemsToClickhouse.has(item) && + new Date(item.event.created_at).getTime() < + new Date().getTime() - 1000 * 60 * 60 * 24 + ) { + itemsToStalled.add(item); + } + }); + + if (itemsToStalled.size > 0) { + const multi = this.redis.multi(); + for (const item of itemsToStalled) { + multi.rpush(this.getKey('stalled'), JSON.stringify(item.event)); + } + await multi.exec(); + } + + await ch.insert({ + table: 'events', + values: Array.from(itemsToClickhouse).map((item) => item.event), + format: 'JSONEachRow', + }); + + return [ + ...Array.from(itemsToClickhouse).map((item) => item.index), + ...Array.from(itemsToStalled).map((item) => item.index), + ]; + }; + + public findMany: FindMany = + async (callback) => { + return this.getQueue(-1) + .then((queue) => { + return queue + .filter(callback) + .map((item) => transformEvent(item.event)); + }) + .catch(() => { + return []; + }); + }; + + public find: Find = async ( + callback + ) => { + return this.getQueue(-1) + .then((queue) => { + const match = queue.find(callback); + return match ? transformEvent(match.event) : null; + }) + .catch(() => { + return null; + }); + }; +} diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts new file mode 100644 index 00000000..7623e941 --- /dev/null +++ b/packages/db/src/buffers/index.ts @@ -0,0 +1,5 @@ +import { EventBuffer } from './event-buffer'; +import { ProfileBuffer } from './profile-buffer'; + +export const eventBuffer = new EventBuffer(); +export const profileBuffer = new ProfileBuffer(); diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts new file mode 100644 index 00000000..616b7923 --- /dev/null +++ b/packages/db/src/buffers/profile-buffer.ts @@ -0,0 +1,114 @@ +import { mergeDeepRight } from 'ramda'; + +import { toDots } from '@openpanel/common'; +import { redis } from '@openpanel/redis'; + +import { ch, chQuery } from '../clickhouse-client'; +import type { + IClickhouseProfile, + IServiceProfile, +} from '../services/profile.service'; +import { transformProfile } from '../services/profile.service'; +import type { + Find, + FindMany, + OnCompleted, + OnInsert, + ProcessQueue, + QueueItem, +} from './buffer'; +import { RedisBuffer } from './buffer'; + +export class ProfileBuffer extends RedisBuffer { + constructor() { + super({ + redis, + table: 'profiles', + batchSize: 100, + }); + } + + public onInsert?: OnInsert | undefined; + public onCompleted?: OnCompleted | undefined; + + public processQueue: ProcessQueue = async (queue) => { + const itemsToClickhouse = new Map>(); + + // Combine all writes to the same profile + queue.forEach((item) => { + const key = item.event.project_id + item.event.id; + const existing = itemsToClickhouse.get(key); + itemsToClickhouse.set( + item.event.project_id + item.event.id, + mergeDeepRight(existing ?? {}, item) + ); + }); + + const cleanedQueue = Array.from(itemsToClickhouse.values()); + + const profiles = await chQuery( + `SELECT + * + FROM profiles + WHERE + (id, project_id) IN (${cleanedQueue.map((item) => `('${item.event.id}', '${item.event.project_id}')`).join(',')}) + ORDER BY + created_at DESC` + ); + + await ch.insert({ + table: 'profiles', + values: cleanedQueue.map((item) => { + const profile = profiles.find( + (p) => + p.id === item.event.id && p.project_id === item.event.project_id + ); + + return { + id: item.event.id, + first_name: item.event.first_name ?? profile?.first_name ?? '', + last_name: item.event.last_name ?? profile?.last_name ?? '', + email: item.event.email ?? profile?.email ?? '', + avatar: item.event.avatar ?? profile?.avatar ?? '', + properties: toDots({ + ...(profile?.properties ?? {}), + ...(item.event.properties ?? {}), + }), + project_id: item.event.project_id ?? profile?.project_id ?? '', + created_at: new Date(), + is_external: item.event.is_external, + }; + }), + clickhouse_settings: { + date_time_input_format: 'best_effort', + }, + format: 'JSONEachRow', + }); + return queue.map((item) => item.index); + }; + + public findMany: FindMany = async ( + callback + ) => { + return this.getQueue(-1) + .then((queue) => { + return queue + .filter(callback) + .map((item) => transformProfile(item.event)); + }) + .catch(() => { + return []; + }); + }; + + public find: Find = async (callback) => { + return this.getQueue(-1) + .then((queue) => { + const match = queue.find(callback); + return match ? transformProfile(match.event) : null; + }) + .catch(() => { + return null; + }); + }; +} diff --git a/packages/db/src/clickhouse-client.ts b/packages/db/src/clickhouse-client.ts index 02c1ecba..c9615674 100644 --- a/packages/db/src/clickhouse-client.ts +++ b/packages/db/src/clickhouse-client.ts @@ -1,7 +1,7 @@ import type { ResponseJSON } from '@clickhouse/client'; import { createClient } from '@clickhouse/client'; -export const ch = createClient({ +export const originalCh = createClient({ url: process.env.CLICKHOUSE_URL, username: process.env.CLICKHOUSE_USER, password: process.env.CLICKHOUSE_PASSWORD, @@ -9,6 +9,53 @@ export const ch = createClient({ max_open_connections: 10, keep_alive: { enabled: true, + idle_socket_ttl: 5000, + }, + compression: { + request: true, + }, +}); + +export const ch = new Proxy(originalCh, { + get(target, property, receiver) { + if (property === 'insert' || property === 'query') { + return async (...args: any[]) => { + try { + // First attempt + if (property in target) { + // @ts-expect-error + return await target[property](...args); + } + } catch (error: unknown) { + if ( + error instanceof Error && + error.message.includes('socket hang up') + ) { + console.error( + `Caught socket hang up error on ${property.toString()}, retrying once.` + ); + await new Promise((resolve) => setTimeout(resolve, 500)); + try { + // Retry once + if (property in target) { + // @ts-expect-error + return await target[property](...args); + } + } catch (retryError) { + console.error( + `Retry failed for ${property.toString()}:`, + retryError + ); + throw retryError; // Rethrow or handle as needed + } + } else { + // Handle other errors or rethrow them + throw error; + } + } + }; + } + return Reflect.get(target, property, receiver); }, }); diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index 4e589311..e8e3c0f8 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -1,12 +1,12 @@ import { omit, uniq } from 'ramda'; import { escape } from 'sqlstring'; -import superjson from 'superjson'; import { v4 as uuid } from 'uuid'; import { toDots } from '@openpanel/common'; -import { redis, redisPub } from '@openpanel/redis'; +import { redis } from '@openpanel/redis'; import type { IChartEventFilter } from '@openpanel/validation'; +import { eventBuffer } from '../buffers'; import { ch, chQuery, @@ -17,7 +17,7 @@ import type { EventMeta, Prisma } from '../prisma-client'; import { db } from '../prisma-client'; import { createSqlBuilder } from '../sql-builder'; import { getEventFiltersWhereClause } from './chart.service'; -import { getProfileById, getProfiles, upsertProfile } from './profile.service'; +import { getProfiles, upsertProfile } from './profile.service'; import type { IServiceProfile } from './profile.service'; export interface IClickhouseEvent { @@ -226,17 +226,14 @@ export async function createEvent( payload.profileId = payload.deviceId; } console.log( - `create event ${payload.name} for deviceId: ${payload.deviceId} profileId ${payload.profileId}` + `create event ${payload.name} for [deviceId]: ${payload.deviceId} [profileId]: ${payload.profileId} [projectId]: ${payload.projectId} [path]: ${payload.path}` ); - const exists = await getProfileById(payload.profileId, payload.projectId); - if (!exists && payload.profileId !== '') { + if (payload.profileId !== '') { await upsertProfile({ id: String(payload.profileId), - isExternal: false, + isExternal: payload.profileId !== payload.deviceId, projectId: payload.projectId, - firstName: '', - lastName: '', properties: { path: payload.path, country: payload.country, @@ -287,25 +284,9 @@ export async function createEvent( referrer_type: payload.referrerType ?? '', }; - const res = await ch.insert({ - table: 'events', - values: [event], - format: 'JSONEachRow', - clickhouse_settings: { - date_time_input_format: 'best_effort', - }, - }); - - redisPub.publish('event', superjson.stringify(transformEvent(event))); - redis.set( - `live:event:${event.project_id}:${event.profile_id}`, - '', - 'EX', - 60 * 5 - ); + await eventBuffer.insert(event); return { - ...res, document: event, }; } @@ -449,3 +430,27 @@ export function getConversionEventNames(projectId: string) { }, }); } + +export async function getLastScreenViewFromProfileId({ + profileId, + projectId, +}: { + profileId: string; + projectId: string; +}) { + const eventInBuffer = await eventBuffer.find( + (item) => item.event.profile_id === profileId + ); + + if (eventInBuffer) { + return eventInBuffer; + } + + const [eventInDb] = profileId + ? await getEvents( + `SELECT * FROM events WHERE name = 'screen_view' AND profile_id = ${escape(profileId)} AND project_id = ${escape(projectId)} AND created_at >= now() - INTERVAL 30 MINUTE ORDER BY created_at DESC LIMIT 1` + ) + : []; + + return eventInDb || null; +} diff --git a/packages/db/src/services/profile.service.ts b/packages/db/src/services/profile.service.ts index b457a398..fa7cbaae 100644 --- a/packages/db/src/services/profile.service.ts +++ b/packages/db/src/services/profile.service.ts @@ -1,9 +1,10 @@ import { escape } from 'sqlstring'; -import { toDots, toObject } from '@openpanel/common'; +import { toObject } from '@openpanel/common'; import type { IChartEventFilter } from '@openpanel/validation'; -import { ch, chQuery } from '../clickhouse-client'; +import { profileBuffer } from '../buffers'; +import { chQuery, formatClickhouseDate } from '../clickhouse-client'; import { createSqlBuilder } from '../sql-builder'; export type IProfileMetrics = { @@ -66,7 +67,10 @@ export async function getProfiles(ids: string[]) { const data = await chQuery( `SELECT * FROM profiles FINAL - WHERE id IN (${ids.map((id) => escape(id)).join(',')}) + WHERE id IN (${ids + .map((id) => escape(id)) + .filter(Boolean) + .join(',')}) ` ); @@ -172,31 +176,15 @@ export async function upsertProfile({ projectId, isExternal, }: IServiceUpsertProfile) { - const [profile] = await chQuery( - `SELECT * FROM profiles WHERE id = ${escape(id)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1` - ); - - await ch.insert({ - table: 'profiles', - format: 'JSONEachRow', - clickhouse_settings: { - date_time_input_format: 'best_effort', - }, - values: [ - { - id, - first_name: firstName ?? profile?.first_name ?? '', - last_name: lastName ?? profile?.last_name ?? '', - email: email ?? profile?.email ?? '', - avatar: avatar ?? profile?.avatar ?? '', - properties: toDots({ - ...(profile?.properties ?? {}), - ...(properties ?? {}), - }), - project_id: projectId ?? profile?.project_id ?? '', - created_at: new Date(), - is_external: isExternal, - }, - ], + return profileBuffer.insert({ + id, + first_name: firstName!, + last_name: lastName!, + email: email!, + avatar: avatar!, + properties: properties as Record, + project_id: projectId, + created_at: formatClickhouseDate(new Date()), + is_external: isExternal, }); } diff --git a/packages/logger/index.ts b/packages/logger/index.ts index 50645257..3c081fbf 100644 --- a/packages/logger/index.ts +++ b/packages/logger/index.ts @@ -3,7 +3,7 @@ import pino from 'pino'; export function createLogger({ dataset }: { dataset: string }) { const targets: TransportTargetOptions[] = - process.env.NODE_ENV === 'production' + process.env.NODE_ENV === 'production' && process.env.BASELIME_API_KEY ? [ { target: '@baselime/pino-transport', diff --git a/packages/queue/package.json b/packages/queue/package.json index fe30a30a..ec7f1907 100644 --- a/packages/queue/package.json +++ b/packages/queue/package.json @@ -9,7 +9,7 @@ }, "dependencies": { "@openpanel/db": "workspace:*", - "bullmq": "^5.1.1" + "bullmq": "^5.8.7" }, "devDependencies": { "@openpanel/sdk": "workspace:*", @@ -28,4 +28,4 @@ ] }, "prettier": "@openpanel/prettier-config" -} +} \ No newline at end of file diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index b8b81e88..bdc1c8b3 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -22,8 +22,7 @@ export interface EventsQueuePayloadIncomingEvent { }; currentDeviceId: string; previousDeviceId: string; - currentDeviceIdDeprecated: string; - previousDeviceIdDeprecated: string; + priority: boolean; }; } export interface EventsQueuePayloadCreateEvent { @@ -32,17 +31,36 @@ export interface EventsQueuePayloadCreateEvent { } export interface EventsQueuePayloadCreateSessionEnd { type: 'createSessionEnd'; - payload: Pick; + payload: Pick< + IServiceCreateEventPayload, + 'deviceId' | 'sessionId' | 'profileId' + >; } + +// TODO: Rename `EventsQueuePayloadCreateSessionEnd` +export type SessionsQueuePayload = EventsQueuePayloadCreateSessionEnd; + export type EventsQueuePayload = | EventsQueuePayloadCreateEvent | EventsQueuePayloadCreateSessionEnd | EventsQueuePayloadIncomingEvent; -export interface CronQueuePayload { +export type CronQueuePayloadSalt = { type: 'salt'; payload: undefined; -} +}; +export type CronQueuePayloadFlushEvents = { + type: 'flushEvents'; + payload: undefined; +}; +export type CronQueuePayloadFlushProfiles = { + type: 'flushProfiles'; + payload: undefined; +}; +export type CronQueuePayload = + | CronQueuePayloadSalt + | CronQueuePayloadFlushEvents + | CronQueuePayloadFlushProfiles; export const eventsQueue = new Queue('events', { connection, @@ -51,6 +69,13 @@ export const eventsQueue = new Queue('events', { }, }); +export const sessionsQueue = new Queue('sessions', { + connection, + defaultJobOptions: { + removeOnComplete: 10, + }, +}); + export const cronQueue = new Queue('cron', { connection, defaultJobOptions: { diff --git a/packages/queue/src/utils.ts b/packages/queue/src/utils.ts index c9e0b23f..83cb2053 100644 --- a/packages/queue/src/utils.ts +++ b/packages/queue/src/utils.ts @@ -24,7 +24,7 @@ export async function findJobByPrefix( async function getJob(index: number) { if (index >= filtered.length) return null; - const key = filtered[index]?.replace(/^bull:events:/, ''); + const key = filtered[index]?.replace(/^bull:(\w+):/, ''); // return new Promise((resolve) => ) if (key) { const job = await queue.getJob(key); diff --git a/packages/redis/cachable.ts b/packages/redis/cachable.ts index a0631796..39fe830d 100644 --- a/packages/redis/cachable.ts +++ b/packages/redis/cachable.ts @@ -4,14 +4,25 @@ export function cacheable any>( fn: T, expire: number ) { - return async function (...args: Parameters): Promise> { + return async function ( + ...args: Parameters + ): Promise>> { + // JSON.stringify here is not bullet proof since ordering of object keys matters etc const key = `cachable:${fn.name}:${JSON.stringify(args)}`; const cached = await redis.get(key); if (cached) { - return JSON.parse(cached); + try { + return JSON.parse(cached); + } catch (e) { + console.error('Failed to parse cache', e); + } } const result = await fn(...(args as any)); - redis.setex(key, expire, JSON.stringify(result)); + + if (result !== undefined || result !== null) { + redis.setex(key, expire, JSON.stringify(result)); + } + return result; }; } diff --git a/packages/redis/package.json b/packages/redis/package.json index f2245aa7..391da1bb 100644 --- a/packages/redis/package.json +++ b/packages/redis/package.json @@ -9,7 +9,7 @@ "with-env": "dotenv -e ../../.env -c --" }, "dependencies": { - "ioredis": "^5.3.2" + "ioredis": "^5.4.1" }, "devDependencies": { "@openpanel/eslint-config": "workspace:*", @@ -28,4 +28,4 @@ ] }, "prettier": "@openpanel/prettier-config" -} +} \ No newline at end of file diff --git a/packages/redis/redis.ts b/packages/redis/redis.ts index 589a6889..cf3e59ec 100644 --- a/packages/redis/redis.ts +++ b/packages/redis/redis.ts @@ -2,9 +2,12 @@ import type { RedisOptions } from 'ioredis'; import Redis from 'ioredis'; const options: RedisOptions = { - connectTimeout: 10000, + connectTimeout: 30000, + maxRetriesPerRequest: null, }; +export { Redis }; + export const redis = new Redis(process.env.REDIS_URL!, options); export const redisSub = new Redis(process.env.REDIS_URL!, options); export const redisPub = new Redis(process.env.REDIS_URL!, options); diff --git a/patches/@bull-board__api@5.21.0.patch b/patches/@bull-board__api@5.21.0.patch new file mode 100644 index 00000000..88c20ca6 --- /dev/null +++ b/patches/@bull-board__api@5.21.0.patch @@ -0,0 +1,40 @@ +diff --git a/dist/src/queueAdapters/bullMQ.js b/dist/src/queueAdapters/bullMQ.js +index 3e5a2e61c9600459678fda0e29397faee7db9a1e..6cf94ac4384d86d99f292c8f296d4b95ab8f2d19 100644 +--- a/dist/src/queueAdapters/bullMQ.js ++++ b/dist/src/queueAdapters/bullMQ.js +@@ -21,11 +21,31 @@ class BullMQAdapter extends base_1.BaseAdapter { + addJob(name, data, options) { + return this.queue.add(name, data, options); + } +- getJob(id) { +- return this.queue.getJob(id); ++ getDelayedScoreName() { ++ return `${this.queue.opts.prefix}:${this.queue.name}:delayed`; ++ } ++ transformJob(job, actualDelay) { ++ job.opts.delay = actualDelay ? actualDelay - job.timestamp : job.delay ++ return job ++ } ++ async getJob(id) { ++ const client = await this.queue.client ++ const job = await this.queue.getJob(id); ++ if(job) { ++ const score = await client.zscore(this.getDelayedScoreName(), id) ++ return this.transformJob(job, score ? score / 0x1000 : null) ++ } ++ return undefined + } +- getJobs(jobStatuses, start, end) { +- return this.queue.getJobs(jobStatuses, start, end); ++ async getJobs(jobStatuses, start, end) { ++ const jobs = await this.queue.getJobs(jobStatuses, start, end); ++ if(jobs.length === 0) { ++ return [] ++ } ++ const client = await this.queue.client ++ const scores = await client.zmscore(this.getDelayedScoreName(), jobs.map(job => job.id)) ++ const delays = jobs.map((_, i) => scores[i] !== null ? scores[i] / 0x1000 : null) ++ return jobs.map((job, i) => this.transformJob(job, delays[i])) + } + getJobCounts() { + return this.queue.getJobCounts(); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 215c57fd..6533d71d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -4,6 +4,11 @@ settings: autoInstallPeers: true excludeLinksFromLockfile: false +patchedDependencies: + '@bull-board/api@5.21.0': + hash: 25udjn3ygs6h4rrgl46tnrqrn4 + path: patches/@bull-board__api@5.21.0.patch + importers: .: @@ -98,6 +103,9 @@ importers: superjson: specifier: ^1.13.3 version: 1.13.3 + svix: + specifier: ^1.24.0 + version: 1.24.0 ua-parser-js: specifier: ^1.0.37 version: 1.0.37 @@ -163,8 +171,8 @@ importers: specifier: ^5.0.12 version: 5.0.12(next@14.2.1)(react-dom@18.2.0)(react@18.2.0) '@clickhouse/client': - specifier: ^0.2.9 - version: 0.2.9 + specifier: ^1.2.0 + version: 1.2.0 '@hookform/resolvers': specifier: ^3.3.4 version: 3.3.4(react-hook-form@7.50.1) @@ -189,6 +197,9 @@ importers: '@openpanel/validation': specifier: workspace:^ version: link:../../packages/validation + '@prisma/nextjs-monorepo-workaround-plugin': + specifier: ^5.12.1 + version: 5.16.1 '@radix-ui/react-accordion': specifier: ^1.1.2 version: 1.1.2(@types/react-dom@18.2.19)(@types/react@18.2.56)(react-dom@18.2.0)(react@18.2.0) @@ -448,9 +459,6 @@ importers: '@openpanel/tsconfig': specifier: workspace:* version: link:../../tooling/typescript - '@prisma/nextjs-monorepo-workaround-plugin': - specifier: ^5.12.1 - version: 5.12.1 '@types/bcrypt': specifier: ^5.0.2 version: 5.0.2 @@ -721,11 +729,11 @@ importers: specifier: ^0.1.5 version: 0.1.5 '@bull-board/api': - specifier: ^5.13.0 - version: 5.14.1(@bull-board/ui@5.14.1) + specifier: ^5.21.0 + version: 5.21.0(patch_hash=25udjn3ygs6h4rrgl46tnrqrn4)(@bull-board/ui@5.21.0) '@bull-board/express': - specifier: ^5.13.0 - version: 5.14.1 + specifier: ^5.21.0 + version: 5.21.0 '@openpanel/common': specifier: workspace:* version: link:../../packages/common @@ -742,14 +750,17 @@ importers: specifier: workspace:* version: link:../../packages/redis bullmq: - specifier: ^5.1.1 - version: 5.2.0 + specifier: ^5.8.7 + version: 5.8.7 express: specifier: ^4.18.2 version: 4.18.2 pino: specifier: ^8.17.2 version: 8.19.0 + pino-pretty: + specifier: ^10.3.1 + version: 10.3.1 ramda: specifier: ^0.29.1 version: 0.29.1 @@ -888,8 +899,8 @@ importers: specifier: ^5.0.2 version: 5.0.2(eslint@8.56.0)(next@14.2.1)(react-dom@18.2.0)(react@18.2.0)(typescript@5.3.3) '@clickhouse/client': - specifier: ^1.0.1 - version: 1.0.1 + specifier: ^1.2.0 + version: 1.2.0 '@openpanel/common': specifier: workspace:* version: link:../common @@ -995,8 +1006,8 @@ importers: specifier: workspace:* version: link:../db bullmq: - specifier: ^5.1.1 - version: 5.2.0 + specifier: ^5.8.7 + version: 5.8.7 devDependencies: '@openpanel/eslint-config': specifier: workspace:* @@ -1026,8 +1037,8 @@ importers: packages/redis: dependencies: ioredis: - specifier: ^5.3.2 - version: 5.3.2 + specifier: ^5.4.1 + version: 5.4.1 devDependencies: '@openpanel/eslint-config': specifier: workspace:* @@ -3010,30 +3021,31 @@ packages: resolution: {integrity: sha512-s3jaWicZd0pkP0jf5ysyHUI/RE7MHos6qlToFcGWXVp+ykHOy77OUMrfbgJ9it2C5bow7OIQwYYaHjk9XlBQ2A==} dev: false - /@bull-board/api@5.14.1(@bull-board/ui@5.14.1): - resolution: {integrity: sha512-iDxzpjdFvNj1CRtcKSNreQ0iFO2+Sv/vTSsA8Eq/cu+6BUr7L/T6cyoHFHzPwc1Lauc4v3DJxTxzy20ZC06wgA==} + /@bull-board/api@5.21.0(patch_hash=25udjn3ygs6h4rrgl46tnrqrn4)(@bull-board/ui@5.21.0): + resolution: {integrity: sha512-27tjptwgRgP1G5jT+POjiZZOP3LgdIM4XdfEWfa6t5E0CYImL4EjmdiFo5lhbHhYKZ842VhIpHuNcPk8nY3K9A==} peerDependencies: - '@bull-board/ui': 5.14.1 + '@bull-board/ui': 5.21.0 dependencies: - '@bull-board/ui': 5.14.1 + '@bull-board/ui': 5.21.0 redis-info: 3.1.0 dev: false + patched: true - /@bull-board/express@5.14.1: - resolution: {integrity: sha512-UfOxbKrcTWWosOO/5dpOJibtZCEFS/szbJGQl0Wj2lKXaVX3asUovmm92uGhGlmG/kA6kYR/aerrCpnB0GrKPQ==} + /@bull-board/express@5.21.0: + resolution: {integrity: sha512-iBPBJq8KYebYrN4YvdSvEfOxjYYJfWycilAfNDSikyI3rJKOBRq34BmDnQj6Jn1ytssBb+vvZ35+bCSbbhFB3w==} dependencies: - '@bull-board/api': 5.14.1(@bull-board/ui@5.14.1) - '@bull-board/ui': 5.14.1 - ejs: 3.1.9 - express: 4.18.2 + '@bull-board/api': 5.21.0(patch_hash=25udjn3ygs6h4rrgl46tnrqrn4)(@bull-board/ui@5.21.0) + '@bull-board/ui': 5.21.0 + ejs: 3.1.10 + express: 4.19.2 transitivePeerDependencies: - supports-color dev: false - /@bull-board/ui@5.14.1: - resolution: {integrity: sha512-4hdZBN5+DgGZDhy05ngrsXkPJxDYeS9JW0yyeIgS9g5nmGYQyG3l1TPPF7J8+L7HT/lwsY62EyMLSV1PoBE8iQ==} + /@bull-board/ui@5.21.0: + resolution: {integrity: sha512-eH8QQwIHgCXxNEmlg9EZr3fSvno/bdbgBGfSQO5s9c9n9eDEaKX46ambKSPvgFPtwSdiV1AYQEa/3fGSebVIxg==} dependencies: - '@bull-board/api': 5.14.1(@bull-board/ui@5.14.1) + '@bull-board/api': 5.21.0(patch_hash=25udjn3ygs6h4rrgl46tnrqrn4)(@bull-board/ui@5.21.0) dev: false /@canvas/image-data@1.0.0: @@ -3237,26 +3249,15 @@ packages: csstype: 3.1.1 dev: false - /@clickhouse/client-common@0.2.9: - resolution: {integrity: sha512-ecXcegMbT4HYNWtGcfyidW6lNVRqPogbFMY5kfjJmz4IXJ4WZbQMwj2IQgemwFwE7jyia2OEwPIVfw1sNfDHRA==} + /@clickhouse/client-common@1.2.0: + resolution: {integrity: sha512-VfA/C/tVJ2eNe72CaQ7eXmai+yqFEvZjQZiNtvJoOMLP+Vtb6DzqH9nfkgsiHHMhUhhclvt2mFh6+euk1Ea5wA==} dev: false - /@clickhouse/client-common@1.0.1: - resolution: {integrity: sha512-3L6e0foP6VOktScoi6XWMjJyOpKCWgLUYgPVxP2c7gm6Kotq+iRmmmXtXTSg7B7uozcLZycTtPfIw2d80SYsYw==} - dev: false - - /@clickhouse/client@0.2.9: - resolution: {integrity: sha512-KqQlO9vZNSLyhMWG9+0/VXqcUZrNk1Hybr9icgI/nLCoX8RD19BJsakZJj38IQvQxNUTxvcItm/kyu/gD/9LXA==} + /@clickhouse/client@1.2.0: + resolution: {integrity: sha512-zMp2EhMfp1IrFKr/NjDwNiLsf7nq68nW8lGKszwFe7Iglc6Z5PY9ZA9Hd0XqAk75Q1NmFrkGCP1r3JCM1Nm1Bw==} engines: {node: '>=16'} dependencies: - '@clickhouse/client-common': 0.2.9 - dev: false - - /@clickhouse/client@1.0.1: - resolution: {integrity: sha512-fluUNnE2R7COJ6rn6DorYfi4D+AQn3t2qeBtEq37bQV3pD4EbKrBfKAwJ13e1lmMWdQ2B9bJUTMqGsRIDdWhJw==} - engines: {node: '>=16'} - dependencies: - '@clickhouse/client-common': 1.0.1 + '@clickhouse/client-common': 1.2.0 dev: false /@emnapi/runtime@0.45.0: @@ -5240,9 +5241,9 @@ packages: dependencies: '@prisma/debug': 5.9.1 - /@prisma/nextjs-monorepo-workaround-plugin@5.12.1: - resolution: {integrity: sha512-ISYeAIY2x8riGhpgDqlupqqkqgJe2ksQm4yVH01afQ/7hp7fjPLcEXbDjEYuuloXJgnbvkcN6GXcD3v5hiq85A==} - dev: true + /@prisma/nextjs-monorepo-workaround-plugin@5.16.1: + resolution: {integrity: sha512-uX2nFUt1+qDU6za4lU6vFu6Ww2dD6XnQ3TLu3Eo4eVKhpa8h6qLbYiA5F9Y0yCOxutw12hA/eFU9TEOAa9Zgfg==} + dev: false /@protobufjs/aspromise@1.1.2: resolution: {integrity: sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ==} @@ -7040,6 +7041,10 @@ packages: '@sinonjs/commons': 3.0.1 dev: false + /@stablelib/base64@1.0.1: + resolution: {integrity: sha512-1bnPQqSxSuc3Ii6MhBysoWCg58j97aUjuCSZrGSmDxNqtytIi0k8utUenAwTZN4V5mXXYGsVUI9zeBqy+jBOSQ==} + dev: false + /@swc/counter@0.1.3: resolution: {integrity: sha512-e2BR4lsJkkRlKZ/qCHPw9ZaSxc0MVUd7gtbtaB7aMvHeJVYe8sOB8DBZkP2DtISHGSku9sCK6T6cnY0CtXrOCQ==} dev: false @@ -8577,6 +8582,26 @@ packages: - supports-color dev: false + /body-parser@1.20.2: + resolution: {integrity: sha512-ml9pReCu3M61kGlqoTm2umSXTlRTuGTx0bfYj+uIUKKYycG5NtSbeetV3faSU6R7ajOPw0g/J1PvK4qNy7s5bA==} + engines: {node: '>= 0.8', npm: 1.2.8000 || >= 1.4.16} + dependencies: + bytes: 3.1.2 + content-type: 1.0.5 + debug: 2.6.9 + depd: 2.0.0 + destroy: 1.2.0 + http-errors: 2.0.0 + iconv-lite: 0.4.24 + on-finished: 2.4.1 + qs: 6.11.0 + raw-body: 2.5.2 + type-is: 1.6.18 + unpipe: 1.0.0 + transitivePeerDependencies: + - supports-color + dev: false + /boolbase@1.0.0: resolution: {integrity: sha512-JZOSA7Mo9sNGB8+UjSgzdLtokWAky1zbztM3WRLCbZ70/3cTANmQmOdR7y2g+J0e2WXywy1yS468tY+IruqEww==} dev: false @@ -8675,13 +8700,11 @@ packages: resolution: {integrity: sha512-uYBjakWipfaO/bXI7E8rq6kpwHRZK5cNYrUv2OzZSI/FvmdMyXJ2tG9dKcjEC5YHmHpUAwsargWIZNWdxb/bnQ==} dev: false - /bullmq@5.2.0: - resolution: {integrity: sha512-h0lkO9dNBLuFRdjPZXyb9sq0gyaaLir5tY9+uE7qjZaMtDXKUOhZlI7KgVWFT9smotsKG204Mwa7Y7jHxPCctQ==} + /bullmq@5.8.7: + resolution: {integrity: sha512-IdAgB9WvJHRAcZtamRLj6fbjMyuIogEa1cjOTWM1pkVoHUOpO34q6FzNMX1R8VOeUhkvkOkWcxI5ENgFLh+TVA==} dependencies: cron-parser: 4.9.0 - glob: 8.1.0 - ioredis: 5.3.2 - lodash: 4.17.21 + ioredis: 5.4.1 msgpackr: 1.10.1 node-abort-controller: 3.1.1 semver: 7.6.0 @@ -9229,6 +9252,11 @@ packages: engines: {node: '>= 0.6'} dev: false + /cookie@0.6.0: + resolution: {integrity: sha512-U71cyTamuh1CRNCfpGY6to28lxvNwPG4Guz/EVjgf3Jmzv0vlDp1atT9eS5dDjMYHucpHbWns6Lwf3BKz6svdw==} + engines: {node: '>= 0.6'} + dev: false + /cookies@0.8.0: resolution: {integrity: sha512-8aPsApQfebXnuI+537McwYsDtjVxGm8gTIzQI3FDW6t5t/DAhERxtnbEPN/8RX+uZthoz4eCOgloXaE5cYyNow==} engines: {node: '>= 0.8'} @@ -10057,8 +10085,8 @@ packages: resolution: {integrity: sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==} dev: false - /ejs@3.1.9: - resolution: {integrity: sha512-rC+QVNMJWv+MtPgkt0y+0rVEIdbtxVADApW9JXrUVlzHetgcyczP/E7DJmWJ4fJCZF2cPcBk0laWO9ZHMG3DmQ==} + /ejs@3.1.10: + resolution: {integrity: sha512-UeJmFfOrAQS8OJWPZ4qtgHyWExa088/MtK5UEyoJGFH67cDEXkZSviOiKRCZ4Xij0zxI3JECgYs3oKx+AizQBA==} engines: {node: '>=0.10.0'} hasBin: true dependencies: @@ -10280,6 +10308,10 @@ packages: is-symbol: 1.0.4 dev: false + /es6-promise@4.2.8: + resolution: {integrity: sha512-HJDGx5daxeIvxdBxvG2cb9g4tEvwIk3i8+nhX0yGrYmZUzbkdg8QbDevheDB8gd0//uPj4c1EQua8Q+MViT0/w==} + dev: false + /esbuild@0.19.12: resolution: {integrity: sha512-aARqgq8roFBj054KvQr5f1sFu0D65G+miZRCuJyJ0G13Zwx7vRar5Zhn2tkQNzIXcBrNVsv/8stehpj+GAjgbg==} engines: {node: '>=12'} @@ -11031,6 +11063,45 @@ packages: - supports-color dev: false + /express@4.19.2: + resolution: {integrity: sha512-5T6nhjsT+EOMzuck8JjBHARTHfMht0POzlA60WV2pMD3gyXw2LZnZ+ueGdNxG+0calOJcWKbpFcuzLZ91YWq9Q==} + engines: {node: '>= 0.10.0'} + dependencies: + accepts: 1.3.8 + array-flatten: 1.1.1 + body-parser: 1.20.2 + content-disposition: 0.5.4 + content-type: 1.0.5 + cookie: 0.6.0 + cookie-signature: 1.0.6 + debug: 2.6.9 + depd: 2.0.0 + encodeurl: 1.0.2 + escape-html: 1.0.3 + etag: 1.8.1 + finalhandler: 1.2.0 + fresh: 0.5.2 + http-errors: 2.0.0 + merge-descriptors: 1.0.1 + methods: 1.1.2 + on-finished: 2.4.1 + parseurl: 1.3.3 + path-to-regexp: 0.1.7 + proxy-addr: 2.0.7 + qs: 6.11.0 + range-parser: 1.2.1 + safe-buffer: 5.2.1 + send: 0.18.0 + serve-static: 1.15.0 + setprototypeof: 1.2.0 + statuses: 2.0.1 + type-is: 1.6.18 + utils-merge: 1.0.1 + vary: 1.1.2 + transitivePeerDependencies: + - supports-color + dev: false + /extend-shallow@2.0.1: resolution: {integrity: sha512-zCnTtlxNoAiDc3gqY2aYAWFx7XWWiasuF2K8Me5WbN8otHKTUKBwjPtNpRs/rbUZm7KxWAaNj7P1a/p52GbVug==} engines: {node: '>=0.10.0'} @@ -11105,6 +11176,10 @@ packages: resolution: {integrity: sha512-W+KJc2dmILlPplD/H4K9l9LcAHAfPtP6BY84uVLXQ6Evcz9Lcg33Y2z1IVblT6xdY54PXYVHEv+0Wpq8Io6zkA==} dev: false + /fast-sha256@1.3.0: + resolution: {integrity: sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ==} + dev: false + /fast-uri@2.3.0: resolution: {integrity: sha512-eel5UKGn369gGEWOqBShmFJWfq/xSJvsgDzgLYC845GneayWvXBf0lJCBn5qTABfewy1ZDPoaR5OZCP+kssfuw==} dev: false @@ -11672,7 +11747,6 @@ packages: /glob@7.1.7: resolution: {integrity: sha512-OvD9ENzPLbegENnYP5UUfJIirTg4+XwMWGaQfQTY0JenxNvvIKP3U3/tAQSPIu/lHxXYSZmpXlUHeqAIdKzBLQ==} - deprecated: Glob versions prior to v9 are no longer supported dependencies: fs.realpath: 1.0.0 inflight: 1.0.6 @@ -11692,17 +11766,6 @@ packages: once: 1.4.0 path-is-absolute: 1.0.1 - /glob@8.1.0: - resolution: {integrity: sha512-r8hpEjiQEYlF2QU0df3dS+nxxSIreXQS1qRhMJM0Q5NDdR386C7jb7Hwwod8Fgiuex+k0GFjgft18yvxm5XoCQ==} - engines: {node: '>=12'} - dependencies: - fs.realpath: 1.0.0 - inflight: 1.0.6 - inherits: 2.0.4 - minimatch: 5.1.6 - once: 1.4.0 - dev: false - /globals@11.12.0: resolution: {integrity: sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==} engines: {node: '>=4'} @@ -12215,8 +12278,8 @@ packages: loose-envify: 1.4.0 dev: false - /ioredis@5.3.2: - resolution: {integrity: sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==} + /ioredis@5.4.1: + resolution: {integrity: sha512-2YZsvl7jopIa1gaePkeMtd9rAcSjOOjPtpcLlOeusyO+XH2SK5ZcT+UCrElPP+WVIInh2TzeI4XW9ENaSLVVHA==} engines: {node: '>=12.22.0'} dependencies: '@ioredis/commands': 1.2.0 @@ -15729,6 +15792,10 @@ packages: side-channel: 1.0.5 dev: false + /querystringify@2.2.0: + resolution: {integrity: sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==} + dev: false + /queue-microtask@1.2.3: resolution: {integrity: sha512-NuaNSa6flKT5JaSYQzJok04JzTL1CA6aGhv5rfLW3PgqA+M2ChpZQnAC8h8i4ZFkBS8X5RqkDBHA7r4hej3K9A==} @@ -15765,6 +15832,16 @@ packages: unpipe: 1.0.0 dev: false + /raw-body@2.5.2: + resolution: {integrity: sha512-8zGqypfENjCIqGhgXToC8aB2r7YrBX+AQAfIPs/Mlk+BtPTztOvTS01NRW/3Eh60J+a48lt8qsCzirQ6loCVfA==} + engines: {node: '>= 0.8'} + dependencies: + bytes: 3.1.2 + http-errors: 2.0.0 + iconv-lite: 0.4.24 + unpipe: 1.0.0 + dev: false + /rc-resize-observer@1.4.0(react-dom@18.2.0)(react@18.2.0): resolution: {integrity: sha512-PnMVyRid9JLxFavTjeDXEXo65HCRqbmLBw9xX9gfC4BZiSzbLXKzW3jPz+J0P71pLbD5tBMTT+mkstV5gD0c9Q==} peerDependencies: @@ -16546,6 +16623,10 @@ packages: resolve: 1.7.1 dev: false + /requires-port@1.0.0: + resolution: {integrity: sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==} + dev: false + /reselect@4.1.8: resolution: {integrity: sha512-ab9EmR80F/zQTMNeneUr4cv+jSwPJgIlvEmVwLerwrWVbpLlBuls9XHzIeTFy4cegU2NHBp3va0LKOzU5qFEYQ==} dev: false @@ -17431,6 +17512,27 @@ packages: resolution: {integrity: sha512-ot0WnXS9fgdkgIcePe6RHNk1WA8+muPa6cSjeR3V8K27q9BB1rTE3R1p7Hv0z1ZyAc8s6Vvv8DIyWf681MAt0w==} engines: {node: '>= 0.4'} + /svix-fetch@3.0.0: + resolution: {integrity: sha512-rcADxEFhSqHbraZIsjyZNh4TF6V+koloX1OzZ+AQuObX9mZ2LIMhm1buZeuc5BIZPftZpJCMBsSiBaeszo9tRw==} + dependencies: + node-fetch: 2.7.0 + whatwg-fetch: 3.6.20 + transitivePeerDependencies: + - encoding + dev: false + + /svix@1.24.0: + resolution: {integrity: sha512-TEznBskvdvEJElo/j7BiIZAoaQEWyj/NCmwiV0izlVRf5DnCBFdowkEXERDA3JgUlAYoAJi0S7atWit7nkTMtw==} + dependencies: + '@stablelib/base64': 1.0.1 + es6-promise: 4.2.8 + fast-sha256: 1.3.0 + svix-fetch: 3.0.0 + url-parse: 1.5.10 + transitivePeerDependencies: + - encoding + dev: false + /swr@2.2.0(react@18.2.0): resolution: {integrity: sha512-AjqHOv2lAhkuUdIiBu9xbuettzAzWXmCEcLONNKJRba87WAefz8Ca9d6ds/SzrPc235n1IxWYdhJ2zF3MNUaoQ==} peerDependencies: @@ -18144,6 +18246,13 @@ packages: node-fetch: 3.3.2 dev: false + /url-parse@1.5.10: + resolution: {integrity: sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==} + dependencies: + querystringify: 2.2.0 + requires-port: 1.0.0 + dev: false + /use-callback-ref@1.3.1(@types/react@18.2.56)(react@18.2.0): resolution: {integrity: sha512-Lg4Vx1XZQauB42Hw3kK7JM6yjVjgFmFC5/Ab797s79aARomD2nEErc4mCgM8EZrARLmmbWpi5DGCadmK50DcAQ==} engines: {node: '>=10'}