diff --git a/apps/api/src/hooks/duplicate.hook.ts b/apps/api/src/hooks/duplicate.hook.ts index f5976f10..3fc0689a 100644 --- a/apps/api/src/hooks/duplicate.hook.ts +++ b/apps/api/src/hooks/duplicate.hook.ts @@ -8,12 +8,19 @@ export async function duplicateHook( }>, reply: FastifyReply, ) { - const isDuplicate = await isDuplicatedEvent({ - ip: req.clientIp ?? '', - origin: req.headers.origin ?? '', - payload: req.body, - projectId: (req.headers['openpanel-client-id'] as string) || '', - }); + const ip = req.clientIp; + const origin = req.headers.origin; + const clientId = req.headers['openpanel-client-id']; + const shouldCheck = ip && origin && clientId; + + const isDuplicate = shouldCheck + ? await isDuplicatedEvent({ + ip, + origin, + payload: req.body, + projectId: clientId as string, + }) + : false; if (isDuplicate) { return reply.status(200).send('Duplicate event'); diff --git a/apps/api/src/routes/track.router.ts b/apps/api/src/routes/track.router.ts index abc98844..6e6e630e 100644 --- a/apps/api/src/routes/track.router.ts +++ b/apps/api/src/routes/track.router.ts @@ -2,11 +2,13 @@ import { handler } from '@/controllers/track.controller'; import type { FastifyPluginCallback } from 'fastify'; import { clientHook } from '@/hooks/client.hook'; +import { duplicateHook } from '@/hooks/duplicate.hook'; import { isBotHook } from '@/hooks/is-bot.hook'; const trackRouter: FastifyPluginCallback = async (fastify) => { - fastify.addHook('preHandler', clientHook); fastify.addHook('preHandler', isBotHook); + fastify.addHook('preValidation', duplicateHook); + fastify.addHook('preHandler', clientHook); fastify.route({ method: 'POST', diff --git a/apps/api/src/utils/auth.ts b/apps/api/src/utils/auth.ts index afa24d43..14a97a6e 100644 --- a/apps/api/src/utils/auth.ts +++ b/apps/api/src/utils/auth.ts @@ -137,7 +137,7 @@ export async function validateSdkRequest( if (client.secret && clientSecret) { const isVerified = await getCache( - `client:auth:${clientId}:${clientSecret.slice(0, 5)}`, + `client:auth:${clientId}:${Buffer.from(clientSecret).toString('base64')}`, 60 * 5, async () => await verifyPassword(clientSecret, client.secret!), true, diff --git a/apps/worker/scripts/cleanup-old-event-buffer-keys.ts b/apps/worker/scripts/cleanup-old-event-buffer-keys.ts index 8d4fff79..771079a7 100644 --- a/apps/worker/scripts/cleanup-old-event-buffer-keys.ts +++ b/apps/worker/scripts/cleanup-old-event-buffer-keys.ts @@ -104,8 +104,12 @@ async function cleanupOldEventBufferKeys(): Promise { const events = await redis.lrange(sessionKey, 0, -1); if (events.length > 0) { - // Move events to new queue - await redis.rpush(newQueueKey, ...events); + // Move events to new queue in safe batches to avoid exceeding V8 arg limits + const chunkSize = 1000; + for (let offset = 0; offset < events.length; offset = chunkSize) { + const chunk = events.slice(offset, offset + chunkSize); + await redis.rpush(newQueueKey, ...chunk); + } // Update buffer counter await redis.incrby('event_buffer:total_count', events.length); totalEventsMigrated += events.length; diff --git a/apps/worker/src/boot-cron.ts b/apps/worker/src/boot-cron.ts index 59e22e46..16b7536d 100644 --- a/apps/worker/src/boot-cron.ts +++ b/apps/worker/src/boot-cron.ts @@ -58,7 +58,7 @@ export async function bootCron() { // TODO: Switch to getJobSchedulers const repeatableJobs = await cronQueue.getRepeatableJobs(); for (const repeatableJob of repeatableJobs) { - cronQueue.removeRepeatableByKey(repeatableJob.key); + await cronQueue.removeRepeatableByKey(repeatableJob.key); } // Add repeatable jobs diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts index 05698bfd..4831c19d 100644 --- a/apps/worker/src/boot-workers.ts +++ b/apps/worker/src/boot-workers.ts @@ -19,7 +19,7 @@ import { setTimeout as sleep } from 'node:timers/promises'; import { Worker as GroupWorker } from 'groupmq'; import { cronJob } from './jobs/cron'; -import { incomingEventPure } from './jobs/events.incoming-event'; +import { incomingEvent } from './jobs/events.incoming-event'; import { importJob } from './jobs/import'; import { miscJob } from './jobs/misc'; import { notificationJob } from './jobs/notification'; @@ -122,7 +122,7 @@ export async function bootWorkers() { process.env.EVENT_BLOCKING_TIMEOUT_SEC || '1', ), handler: async (job) => { - return await incomingEventPure(job.data); + return await incomingEvent(job.data); }, }); @@ -184,7 +184,7 @@ export async function bootWorkers() { concurrency, }); workers.push(importWorker); - logger.info('Started worker for misc', { concurrency }); + logger.info('Started worker for import', { concurrency }); } if (workers.length === 0) { diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index ed55587e..c1d15964 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -45,18 +45,8 @@ async function createEventAndNotify( } export async function incomingEvent( - job: Job, - token?: string, -) { - return incomingEventPure(job.data.payload, job, token); -} - -export async function incomingEventPure( jobPayload: EventsQueuePayloadIncomingEvent['payload'], - job?: Job, - token?: string, ) { - await getRedisCache().incr('queue:counter'); const { geo, event: body, diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index d5d9dd0c..c4deec1b 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -1,6 +1,9 @@ import { type IServiceEvent, createEvent } from '@openpanel/db'; import { eventBuffer } from '@openpanel/db'; -import { sessionsQueue } from '@openpanel/queue'; +import { + type EventsQueuePayloadIncomingEvent, + sessionsQueue, +} from '@openpanel/queue'; import type { Job } from 'bullmq'; import { type Mock, beforeEach, describe, expect, it, vi } from 'vitest'; import { incomingEvent } from './events.incoming-event'; @@ -32,6 +35,28 @@ const geo = { latitude: 0, }; +const uaInfo: EventsQueuePayloadIncomingEvent['payload']['uaInfo'] = { + isServer: false, + device: 'desktop', + os: 'Windows', + osVersion: '10', + browser: 'Chrome', + browserVersion: '91.0.4472.124', + brand: '', + model: '', +}; + +const uaInfoServer: EventsQueuePayloadIncomingEvent['payload']['uaInfo'] = { + isServer: true, + device: 'server', + os: '', + osVersion: '', + browser: '', + browserVersion: '', + brand: '', + model: '', +}; + describe('incomingEvent', () => { beforeEach(() => { vi.clearAllMocks(); @@ -41,31 +66,29 @@ describe('incomingEvent', () => { const spySessionsQueueAdd = vi.spyOn(sessionsQueue, 'add'); const timestamp = new Date(); // Mock job data - const jobData = { - payload: { - geo, - event: { - name: 'test_event', - timestamp: timestamp.toISOString(), - properties: { __path: 'https://example.com/test' }, - }, - headers: { - 'request-id': '123', - 'user-agent': - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', - 'openpanel-sdk-name': 'web', - 'openpanel-sdk-version': '1.0.0', - }, - projectId, - currentDeviceId, - previousDeviceId, + const jobData: EventsQueuePayloadIncomingEvent['payload'] = { + geo, + event: { + name: 'test_event', + timestamp: timestamp.toISOString(), + isTimestampFromThePast: false, + properties: { __path: 'https://example.com/test' }, }, + uaInfo, + headers: { + 'request-id': '123', + 'user-agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'openpanel-sdk-name': 'web', + 'openpanel-sdk-version': '1.0.0', + }, + projectId, + currentDeviceId, + previousDeviceId, }; - const job = { data: jobData } as Job; - // Execute the job - await incomingEvent(job); + await incomingEvent(jobData); const event = { name: 'test_event', @@ -78,8 +101,8 @@ describe('incomingEvent', () => { properties: { __hash: undefined, __query: undefined, - __user_agent: jobData.payload.headers['user-agent'], - __reqId: jobData.payload.headers['request-id'], + __user_agent: jobData.headers['user-agent'], + __reqId: jobData.headers['request-id'], }, createdAt: timestamp, country: 'US', @@ -92,16 +115,16 @@ describe('incomingEvent', () => { browser: 'Chrome', browserVersion: '91.0.4472.124', device: 'desktop', - brand: undefined, - model: undefined, + brand: '', + model: '', duration: 0, path: '/test', origin: 'https://example.com', referrer: '', referrerName: '', referrerType: '', - sdkName: jobData.payload.headers['openpanel-sdk-name'], - sdkVersion: jobData.payload.headers['openpanel-sdk-version'], + sdkName: jobData.headers['openpanel-sdk-name'], + sdkVersion: jobData.headers['openpanel-sdk-version'], }; expect(spySessionsQueueAdd).toHaveBeenCalledWith( @@ -135,29 +158,27 @@ describe('incomingEvent', () => { const timestamp = new Date(); // Mock job data - const jobData = { - payload: { - geo, - event: { - name: 'test_event', - timestamp: timestamp.toISOString(), - properties: { __path: 'https://example.com/test' }, - }, - headers: { - 'request-id': '123', - 'user-agent': - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', - 'openpanel-sdk-name': 'web', - 'openpanel-sdk-version': '1.0.0', - }, - projectId, - currentDeviceId, - previousDeviceId, + const jobData: EventsQueuePayloadIncomingEvent['payload'] = { + geo, + event: { + name: 'test_event', + timestamp: timestamp.toISOString(), + properties: { __path: 'https://example.com/test' }, + isTimestampFromThePast: false, }, + headers: { + 'request-id': '123', + 'user-agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'openpanel-sdk-name': 'web', + 'openpanel-sdk-version': '1.0.0', + }, + uaInfo, + projectId, + currentDeviceId, + previousDeviceId, }; - const job = { data: jobData } as Job; - const changeDelay = vi.fn(); const updateData = vi.fn(); spySessionsQueueGetJob.mockResolvedValueOnce({ @@ -175,7 +196,7 @@ describe('incomingEvent', () => { }, } as Partial as Job); // Execute the job - await incomingEvent(job); + await incomingEvent(jobData); const event = { name: 'test_event', @@ -186,8 +207,8 @@ describe('incomingEvent', () => { properties: { __hash: undefined, __query: undefined, - __user_agent: jobData.payload.headers['user-agent'], - __reqId: jobData.payload.headers['request-id'], + __user_agent: jobData.headers['user-agent'], + __reqId: jobData.headers['request-id'], }, createdAt: timestamp, country: 'US', @@ -200,16 +221,16 @@ describe('incomingEvent', () => { browser: 'Chrome', browserVersion: '91.0.4472.124', device: 'desktop', - brand: undefined, - model: undefined, + brand: '', + model: '', duration: 0, path: '/test', origin: 'https://example.com', referrer: '', referrerName: '', referrerType: '', - sdkName: jobData.payload.headers['openpanel-sdk-name'], - sdkVersion: jobData.payload.headers['openpanel-sdk-version'], + sdkName: jobData.headers['openpanel-sdk-name'], + sdkVersion: jobData.headers['openpanel-sdk-version'], }; expect(spySessionsQueueAdd).toHaveBeenCalledTimes(0); @@ -220,29 +241,27 @@ describe('incomingEvent', () => { it('should handle server events (with existing screen view)', async () => { const timestamp = new Date(); - const jobData = { - payload: { - geo, - event: { - name: 'server_event', - timestamp: timestamp.toISOString(), - properties: { custom_property: 'test_value' }, - profileId: 'profile-123', - }, - headers: { - 'user-agent': 'OpenPanel Server/1.0', - 'openpanel-sdk-name': 'server', - 'openpanel-sdk-version': '1.0.0', - 'request-id': '123', - }, - projectId, - currentDeviceId: '', - previousDeviceId: '', + const jobData: EventsQueuePayloadIncomingEvent['payload'] = { + geo, + event: { + name: 'server_event', + timestamp: timestamp.toISOString(), + properties: { custom_property: 'test_value' }, + profileId: 'profile-123', + isTimestampFromThePast: false, }, + headers: { + 'user-agent': 'OpenPanel Server/1.0', + 'openpanel-sdk-name': 'server', + 'openpanel-sdk-version': '1.0.0', + 'request-id': '123', + }, + projectId, + currentDeviceId: '', + previousDeviceId: '', + uaInfo: uaInfoServer, }; - const job = { data: jobData } as Job; - const mockLastScreenView = { deviceId: 'last-device-123', sessionId: 'last-session-456', @@ -268,7 +287,7 @@ describe('incomingEvent', () => { mockLastScreenView as IServiceEvent, ); - await incomingEvent(job); + await incomingEvent(jobData); expect((createEvent as Mock).mock.calls[0]![0]).toStrictEqual({ name: 'server_event', @@ -311,33 +330,31 @@ describe('incomingEvent', () => { it('should handle server events (without existing screen view)', async () => { const timestamp = new Date(); - const jobData = { - payload: { - geo, - event: { - name: 'server_event', - timestamp: timestamp.toISOString(), - properties: { custom_property: 'test_value' }, - profileId: 'profile-123', - }, - headers: { - 'user-agent': 'OpenPanel Server/1.0', - 'openpanel-sdk-name': 'server', - 'openpanel-sdk-version': '1.0.0', - 'request-id': '123', - }, - projectId, - currentDeviceId: '', - previousDeviceId: '', + const jobData: EventsQueuePayloadIncomingEvent['payload'] = { + geo, + event: { + name: 'server_event', + timestamp: timestamp.toISOString(), + properties: { custom_property: 'test_value' }, + profileId: 'profile-123', + isTimestampFromThePast: false, }, + headers: { + 'user-agent': 'OpenPanel Server/1.0', + 'openpanel-sdk-name': 'server', + 'openpanel-sdk-version': '1.0.0', + 'request-id': '123', + }, + projectId, + currentDeviceId: '', + previousDeviceId: '', + uaInfo: uaInfoServer, }; - const job = { data: jobData } as Job; - // Mock getLastScreenView to return null vi.mocked(eventBuffer.getLastScreenView).mockResolvedValueOnce(null); - await incomingEvent(job); + await incomingEvent(jobData); expect((createEvent as Mock).mock.calls[0]![0]).toStrictEqual({ name: 'server_event', diff --git a/apps/worker/src/jobs/events.ts b/apps/worker/src/jobs/events.ts deleted file mode 100644 index 51298e29..00000000 --- a/apps/worker/src/jobs/events.ts +++ /dev/null @@ -1,15 +0,0 @@ -import type { Job } from 'bullmq'; - -import type { - EventsQueuePayload, - EventsQueuePayloadIncomingEvent, -} from '@openpanel/queue'; - -import { incomingEvent } from './events.incoming-event'; - -export async function eventsJob(job: Job, token?: string) { - return await incomingEvent( - job as Job, - token, - ); -} diff --git a/packages/common/server/parser-user-agent.ts b/packages/common/server/parser-user-agent.ts index 0a592ee4..7a3551c9 100644 --- a/packages/common/server/parser-user-agent.ts +++ b/packages/common/server/parser-user-agent.ts @@ -90,7 +90,7 @@ const parse = (ua: string): UAParser.IResult => { ...res, os: { ...res.os, - version: osVersion[1]!.replace('_', '.'), + version: osVersion[1]!.replace(/_/g, '.'), }, }; parseCache.set(ua, result); diff --git a/packages/geo/src/geo.ts b/packages/geo/src/geo.ts index a0ed73b3..30ca2199 100644 --- a/packages/geo/src/geo.ts +++ b/packages/geo/src/geo.ts @@ -2,13 +2,13 @@ import { readFile } from 'node:fs/promises'; import path from 'node:path'; import { dirname } from 'node:path'; import { fileURLToPath } from 'node:url'; - -const __filename = fileURLToPath(import.meta.url); -const __dirname = dirname(__filename); import type { ReaderModel } from '@maxmind/geoip2-node'; import { Reader } from '@maxmind/geoip2-node'; import { LRUCache } from 'lru-cache'; +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); + const filename = 'GeoLite2-City.mmdb'; // From api or worker package const dbPath = path.join(__dirname, `../../../packages/geo/${filename}`); @@ -73,13 +73,15 @@ export async function getGeoLocation(ip?: string): Promise { try { const response = await reader?.city(ip); - return { + const res = { city: response?.city?.names.en, country: response?.country?.isoCode, region: response?.subdivisions?.[0]?.names.en, longitude: response?.location?.longitude, latitude: response?.location?.latitude, }; + cache.set(ip, res); + return res; } catch (error) { return DEFAULT_GEO; }