diff --git a/apps/api/package.json b/apps/api/package.json index 3cb8b99f..5f63bc5d 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -30,7 +30,6 @@ "@openpanel/logger": "workspace:*", "@openpanel/payments": "workspace:*", "@openpanel/queue": "workspace:*", - "groupmq": "catalog:", "@openpanel/redis": "workspace:*", "@openpanel/trpc": "workspace:*", "@openpanel/validation": "workspace:*", @@ -40,6 +39,7 @@ "fastify": "^5.6.1", "fastify-metrics": "^12.1.0", "fastify-raw-body": "^5.0.0", + "groupmq": "catalog:", "jsonwebtoken": "^9.0.2", "ramda": "^0.29.1", "sharp": "^0.33.5", diff --git a/apps/api/src/bots/index.ts b/apps/api/src/bots/index.ts index 12d6b738..391aa290 100644 --- a/apps/api/src/bots/index.ts +++ b/apps/api/src/bots/index.ts @@ -1,4 +1,4 @@ -import { cacheable, cacheableLru } from '@openpanel/redis'; +import { cacheable } from '@openpanel/redis'; import bots from './bots'; // Pre-compile regex patterns at module load time @@ -15,7 +15,7 @@ const compiledBots = bots.map((bot) => { const regexBots = compiledBots.filter((bot) => 'compiledRegex' in bot); const includesBots = compiledBots.filter((bot) => 'includes' in bot); -export const isBot = cacheableLru( +export const isBot = cacheable( 'is-bot', (ua: string) => { // Check simple string patterns first (fast) @@ -40,8 +40,5 @@ export const isBot = cacheableLru( return null; }, - { - maxSize: 1000, - ttl: 60 * 5, - }, + 60 * 5 ); diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index 332968ad..488e6713 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -19,9 +19,14 @@ export function wsVisitors( ) { const { params } = req; const sendCount = () => { - eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { - socket.send(String(count)); - }); + eventBuffer + .getActiveVisitorCount(params.projectId) + .then((count) => { + socket.send(String(count)); + }) + .catch(() => { + socket.send('0'); + }); }; const unsubscribe = subscribeToPublishedEvent( diff --git a/apps/api/src/controllers/manage.controller.ts b/apps/api/src/controllers/manage.controller.ts index e1d3bf67..1d162851 100644 --- a/apps/api/src/controllers/manage.controller.ts +++ b/apps/api/src/controllers/manage.controller.ts @@ -1,5 +1,4 @@ import crypto from 'node:crypto'; -import { HttpError } from '@/utils/errors'; import { stripTrailingSlash } from '@openpanel/common'; import { hashPassword } from '@openpanel/common/server'; import { @@ -10,6 +9,7 @@ import { } from '@openpanel/db'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { z } from 'zod'; +import { HttpError } from '@/utils/errors'; // Validation schemas const zCreateProject = z.object({ @@ -57,7 +57,7 @@ const zUpdateReference = z.object({ // Projects CRUD export async function listProjects( request: FastifyRequest, - reply: FastifyReply, + reply: FastifyReply ) { const projects = await db.project.findMany({ where: { @@ -74,7 +74,7 @@ export async function listProjects( export async function getProject( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const project = await db.project.findFirst({ where: { @@ -92,7 +92,7 @@ export async function getProject( export async function createProject( request: FastifyRequest<{ Body: z.infer }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zCreateProject.safeParse(request.body); @@ -139,12 +139,9 @@ export async function createProject( }, }); - // Clear cache await Promise.all([ getProjectByIdCached.clear(project.id), - project.clients.map((client) => { - getClientByIdCached.clear(client.id); - }), + ...project.clients.map((client) => getClientByIdCached.clear(client.id)), ]); reply.send({ @@ -165,7 +162,7 @@ export async function updateProject( Params: { id: string }; Body: z.infer; }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zUpdateProject.safeParse(request.body); @@ -223,12 +220,9 @@ export async function updateProject( data: updateData, }); - // Clear cache await Promise.all([ getProjectByIdCached.clear(project.id), - existing.clients.map((client) => { - getClientByIdCached.clear(client.id); - }), + ...existing.clients.map((client) => getClientByIdCached.clear(client.id)), ]); reply.send({ data: project }); @@ -236,7 +230,7 @@ export async function updateProject( export async function deleteProject( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const project = await db.project.findFirst({ where: { @@ -266,7 +260,7 @@ export async function deleteProject( // Clients CRUD export async function listClients( request: FastifyRequest<{ Querystring: { projectId?: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const where: any = { organizationId: request.client!.organizationId, @@ -300,7 +294,7 @@ export async function listClients( export async function getClient( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const client = await db.client.findFirst({ where: { @@ -318,7 +312,7 @@ export async function getClient( export async function createClient( request: FastifyRequest<{ Body: z.infer }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zCreateClient.safeParse(request.body); @@ -374,7 +368,7 @@ export async function updateClient( Params: { id: string }; Body: z.infer; }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zUpdateClient.safeParse(request.body); @@ -417,7 +411,7 @@ export async function updateClient( export async function deleteClient( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const client = await db.client.findFirst({ where: { @@ -444,7 +438,7 @@ export async function deleteClient( // References CRUD export async function listReferences( request: FastifyRequest<{ Querystring: { projectId?: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const where: any = {}; @@ -488,7 +482,7 @@ export async function listReferences( export async function getReference( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const reference = await db.reference.findUnique({ where: { @@ -516,7 +510,7 @@ export async function getReference( export async function createReference( request: FastifyRequest<{ Body: z.infer }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zCreateReference.safeParse(request.body); @@ -559,7 +553,7 @@ export async function updateReference( Params: { id: string }; Body: z.infer; }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zUpdateReference.safeParse(request.body); @@ -616,7 +610,7 @@ export async function updateReference( export async function deleteReference( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const reference = await db.reference.findUnique({ where: { diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index f7a3c61f..0e530f93 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -7,7 +7,10 @@ import { upsertProfile, } from '@openpanel/db'; import { type GeoLocation, getGeoLocation } from '@openpanel/geo'; -import { getEventsGroupQueueShard } from '@openpanel/queue'; +import { + type EventsQueuePayloadIncomingEvent, + getEventsGroupQueueShard, +} from '@openpanel/queue'; import { getRedisCache } from '@openpanel/redis'; import { type IDecrementPayload, @@ -112,6 +115,7 @@ interface TrackContext { identity?: IIdentifyPayload; deviceId: string; sessionId: string; + session?: EventsQueuePayloadIncomingEvent['payload']['session']; geo: GeoLocation; } @@ -141,19 +145,21 @@ async function buildContext( validatedBody.payload.profileId = profileId; } + const overrideDeviceId = + validatedBody.type === 'track' && + typeof validatedBody.payload?.properties?.__deviceId === 'string' + ? validatedBody.payload?.properties.__deviceId + : undefined; + // Get geo location (needed for track and identify) const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]); - const { deviceId, sessionId } = await getDeviceId({ + const deviceIdResult = await getDeviceId({ projectId, ip, ua, salts, - overrideDeviceId: - validatedBody.type === 'track' && - typeof validatedBody.payload?.properties?.__deviceId === 'string' - ? validatedBody.payload?.properties.__deviceId - : undefined, + overrideDeviceId, }); return { @@ -166,8 +172,9 @@ async function buildContext( isFromPast: timestamp.isTimestampFromThePast, }, identity, - deviceId, - sessionId, + deviceId: deviceIdResult.deviceId, + sessionId: deviceIdResult.sessionId, + session: deviceIdResult.session, geo, }; } @@ -176,13 +183,14 @@ async function handleTrack( payload: ITrackPayload, context: TrackContext ): Promise { - const { projectId, deviceId, geo, headers, timestamp, sessionId } = context; + const { projectId, deviceId, geo, headers, timestamp, sessionId, session } = + context; const uaInfo = parseUserAgent(headers['user-agent'], payload.properties); const groupId = uaInfo.isServer ? payload.profileId ? `${projectId}:${payload.profileId}` - : `${projectId}:${generateId()}` + : undefined : deviceId; const jobId = [ slug(payload.name), @@ -203,7 +211,7 @@ async function handleTrack( } promises.push( - getEventsGroupQueueShard(groupId).add({ + getEventsGroupQueueShard(groupId || generateId()).add({ orderMs: timestamp.value, data: { projectId, @@ -217,6 +225,7 @@ async function handleTrack( geo, deviceId, sessionId, + session, }, groupId, jobId, diff --git a/apps/api/src/hooks/is-bot.hook.ts b/apps/api/src/hooks/is-bot.hook.ts index c01a231a..09346fbd 100644 --- a/apps/api/src/hooks/is-bot.hook.ts +++ b/apps/api/src/hooks/is-bot.hook.ts @@ -1,20 +1,19 @@ -import { isBot } from '@/bots'; import { createBotEvent } from '@openpanel/db'; import type { DeprecatedPostEventPayload, ITrackHandlerPayload, } from '@openpanel/validation'; - import type { FastifyReply, FastifyRequest } from 'fastify'; +import { isBot } from '@/bots'; export async function isBotHook( req: FastifyRequest<{ Body: ITrackHandlerPayload | DeprecatedPostEventPayload; }>, - reply: FastifyReply, + reply: FastifyReply ) { const bot = req.headers['user-agent'] - ? isBot(req.headers['user-agent']) + ? await isBot(req.headers['user-agent']) : null; if (bot && req.client?.projectId) { @@ -44,6 +43,6 @@ export async function isBotHook( } } - return reply.status(202).send(); + return reply.status(202).send({ bot }); } } diff --git a/apps/api/src/routes/track.router.ts b/apps/api/src/routes/track.router.ts index 2f94a8eb..1bb04c4b 100644 --- a/apps/api/src/routes/track.router.ts +++ b/apps/api/src/routes/track.router.ts @@ -1,6 +1,5 @@ -import { fetchDeviceId, handler } from '@/controllers/track.controller'; import type { FastifyPluginCallback } from 'fastify'; - +import { fetchDeviceId, handler } from '@/controllers/track.controller'; import { clientHook } from '@/hooks/client.hook'; import { duplicateHook } from '@/hooks/duplicate.hook'; import { isBotHook } from '@/hooks/is-bot.hook'; @@ -13,7 +12,7 @@ const trackRouter: FastifyPluginCallback = async (fastify) => { fastify.route({ method: 'POST', url: '/', - handler: handler, + handler, }); fastify.route({ diff --git a/apps/api/src/utils/ids.ts b/apps/api/src/utils/ids.ts index db6b9aa5..f2bd9a45 100644 --- a/apps/api/src/utils/ids.ts +++ b/apps/api/src/utils/ids.ts @@ -1,7 +1,12 @@ import crypto from 'node:crypto'; import { generateDeviceId } from '@openpanel/common/server'; import { getSafeJson } from '@openpanel/json'; +import type { + EventsQueuePayloadCreateSessionEnd, + EventsQueuePayloadIncomingEvent, +} from '@openpanel/queue'; import { getRedisCache } from '@openpanel/redis'; +import { pick } from 'ramda'; export async function getDeviceId({ projectId, @@ -37,14 +42,20 @@ export async function getDeviceId({ ua, }); - return await getDeviceIdFromSession({ + return await getInfoFromSession({ projectId, currentDeviceId, previousDeviceId, }); } -async function getDeviceIdFromSession({ +interface DeviceIdResult { + deviceId: string; + sessionId: string; + session?: EventsQueuePayloadIncomingEvent['payload']['session']; +} + +async function getInfoFromSession({ projectId, currentDeviceId, previousDeviceId, @@ -52,7 +63,7 @@ async function getDeviceIdFromSession({ projectId: string; currentDeviceId: string; previousDeviceId: string; -}) { +}): Promise { try { const multi = getRedisCache().multi(); multi.hget( @@ -65,21 +76,33 @@ async function getDeviceIdFromSession({ ); const res = await multi.exec(); if (res?.[0]?.[1]) { - const data = getSafeJson<{ payload: { sessionId: string } }>( + const data = getSafeJson( (res?.[0]?.[1] as string) ?? '' ); if (data) { - const sessionId = data.payload.sessionId; - return { deviceId: currentDeviceId, sessionId }; + return { + deviceId: currentDeviceId, + sessionId: data.payload.sessionId, + session: pick( + ['referrer', 'referrerName', 'referrerType'], + data.payload + ), + }; } } if (res?.[1]?.[1]) { - const data = getSafeJson<{ payload: { sessionId: string } }>( + const data = getSafeJson( (res?.[1]?.[1] as string) ?? '' ); if (data) { - const sessionId = data.payload.sessionId; - return { deviceId: previousDeviceId, sessionId }; + return { + deviceId: previousDeviceId, + sessionId: data.payload.sessionId, + session: pick( + ['referrer', 'referrerName', 'referrerType'], + data.payload + ), + }; } } } catch (error) { diff --git a/apps/worker/package.json b/apps/worker/package.json index 60366df0..643afc5a 100644 --- a/apps/worker/package.json +++ b/apps/worker/package.json @@ -16,11 +16,11 @@ "@openpanel/common": "workspace:*", "@openpanel/db": "workspace:*", "@openpanel/email": "workspace:*", + "@openpanel/importer": "workspace:*", "@openpanel/integrations": "workspace:^", "@openpanel/js-runtime": "workspace:*", "@openpanel/json": "workspace:*", "@openpanel/logger": "workspace:*", - "@openpanel/importer": "workspace:*", "@openpanel/payments": "workspace:*", "@openpanel/queue": "workspace:*", "@openpanel/redis": "workspace:*", diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts index 5b4285ca..f6395565 100644 --- a/apps/worker/src/boot-workers.ts +++ b/apps/worker/src/boot-workers.ts @@ -1,10 +1,9 @@ -import type { Queue, WorkerOptions } from 'bullmq'; -import { Worker } from 'bullmq'; - +import { performance } from 'node:perf_hooks'; +import { setTimeout as sleep } from 'node:timers/promises'; import { + cronQueue, EVENTS_GROUP_QUEUES_SHARDS, type EventsQueuePayloadIncomingEvent, - cronQueue, eventsGroupQueues, gscQueue, importQueue, @@ -15,14 +14,12 @@ import { sessionsQueue, } from '@openpanel/queue'; import { getRedisQueue } from '@openpanel/redis'; - -import { performance } from 'node:perf_hooks'; -import { setTimeout as sleep } from 'node:timers/promises'; +import type { Queue, WorkerOptions } from 'bullmq'; +import { Worker } from 'bullmq'; import { Worker as GroupWorker } from 'groupmq'; - import { cronJob } from './jobs/cron'; -import { gscJob } from './jobs/gsc'; import { incomingEvent } from './jobs/events.incoming-event'; +import { gscJob } from './jobs/gsc'; import { importJob } from './jobs/import'; import { insightsProjectJob } from './jobs/insights'; import { miscJob } from './jobs/misc'; @@ -95,7 +92,7 @@ function getConcurrencyFor(queueName: string, defaultValue = 1): number { return defaultValue; } -export async function bootWorkers() { +export function bootWorkers() { const enabledQueues = getEnabledQueues(); const workers: (Worker | GroupWorker)[] = []; @@ -119,12 +116,14 @@ export async function bootWorkers() { for (const index of eventQueuesToStart) { const queue = eventsGroupQueues[index]; - if (!queue) continue; + if (!queue) { + continue; + } const queueName = `events_${index}`; const concurrency = getConcurrencyFor( queueName, - Number.parseInt(process.env.EVENT_JOB_CONCURRENCY || '10', 10), + Number.parseInt(process.env.EVENT_JOB_CONCURRENCY || '10', 10) ); const worker = new GroupWorker({ @@ -132,7 +131,7 @@ export async function bootWorkers() { concurrency, logger: process.env.NODE_ENV === 'production' ? queueLogger : undefined, blockingTimeoutSec: Number.parseFloat( - process.env.EVENT_BLOCKING_TIMEOUT_SEC || '1', + process.env.EVENT_BLOCKING_TIMEOUT_SEC || '1' ), handler: async (job) => { return await incomingEvent(job.data); @@ -172,7 +171,7 @@ export async function bootWorkers() { const notificationWorker = new Worker( notificationQueue.name, notificationJob, - { ...workerOptions, concurrency }, + { ...workerOptions, concurrency } ); workers.push(notificationWorker); logger.info('Started worker for notification', { concurrency }); @@ -224,7 +223,7 @@ export async function bootWorkers() { if (workers.length === 0) { logger.warn( - 'No workers started. Check ENABLED_QUEUES environment variable.', + 'No workers started. Check ENABLED_QUEUES environment variable.' ); } @@ -254,7 +253,7 @@ export async function bootWorkers() { const elapsed = job.finishedOn - job.processedOn; eventsGroupJobDuration.observe( { name: worker.name, status: 'failed' }, - elapsed, + elapsed ); } logger.error('job failed', { @@ -267,23 +266,6 @@ export async function bootWorkers() { } }); - (worker as Worker).on('completed', (job) => { - if (job) { - if (job.processedOn && job.finishedOn) { - const elapsed = job.finishedOn - job.processedOn; - logger.info('job completed', { - jobId: job.id, - worker: worker.name, - elapsed, - }); - eventsGroupJobDuration.observe( - { name: worker.name, status: 'success' }, - elapsed, - ); - } - } - }); - (worker as Worker).on('ioredis:close', () => { logger.error('worker closed due to ioredis:close', { worker: worker.name, @@ -293,7 +275,7 @@ export async function bootWorkers() { async function exitHandler( eventName: string, - evtOrExitCodeOrError: number | string | Error, + evtOrExitCodeOrError: number | string | Error ) { // Log the actual error details for unhandled rejections/exceptions if (evtOrExitCodeOrError instanceof Error) { @@ -339,7 +321,7 @@ export async function bootWorkers() { process.on(evt, (code) => { exitHandler(evt, code); }); - }, + } ); return workers; diff --git a/apps/worker/src/jobs/cron.salt.ts b/apps/worker/src/jobs/cron.salt.ts index e7575f9f..8b5b02e4 100644 --- a/apps/worker/src/jobs/cron.salt.ts +++ b/apps/worker/src/jobs/cron.salt.ts @@ -33,7 +33,7 @@ async function generateNewSalt() { return created; }); - getSalts.clear(); + await getSalts.clear(); return newSalt; } diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 21b169c0..1f15e5d3 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -1,9 +1,5 @@ import { getTime, isSameDomain, parsePath } from '@openpanel/common'; -import { - getReferrerWithQuery, - parseReferrer, - parseUserAgent, -} from '@openpanel/common/server'; +import { getReferrerWithQuery, parseReferrer } from '@openpanel/common/server'; import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db'; import { checkNotificationRulesForEvent, @@ -14,10 +10,12 @@ import { } from '@openpanel/db'; import type { ILogger } from '@openpanel/logger'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; -import { getLock } from '@openpanel/redis'; import { anyPass, isEmpty, isNil, mergeDeepRight, omit, reject } from 'ramda'; import { logger as baseLogger } from '@/utils/logger'; -import { createSessionEndJob, getSessionEnd } from '@/utils/session-handler'; +import { + createSessionEndJob, + extendSessionEndJob, +} from '@/utils/session-handler'; const GLOBAL_PROPERTIES = ['__path', '__referrer', '__timestamp', '__revenue']; @@ -93,7 +91,8 @@ export async function incomingEvent( projectId, deviceId, sessionId, - uaInfo: _uaInfo, + uaInfo, + session, } = jobPayload; const properties = body.properties ?? {}; const reqId = headers['request-id'] ?? 'unknown'; @@ -121,16 +120,15 @@ export async function incomingEvent( ? null : parseReferrer(getProperty('__referrer')); const utmReferrer = getReferrerWithQuery(query); - const userAgent = headers['user-agent']; const sdkName = headers['openpanel-sdk-name']; const sdkVersion = headers['openpanel-sdk-version']; - // TODO: Remove both user-agent and parseUserAgent - const uaInfo = _uaInfo ?? parseUserAgent(userAgent, properties); - const baseEvent = { + const baseEvent: IServiceCreateEventPayload = { name: body.name, profileId, projectId, + deviceId, + sessionId, properties: omit(GLOBAL_PROPERTIES, { ...properties, __hash: hash, @@ -149,7 +147,7 @@ export async function incomingEvent( origin, referrer: referrer?.url || '', referrerName: utmReferrer?.name || referrer?.name || referrer?.url, - referrerType: referrer?.type || utmReferrer?.type || '', + referrerType: utmReferrer?.type || referrer?.type || '', os: uaInfo.os, osVersion: uaInfo.osVersion, browser: uaInfo.browser, @@ -161,16 +159,17 @@ export async function incomingEvent( body.name === 'revenue' && '__revenue' in properties ? parseRevenue(properties.__revenue) : undefined, - } as const; + }; // if timestamp is from the past we dont want to create a new session if (uaInfo.isServer || isTimestampFromThePast) { - const session = profileId - ? await sessionBuffer.getExistingSession({ - profileId, - projectId, - }) - : null; + const session = + profileId && !isTimestampFromThePast + ? await sessionBuffer.getExistingSession({ + profileId, + projectId, + }) + : null; const payload = { ...baseEvent, @@ -198,82 +197,48 @@ export async function incomingEvent( return createEventAndNotify(payload as IServiceEvent, logger, projectId); } - const sessionEnd = await getSessionEnd({ - projectId, - deviceId, - profileId, - }); - const activeSession = sessionEnd - ? await sessionBuffer.getExistingSession({ - sessionId: sessionEnd.sessionId, - }) - : null; - const payload: IServiceCreateEventPayload = merge(baseEvent, { - deviceId: sessionEnd?.deviceId ?? deviceId, - sessionId: sessionEnd?.sessionId ?? sessionId, - referrer: sessionEnd?.referrer ?? baseEvent.referrer, - referrerName: sessionEnd?.referrerName ?? baseEvent.referrerName, - referrerType: sessionEnd?.referrerType ?? baseEvent.referrerType, - // if the path is not set, use the last screen view path - path: baseEvent.path || activeSession?.exit_path || '', - origin: baseEvent.origin || activeSession?.exit_origin || '', + referrer: session?.referrer ?? baseEvent.referrer, + referrerName: session?.referrerName ?? baseEvent.referrerName, + referrerType: session?.referrerType ?? baseEvent.referrerType, } as Partial) as IServiceCreateEventPayload; - // If the triggering event is filtered, do not create session_start or the event (issue #2) const isExcluded = await isEventExcludedByProjectFilter(payload, projectId); if (isExcluded) { logger.info( 'Skipping session_start and event (excluded by project filter)', + { event: payload.name, projectId } + ); + return null; + } + + if (session) { + await extendSessionEndJob({ + projectId, + deviceId, + }).catch((error) => { + logger.error('Error finding and extending session end job', { error }); + throw error; + }); + } else { + await createEventAndNotify( { - event: payload.name, - projectId, - } - ); - return null; - } + ...payload, + name: 'session_start', + createdAt: new Date(getTime(payload.createdAt) - 100), + }, + logger, + projectId + ).catch((error) => { + logger.error('Error creating session start event', { event: payload }); + throw error; + }); - if (!sessionEnd) { - const locked = await getLock( - `session_start:${projectId}:${sessionId}`, - '1', - 1000 - ); - if (locked) { - logger.info('Creating session start event', { event: payload }); - await createEventAndNotify( - { - ...payload, - name: 'session_start', - createdAt: new Date(getTime(payload.createdAt) - 100), - }, - logger, - projectId - ).catch((error) => { - logger.error('Error creating session start event', { event: payload }); - throw error; - }); - } else { - logger.info('Session start already claimed by another worker', { - event: payload, - }); - } - } - - const event = await createEventAndNotify(payload, logger, projectId); - - if (!event) { - // Skip creating session end when event was excluded - return null; - } - - if (!sessionEnd) { - logger.info('Creating session end job', { event: payload }); await createSessionEndJob({ payload }).catch((error) => { logger.error('Error creating session end job', { event: payload }); throw error; }); } - return event; + return createEventAndNotify(payload, logger, projectId); } diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index f1c24feb..f483fb74 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -186,6 +186,11 @@ describe('incomingEvent', () => { projectId, deviceId, sessionId: 'session-123', + session: { + referrer: '', + referrerName: '', + referrerType: '', + }, }; const changeDelay = vi.fn(); diff --git a/apps/worker/src/metrics.ts b/apps/worker/src/metrics.ts index eaea78d2..0eabf89d 100644 --- a/apps/worker/src/metrics.ts +++ b/apps/worker/src/metrics.ts @@ -1,5 +1,3 @@ -import client from 'prom-client'; - import { botBuffer, eventBuffer, @@ -8,6 +6,7 @@ import { sessionBuffer, } from '@openpanel/db'; import { cronQueue, eventsGroupQueues, sessionsQueue } from '@openpanel/queue'; +import client from 'prom-client'; const Registry = client.Registry; @@ -20,7 +19,7 @@ export const eventsGroupJobDuration = new client.Histogram({ name: 'job_duration_ms', help: 'Duration of job processing (in ms)', labelNames: ['name', 'status'], - buckets: [10, 25, 50, 100, 250, 500, 750, 1000, 2000, 5000, 10000, 30000], // 10ms to 30s + buckets: [10, 25, 50, 100, 250, 500, 750, 1000, 2000, 5000, 10_000, 30_000], // 10ms to 30s }); register.registerMetric(eventsGroupJobDuration); @@ -28,57 +27,61 @@ register.registerMetric(eventsGroupJobDuration); queues.forEach((queue) => { register.registerMetric( new client.Gauge({ - name: `${queue.name.replace(/[\{\}]/g, '')}_active_count`, + name: `${queue.name.replace(/[{}]/g, '')}_active_count`, help: 'Active count', async collect() { const metric = await queue.getActiveCount(); this.set(metric); }, - }), + }) ); register.registerMetric( new client.Gauge({ - name: `${queue.name.replace(/[\{\}]/g, '')}_delayed_count`, + name: `${queue.name.replace(/[{}]/g, '')}_delayed_count`, help: 'Delayed count', async collect() { - const metric = await queue.getDelayedCount(); - this.set(metric); + if ('getDelayedCount' in queue) { + const metric = await queue.getDelayedCount(); + this.set(metric); + } else { + this.set(0); + } }, - }), + }) ); register.registerMetric( new client.Gauge({ - name: `${queue.name.replace(/[\{\}]/g, '')}_failed_count`, + name: `${queue.name.replace(/[{}]/g, '')}_failed_count`, help: 'Failed count', async collect() { const metric = await queue.getFailedCount(); this.set(metric); }, - }), + }) ); register.registerMetric( new client.Gauge({ - name: `${queue.name.replace(/[\{\}]/g, '')}_completed_count`, + name: `${queue.name.replace(/[{}]/g, '')}_completed_count`, help: 'Completed count', async collect() { const metric = await queue.getCompletedCount(); this.set(metric); }, - }), + }) ); register.registerMetric( new client.Gauge({ - name: `${queue.name.replace(/[\{\}]/g, '')}_waiting_count`, + name: `${queue.name.replace(/[{}]/g, '')}_waiting_count`, help: 'Waiting count', async collect() { const metric = await queue.getWaitingCount(); this.set(metric); }, - }), + }) ); }); @@ -90,7 +93,7 @@ register.registerMetric( const metric = await eventBuffer.getBufferSize(); this.set(metric); }, - }), + }) ); register.registerMetric( @@ -101,7 +104,7 @@ register.registerMetric( const metric = await profileBuffer.getBufferSize(); this.set(metric); }, - }), + }) ); register.registerMetric( @@ -112,7 +115,7 @@ register.registerMetric( const metric = await botBuffer.getBufferSize(); this.set(metric); }, - }), + }) ); register.registerMetric( @@ -123,7 +126,7 @@ register.registerMetric( const metric = await sessionBuffer.getBufferSize(); this.set(metric); }, - }), + }) ); register.registerMetric( @@ -134,5 +137,5 @@ register.registerMetric( const metric = await replayBuffer.getBufferSize(); this.set(metric); }, - }), + }) ); diff --git a/apps/worker/src/utils/session-handler.ts b/apps/worker/src/utils/session-handler.ts index 01008c1f..0ab9511e 100644 --- a/apps/worker/src/utils/session-handler.ts +++ b/apps/worker/src/utils/session-handler.ts @@ -1,13 +1,39 @@ import type { IServiceCreateEventPayload } from '@openpanel/db'; -import { - type EventsQueuePayloadCreateSessionEnd, - sessionsQueue, -} from '@openpanel/queue'; -import type { Job } from 'bullmq'; -import { logger } from './logger'; +import { sessionsQueue } from '@openpanel/queue'; export const SESSION_TIMEOUT = 1000 * 60 * 30; +const CHANGE_DELAY_THROTTLE_MS = process.env.CHANGE_DELAY_THROTTLE_MS + ? Number.parseInt(process.env.CHANGE_DELAY_THROTTLE_MS, 10) + : 60_000; // 1 minute + +const CHANGE_DELAY_THROTTLE_MAP = new Map(); + +export async function extendSessionEndJob({ + projectId, + deviceId, +}: { + projectId: string; + deviceId: string; +}) { + const last = CHANGE_DELAY_THROTTLE_MAP.get(`${projectId}:${deviceId}`) ?? 0; + const isThrottled = Date.now() - last < CHANGE_DELAY_THROTTLE_MS; + + if (isThrottled) { + return; + } + + const jobId = getSessionEndJobId(projectId, deviceId); + const job = await sessionsQueue.getJob(jobId); + + if (!job) { + return; + } + + await job.changeDelay(SESSION_TIMEOUT); + CHANGE_DELAY_THROTTLE_MAP.set(`${projectId}:${deviceId}`, Date.now()); +} + const getSessionEndJobId = (projectId: string, deviceId: string) => `sessionEnd:${projectId}:${deviceId}`; @@ -33,106 +59,3 @@ export function createSessionEndJob({ } ); } - -export async function getSessionEnd({ - projectId, - deviceId, - profileId, -}: { - projectId: string; - deviceId: string; - profileId: string; -}) { - const sessionEnd = await getSessionEndJob({ - projectId, - deviceId, - }); - - if (sessionEnd) { - const existingSessionIsAnonymous = - sessionEnd.job.data.payload.profileId === - sessionEnd.job.data.payload.deviceId; - - const eventIsIdentified = - profileId && sessionEnd.job.data.payload.profileId !== profileId; - - if (existingSessionIsAnonymous && eventIsIdentified) { - await sessionEnd.job.updateData({ - ...sessionEnd.job.data, - payload: { - ...sessionEnd.job.data.payload, - profileId, - }, - }); - } - - await sessionEnd.job.changeDelay(SESSION_TIMEOUT); - return sessionEnd.job.data.payload; - } - - return null; -} - -export async function getSessionEndJob(args: { - projectId: string; - deviceId: string; - retryCount?: number; -}): Promise<{ - deviceId: string; - job: Job; -} | null> { - const { retryCount = 0 } = args; - - if (retryCount >= 6) { - throw new Error('Failed to get session end'); - } - - async function handleJobStates( - job: Job, - deviceId: string - ): Promise<{ - deviceId: string; - job: Job; - } | null> { - const state = await job.getState(); - if (state !== 'delayed') { - logger.debug(`[session-handler] Session end job is in "${state}" state`, { - state, - retryCount, - jobTimestamp: new Date(job.timestamp).toISOString(), - jobDelta: Date.now() - job.timestamp, - jobId: job.id, - payload: job.data.payload, - }); - } - - if (state === 'delayed' || state === 'waiting') { - return { deviceId, job }; - } - - if (state === 'active') { - await new Promise((resolve) => setTimeout(resolve, 100)); - return getSessionEndJob({ - ...args, - retryCount: retryCount + 1, - }); - } - - if (state === 'completed') { - await job.remove(); - } - - return null; - } - - // Check current device job - const currentJob = await sessionsQueue.getJob( - getSessionEndJobId(args.projectId, args.deviceId) - ); - if (currentJob) { - return await handleJobStates(currentJob, args.deviceId); - } - - // Create session - return null; -} diff --git a/biome.json b/biome.json index 347e1b48..9e7b023e 100644 --- a/biome.json +++ b/biome.json @@ -42,6 +42,7 @@ "useSemanticElements": "off" }, "style": { + "noNestedTernary": "off", "noNonNullAssertion": "off", "noParameterAssign": "error", "useAsConstAssertion": "error", diff --git a/packages/db/src/services/clients.service.ts b/packages/db/src/services/clients.service.ts index ea5eab17..6566d219 100644 --- a/packages/db/src/services/clients.service.ts +++ b/packages/db/src/services/clients.service.ts @@ -1,4 +1,4 @@ -import { cacheable, cacheableLru } from '@openpanel/redis'; +import { cacheable } from '@openpanel/redis'; import type { Client, Prisma } from '../prisma-client'; import { db } from '../prisma-client'; @@ -34,7 +34,4 @@ export async function getClientById( }); } -export const getClientByIdCached = cacheableLru(getClientById, { - maxSize: 1000, - ttl: 60 * 5, -}); +export const getClientByIdCached = cacheable(getClientById, 60 * 5); diff --git a/packages/db/src/services/notification.service.ts b/packages/db/src/services/notification.service.ts index 57d988f7..b1cbc1a5 100644 --- a/packages/db/src/services/notification.service.ts +++ b/packages/db/src/services/notification.service.ts @@ -90,7 +90,7 @@ export const getNotificationRulesByProjectId = cacheable( }, }); }, - 60 * 24 + 60 * 24, ); function getIntegration(integrationId: string | null) { diff --git a/packages/db/src/services/project.service.ts b/packages/db/src/services/project.service.ts index 7e8a997a..3144adcf 100644 --- a/packages/db/src/services/project.service.ts +++ b/packages/db/src/services/project.service.ts @@ -1,6 +1,6 @@ import { cacheable } from '@openpanel/redis'; import sqlstring from 'sqlstring'; -import { TABLE_NAMES, chQuery } from '../clickhouse/client'; +import { chQuery, TABLE_NAMES } from '../clickhouse/client'; import type { Prisma, Project } from '../prisma-client'; import { db } from '../prisma-client'; @@ -25,6 +25,7 @@ export async function getProjectById(id: string) { return res; } +/** L1 LRU (60s) + L2 Redis. clear() invalidates Redis + local LRU; other nodes may serve stale from LRU for up to 60s. */ export const getProjectByIdCached = cacheable(getProjectById, 60 * 60 * 24); export async function getProjectWithClients(id: string) { @@ -44,7 +45,7 @@ export async function getProjectWithClients(id: string) { return res; } -export async function getProjectsByOrganizationId(organizationId: string) { +export function getProjectsByOrganizationId(organizationId: string) { return db.project.findMany({ where: { organizationId, @@ -95,7 +96,7 @@ export async function getProjects({ if (access.length > 0) { return projects.filter((project) => - access.some((a) => a.projectId === project.id), + access.some((a) => a.projectId === project.id) ); } @@ -104,7 +105,7 @@ export async function getProjects({ export const getProjectEventsCount = async (projectId: string) => { const res = await chQuery<{ count: number }>( - `SELECT count(*) as count FROM ${TABLE_NAMES.events} WHERE project_id = ${sqlstring.escape(projectId)} AND name NOT IN ('session_start', 'session_end')`, + `SELECT count(*) as count FROM ${TABLE_NAMES.events} WHERE project_id = ${sqlstring.escape(projectId)} AND name NOT IN ('session_start', 'session_end')` ); return res[0]?.count; }; diff --git a/packages/db/src/services/salt.service.ts b/packages/db/src/services/salt.service.ts index ce3b9f59..424b3e6a 100644 --- a/packages/db/src/services/salt.service.ts +++ b/packages/db/src/services/salt.service.ts @@ -1,9 +1,9 @@ import { generateSalt } from '@openpanel/common/server'; -import { cacheableLru } from '@openpanel/redis'; +import { cacheable } from '@openpanel/redis'; import { db } from '../prisma-client'; -export const getSalts = cacheableLru( +export const getSalts = cacheable( 'op:salt', async () => { const [curr, prev] = await db.salt.findMany({ @@ -24,10 +24,7 @@ export const getSalts = cacheableLru( return salts; }, - { - maxSize: 2, - ttl: 60 * 5, - }, + 60 * 5, ); export async function createInitialSalts() { diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index aa769b48..48a5893e 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -6,7 +6,7 @@ import type { } from '@openpanel/db'; import { createLogger } from '@openpanel/logger'; import { getRedisGroupQueue, getRedisQueue } from '@openpanel/redis'; -import { Queue, QueueEvents } from 'bullmq'; +import { Queue } from 'bullmq'; import { Queue as GroupQueue } from 'groupmq'; import type { ITrackPayload } from '../../validation'; @@ -66,6 +66,10 @@ export interface EventsQueuePayloadIncomingEvent { headers: Record; deviceId: string; sessionId: string; + session?: Pick< + IServiceCreateEventPayload, + 'referrer' | 'referrerName' | 'referrerType' + >; }; } export interface EventsQueuePayloadCreateEvent { @@ -206,9 +210,6 @@ export const sessionsQueue = new Queue( }, } ); -export const sessionsQueueEvents = new QueueEvents(getQueueName('sessions'), { - connection: getRedisQueue(), -}); export const cronQueue = new Queue(getQueueName('cron'), { connection: getRedisQueue(), diff --git a/packages/redis/cachable.ts b/packages/redis/cachable.ts index 3878af65..d77ec3a0 100644 --- a/packages/redis/cachable.ts +++ b/packages/redis/cachable.ts @@ -1,7 +1,7 @@ import { LRUCache } from 'lru-cache'; import { getRedisCache } from './redis'; -export const deleteCache = async (key: string) => { +export const deleteCache = (key: string) => { return getRedisCache().del(key); }; @@ -15,7 +15,7 @@ export async function getCache( key: string, expireInSec: number, fn: () => Promise, - useLruCache?: boolean, + useLruCache?: boolean ): Promise { // L1 Cache: Check global LRU cache first (in-memory, instant) if (useLruCache) { @@ -28,15 +28,7 @@ export async function getCache( // L2 Cache: Check Redis cache (shared across instances) const hit = await getRedisCache().get(key); if (hit) { - const parsed = JSON.parse(hit, (_, value) => { - if ( - typeof value === 'string' && - /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.*Z$/.test(value) - ) { - return new Date(value); - } - return value; - }); + const parsed = parseCache(hit); // Store in LRU cache for next time if (useLruCache) { @@ -81,12 +73,24 @@ export function getGlobalLruCacheStats() { } function stringify(obj: unknown): string { - if (obj === null) return 'null'; - if (obj === undefined) return 'undefined'; - if (typeof obj === 'boolean') return obj ? 'true' : 'false'; - if (typeof obj === 'number') return String(obj); - if (typeof obj === 'string') return obj; - if (typeof obj === 'function') return obj.toString(); + if (obj === null) { + return 'null'; + } + if (obj === undefined) { + return 'undefined'; + } + if (typeof obj === 'boolean') { + return obj ? 'true' : 'false'; + } + if (typeof obj === 'number') { + return String(obj); + } + if (typeof obj === 'string') { + return obj; + } + if (typeof obj === 'function') { + return obj.toString(); + } if (Array.isArray(obj)) { return `[${obj.map(stringify).join(',')}]`; @@ -128,17 +132,29 @@ function hasResult(result: unknown): boolean { return true; } -export interface CacheableLruOptions { - /** TTL in seconds for LRU cache */ - ttl: number; - /** Maximum number of entries in LRU cache */ - maxSize?: number; -} +const DATE_REGEX = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.*Z$/; +const parseCache = (cached: string) => { + try { + return JSON.parse(cached, (_, value) => { + if (typeof value === 'string' && DATE_REGEX.test(value)) { + return new Date(value); + } + return value; + }); + } catch (error) { + console.error('Failed to parse cache', error); + return null; + } +}; + +// L1 cache: short TTL to offload Redis; clear() invalidates Redis, other nodes may serve stale from LRU for up to this long +const CACHEABLE_LRU_TTL_MS = 60 * 1000; // 60 seconds +const CACHEABLE_LRU_MAX = 1000; // Overload 1: cacheable(fn, expireInSec) export function cacheable any>( fn: T, - expireInSec: number, + expireInSec: number ): T & { getKey: (...args: Parameters) => string; clear: (...args: Parameters) => Promise; @@ -151,7 +167,7 @@ export function cacheable any>( export function cacheable any>( name: string, fn: T, - expireInSec: number, + expireInSec: number ): T & { getKey: (...args: Parameters) => string; clear: (...args: Parameters) => Promise; @@ -164,7 +180,7 @@ export function cacheable any>( export function cacheable any>( fnOrName: T | string, fnOrExpireInSec: number | T, - _expireInSec?: number, + _expireInSec?: number ) { const name = typeof fnOrName === 'string' ? fnOrName : fnOrName.name; const fn = @@ -195,184 +211,67 @@ export function cacheable any>( const cachePrefix = `cachable:${name}`; const getKey = (...args: Parameters) => - `${cachePrefix}:${stringify(args)}`; + `${cachePrefix}:${stringify(args)}`.replaceAll(/\s/g, ''); - // Redis-only mode: asynchronous implementation + const lruCache = new LRUCache({ + max: CACHEABLE_LRU_MAX, + ttl: CACHEABLE_LRU_TTL_MS, + }); + + // L1 LRU (60s) + L2 Redis. clear() deletes Redis + local LRU; other nodes may serve stale from LRU for up to 60s. const cachedFn = async ( ...args: Parameters ): Promise>> => { const key = getKey(...args); - // Check Redis cache (shared across instances) + // L1: in-memory LRU first (offloads Redis on hot keys) + const lruHit = lruCache.get(key); + if (lruHit !== undefined && hasResult(lruHit)) { + return lruHit as Awaited>; + } + + // L2: Redis (shared across instances) const cached = await getRedisCache().get(key); if (cached) { - try { - const parsed = JSON.parse(cached, (_, value) => { - if ( - typeof value === 'string' && - /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.*Z$/.test(value) - ) { - return new Date(value); - } - return value; - }); - if (hasResult(parsed)) { - return parsed; - } - } catch (e) { - console.error('Failed to parse cache', e); + const parsed = parseCache(cached); + if (hasResult(parsed)) { + lruCache.set(key, parsed); + return parsed; } } - // Cache miss: Execute function + // Cache miss: execute function const result = await fn(...(args as any)); if (hasResult(result)) { - // Don't await Redis write - fire and forget for better performance + lruCache.set(key, result); getRedisCache() .setex(key, expireInSec, JSON.stringify(result)) - .catch(() => {}); + .catch(() => { + // ignore error + }); } return result; }; - cachedFn.getKey = getKey; - cachedFn.clear = async (...args: Parameters) => { - const key = getKey(...args); - return getRedisCache().del(key); - }; - cachedFn.set = - (...args: Parameters) => - async (payload: Awaited>) => { - const key = getKey(...args); - return getRedisCache() - .setex(key, expireInSec, JSON.stringify(payload)) - .catch(() => {}); - }; - - return cachedFn; -} - -// Overload 1: cacheableLru(fn, options) -export function cacheableLru any>( - fn: T, - options: CacheableLruOptions, -): T & { - getKey: (...args: Parameters) => string; - clear: (...args: Parameters) => boolean; - set: (...args: Parameters) => (payload: ReturnType) => void; -}; - -// Overload 2: cacheableLru(name, fn, options) -export function cacheableLru any>( - name: string, - fn: T, - options: CacheableLruOptions, -): T & { - getKey: (...args: Parameters) => string; - clear: (...args: Parameters) => boolean; - set: (...args: Parameters) => (payload: ReturnType) => void; -}; - -// Implementation for cacheableLru (LRU-only - synchronous) -export function cacheableLru any>( - fnOrName: T | string, - fnOrOptions: T | CacheableLruOptions, - _options?: CacheableLruOptions, -) { - const name = typeof fnOrName === 'string' ? fnOrName : fnOrName.name; - const fn = - typeof fnOrName === 'function' - ? fnOrName - : typeof fnOrOptions === 'function' - ? fnOrOptions - : null; - - let options: CacheableLruOptions; - - // Parse parameters based on function signature - if (typeof fnOrName === 'function') { - // Overload 1: cacheableLru(fn, options) - options = - typeof fnOrOptions === 'object' && fnOrOptions !== null - ? fnOrOptions - : ({} as CacheableLruOptions); - } else { - // Overload 2: cacheableLru(name, fn, options) - options = - typeof _options === 'object' && _options !== null - ? _options - : ({} as CacheableLruOptions); - } - - if (typeof fn !== 'function') { - throw new Error('fn is not a function'); - } - - if (typeof options.ttl !== 'number') { - throw new Error('options.ttl is required and must be a number'); - } - - const cachePrefix = `cachable:${name}`; - const getKey = (...args: Parameters) => - `${cachePrefix}:${stringify(args)}`; - - const maxSize = options.maxSize ?? 1000; - const ttl = options.ttl; - - // Create function-specific LRU cache - const functionLruCache = new LRUCache({ - max: maxSize, - ttl: ttl * 1000, // Convert seconds to milliseconds for LRU - }); - - // LRU-only mode: synchronous implementation (or returns promise if fn is async) - const cachedFn = ((...args: Parameters): ReturnType => { - const key = getKey(...args); - - // Check LRU cache - const lruHit = functionLruCache.get(key); - if (lruHit !== undefined && hasResult(lruHit)) { - return lruHit as ReturnType; - } - - // Cache miss: Execute function - const result = fn(...(args as any)) as ReturnType; - - // If result is a Promise, handle it asynchronously but cache the resolved value - if (result && typeof (result as any).then === 'function') { - return (result as Promise).then((resolved: any) => { - if (hasResult(resolved)) { - functionLruCache.set(key, resolved); - } - return resolved; - }) as ReturnType; - } - - // Synchronous result: cache and return - if (hasResult(result)) { - functionLruCache.set(key, result); - } - - return result; - }) as T & { - getKey: (...args: Parameters) => string; - clear: (...args: Parameters) => boolean; - set: (...args: Parameters) => (payload: ReturnType) => void; - }; - cachedFn.getKey = getKey; cachedFn.clear = (...args: Parameters) => { const key = getKey(...args); - return functionLruCache.delete(key); + lruCache.delete(key); + return getRedisCache().del(key); }; cachedFn.set = (...args: Parameters) => - (payload: ReturnType) => { + (payload: Awaited>) => { const key = getKey(...args); if (hasResult(payload)) { - functionLruCache.set(key, payload); + lruCache.set(key, payload); + return getRedisCache() + .setex(key, expireInSec, JSON.stringify(payload)) + .catch(() => { + // ignore error + }); } }; diff --git a/packages/sdks/react-native/index.ts b/packages/sdks/react-native/index.ts index 7aa2baf3..30a61619 100644 --- a/packages/sdks/react-native/index.ts +++ b/packages/sdks/react-native/index.ts @@ -1,13 +1,13 @@ +import type { OpenPanelOptions, TrackProperties } from '@openpanel/sdk'; +import { OpenPanel as OpenPanelBase } from '@openpanel/sdk'; import * as Application from 'expo-application'; import Constants from 'expo-constants'; import { AppState, Platform } from 'react-native'; -import type { OpenPanelOptions, TrackProperties } from '@openpanel/sdk'; -import { OpenPanel as OpenPanelBase } from '@openpanel/sdk'; - export * from '@openpanel/sdk'; export class OpenPanel extends OpenPanelBase { + private lastPath = ''; constructor(public options: OpenPanelOptions) { super({ ...options, @@ -37,7 +37,12 @@ export class OpenPanel extends OpenPanelBase { }); } - public screenView(route: string, properties?: TrackProperties): void { + track(name: string, properties?: TrackProperties) { + return super.track(name, { ...properties, __path: this.lastPath }); + } + + screenView(route: string, properties?: TrackProperties): void { + this.lastPath = route; super.track('screen_view', { ...properties, __path: route, diff --git a/packages/sdks/web/src/index.ts b/packages/sdks/web/src/index.ts index 1446813b..c15da114 100644 --- a/packages/sdks/web/src/index.ts +++ b/packages/sdks/web/src/index.ts @@ -58,7 +58,7 @@ export type OpenPanelOptions = OpenPanelBaseOptions & { function toCamelCase(str: string) { return str.replace(/([-_][a-z])/gi, ($1) => - $1.toUpperCase().replace('-', '').replace('_', ''), + $1.toUpperCase().replace('-', '').replace('_', '') ); } @@ -114,7 +114,9 @@ export class OpenPanel extends OpenPanelBase { const sampled = Math.random() < sampleRate; if (sampled) { this.loadReplayModule().then((mod) => { - if (!mod) return; + if (!mod) { + return; + } mod.startReplayRecorder(this.options.sessionReplay!, (chunk) => { // Replay chunks go through send() and are queued when disabled or waitForProfile // until ready() is called (base SDK also queues replay until sessionId is set). @@ -153,7 +155,10 @@ export class OpenPanel extends OpenPanelBase { // dead-code-eliminated in the library build. if (typeof __OPENPANEL_REPLAY_URL__ !== 'undefined') { const scriptEl = _replayScriptRef; - const url = this.options.sessionReplay?.scriptUrl || scriptEl?.src?.replace('.js', '-replay.js') || 'https://openpanel.dev/op1-replay.js'; + const url = + this.options.sessionReplay?.scriptUrl || + scriptEl?.src?.replace('.js', '-replay.js') || + 'https://openpanel.dev/op1-replay.js'; // Already loaded (e.g. user included the script manually) if ((window as any).__openpanel_replay) { @@ -287,11 +292,15 @@ export class OpenPanel extends OpenPanelBase { }); } + track(name: string, properties?: TrackProperties) { + return super.track(name, { ...properties, __path: this.lastPath }); + } + screenView(properties?: TrackProperties): void; screenView(path: string, properties?: TrackProperties): void; screenView( pathOrProperties?: string | TrackProperties, - propertiesOrUndefined?: TrackProperties, + propertiesOrUndefined?: TrackProperties ): void { if (this.isServer()) { return; @@ -322,7 +331,7 @@ export class OpenPanel extends OpenPanelBase { async flushRevenue() { const promises = this.pendingRevenues.map((pending) => - super.revenue(pending.amount, pending.properties), + super.revenue(pending.amount, pending.properties) ); await Promise.all(promises); this.clearRevenue(); @@ -343,7 +352,7 @@ export class OpenPanel extends OpenPanelBase { try { sessionStorage.setItem( 'openpanel-pending-revenues', - JSON.stringify(this.pendingRevenues), + JSON.stringify(this.pendingRevenues) ); } catch {} } diff --git a/packages/trpc/src/routers/project.ts b/packages/trpc/src/routers/project.ts index 80e650aa..5c515ba5 100644 --- a/packages/trpc/src/routers/project.ts +++ b/packages/trpc/src/routers/project.ts @@ -96,9 +96,7 @@ export const projectRouter = createTRPCRouter({ }); await Promise.all([ getProjectByIdCached.clear(input.id), - res.clients.map((client) => { - getClientByIdCached.clear(client.id); - }), + ...res.clients.map((client) => getClientByIdCached.clear(client.id)), ]); return res; }), diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8fc07911..68ec0464 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -16,8 +16,8 @@ catalogs: specifier: ^19.2.3 version: 19.2.3 groupmq: - specifier: 1.1.1-next.2 - version: 1.1.1-next.2 + specifier: 2.0.0-next.1 + version: 2.0.0-next.1 react: specifier: ^19.2.3 version: 19.2.3 @@ -198,7 +198,7 @@ importers: version: 5.0.0 groupmq: specifier: 'catalog:' - version: 1.1.1-next.2(ioredis@5.8.2) + version: 2.0.0-next.1(ioredis@5.8.2) jsonwebtoken: specifier: ^9.0.2 version: 9.0.2 @@ -936,7 +936,7 @@ importers: version: 4.18.2 groupmq: specifier: 'catalog:' - version: 1.1.1-next.2(ioredis@5.8.2) + version: 2.0.0-next.1(ioredis@5.8.2) prom-client: specifier: ^15.1.3 version: 15.1.3 @@ -1419,7 +1419,7 @@ importers: version: 5.63.0 groupmq: specifier: 'catalog:' - version: 1.1.1-next.2(ioredis@5.8.2) + version: 2.0.0-next.1(ioredis@5.8.2) devDependencies: '@openpanel/tsconfig': specifier: workspace:* @@ -13157,11 +13157,11 @@ packages: glob@7.1.6: resolution: {integrity: sha512-LwaxwyZ72Lk7vZINtNNrywX0ZuLyStrdDtabefZKAY5ZGJhVtgdznluResxNmPitE0SAO+O26sWTHeKSI2wMBA==} - deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me + deprecated: Glob versions prior to v9 are no longer supported glob@7.2.3: resolution: {integrity: sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q==} - deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me + deprecated: Glob versions prior to v9 are no longer supported glob@9.3.5: resolution: {integrity: sha512-e1LleDykUz2Iu+MTYdkSsuWX8lvAjAcs0Xef0lNIu0S2wOAzuTxCJtcd9S3cijlwYF18EsU3rzb8jPVobxDh9Q==} @@ -13221,8 +13221,8 @@ packages: resolution: {integrity: sha512-5gghUc24tP9HRznNpV2+FIoq3xKkj5dTQqf4v0CpdPbFVwFkWoxOM+o+2OC9ZSvjEMTjfmG9QT+gcvggTwW1zw==} engines: {node: '>= 10.x'} - groupmq@1.1.1-next.2: - resolution: {integrity: sha512-5gH+P3NfSCjfCLcB2g2TAHCpmQz+rwrQkb+kAyrzB9puZuAHKQVYOUPWKVBRFjY7B9jPRGHrimDO6h9rWKGfMA==} + groupmq@2.0.0-next.1: + resolution: {integrity: sha512-xcpz29HeXXn0yP/sQTGPPNMLQAZCCrJg3x9kpOAFbtsXki5KVeBsY3mWNBt3Z+YCa9OxwkTFL6tOcrB67z127A==} engines: {node: '>=18'} peerDependencies: ioredis: '>=5' @@ -34142,7 +34142,7 @@ snapshots: graphql@15.8.0: {} - groupmq@1.1.1-next.2(ioredis@5.8.2): + groupmq@2.0.0-next.1(ioredis@5.8.2): dependencies: cron-parser: 4.9.0 ioredis: 5.8.2 diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index 3a8656c9..8fee52f4 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -13,4 +13,4 @@ catalog: "@types/react-dom": ^19.2.3 "@types/node": ^24.7.1 typescript: ^5.9.3 - groupmq: 1.1.1-next.2 + groupmq: 2.0.0-next.1