diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 0d2edc3c..73f6c000 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -1,5 +1,3 @@ -import { logger as baseLogger } from '@/utils/logger'; -import { createSessionEndJob, getSessionEnd } from '@/utils/session-handler'; import { getTime, isSameDomain, parsePath } from '@openpanel/common'; import { getReferrerWithQuery, @@ -18,6 +16,8 @@ import type { ILogger } from '@openpanel/logger'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; import * as R from 'ramda'; import { v4 as uuid } from 'uuid'; +import { logger as baseLogger } from '@/utils/logger'; +import { createSessionEndJob, getSessionEnd } from '@/utils/session-handler'; const GLOBAL_PROPERTIES = ['__path', '__referrer', '__timestamp', '__revenue']; @@ -30,23 +30,23 @@ const merge = (a: Partial, b: Partial): A & B => async function createEventAndNotify( payload: IServiceCreateEventPayload, logger: ILogger, - projectId: string, + projectId: string ) { // Check project-level event exclude filters const project = await getProjectByIdCached(projectId); const eventExcludeFilters = (project?.filters ?? []).filter( - (f) => f.type === 'event', + (f) => f.type === 'event' ); if (eventExcludeFilters.length > 0) { const isExcluded = eventExcludeFilters.some((filter) => - matchEvent(payload, filter), + matchEvent(payload, filter) ); if (isExcluded) { logger.info('Event excluded by project filter', { event: payload.name, projectId, }); - return null + return null; } } @@ -55,22 +55,30 @@ async function createEventAndNotify( createEvent(payload), checkNotificationRulesForEvent(payload).catch(() => {}), ]); + + console.log('Event created:', event); return event; } const parseRevenue = (revenue: unknown): number | undefined => { - if (!revenue) return undefined; - if (typeof revenue === 'number') return revenue; + if (!revenue) { + return undefined; + } + if (typeof revenue === 'number') { + return revenue; + } if (typeof revenue === 'string') { const parsed = Number.parseFloat(revenue); - if (Number.isNaN(parsed)) return undefined; + if (Number.isNaN(parsed)) { + return undefined; + } return parsed; } return undefined; }; export async function incomingEvent( - jobPayload: EventsQueuePayloadIncomingEvent['payload'], + jobPayload: EventsQueuePayloadIncomingEvent['payload'] ) { const { geo, @@ -149,6 +157,7 @@ export async function incomingEvent( : undefined, } as const; + console.log('HERE?'); // if timestamp is from the past we dont want to create a new session if (uaInfo.isServer || isTimestampFromThePast) { const session = profileId @@ -158,6 +167,8 @@ export async function incomingEvent( }) : null; + console.log('Server?'); + const payload = { ...baseEvent, deviceId: session?.device_id ?? '', @@ -183,14 +194,14 @@ export async function incomingEvent( return createEventAndNotify(payload as IServiceEvent, logger, projectId); } - + console.log('not?'); const sessionEnd = await getSessionEnd({ projectId, currentDeviceId, previousDeviceId, profileId, }); - + console.log('Server?'); const lastScreenView = sessionEnd ? await sessionBuffer.getExistingSession({ sessionId: sessionEnd.sessionId, @@ -207,7 +218,7 @@ export async function incomingEvent( path: baseEvent.path || lastScreenView?.exit_path || '', origin: baseEvent.origin || lastScreenView?.exit_origin || '', } as Partial) as IServiceCreateEventPayload; - + console.log('SessionEnd?', sessionEnd); if (!sessionEnd) { logger.info('Creating session start event', { event: payload }); await createEventAndNotify( @@ -228,7 +239,7 @@ export async function incomingEvent( if (!event) { // Skip creating session end when event was excluded - return null + return null; } if (!sessionEnd) { diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index 1ee1ebb9..aef05511 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -1,18 +1,15 @@ import { - type IClickhouseSession, - type IServiceEvent, - type IServiceSession, createEvent, formatClickhouseDate, + type IClickhouseSession, sessionBuffer, } from '@openpanel/db'; -import { eventBuffer } from '@openpanel/db'; import { type EventsQueuePayloadIncomingEvent, sessionsQueue, } from '@openpanel/queue'; import type { Job } from 'bullmq'; -import { type Mock, beforeEach, describe, expect, it, vi } from 'vitest'; +import { beforeEach, describe, expect, it, type Mock, vi } from 'vitest'; import { incomingEvent } from './events.incoming-event'; vi.mock('@openpanel/queue'); @@ -22,6 +19,8 @@ vi.mock('@openpanel/db', async () => { ...actual, createEvent: vi.fn(), checkNotificationRulesForEvent: vi.fn().mockResolvedValue(true), + getProjectByIdCached: vi.fn().mockResolvedValue({ filters: [] }), + matchEvent: vi.fn().mockReturnValue(false), sessionBuffer: { getExistingSession: vi.fn(), }, @@ -68,7 +67,7 @@ describe('incomingEvent', () => { vi.clearAllMocks(); }); - it('should create a session start and an event', async () => { + it.only('should create a session start and an event', async () => { const spySessionsQueueAdd = vi.spyOn(sessionsQueue, 'add'); const timestamp = new Date(); // Mock job data @@ -92,16 +91,12 @@ describe('incomingEvent', () => { currentDeviceId, previousDeviceId, }; - - // Execute the job - await incomingEvent(jobData); - const event = { name: 'test_event', deviceId: currentDeviceId, profileId: '', sessionId: expect.stringMatching( - /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i, + /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i ), projectId, properties: { @@ -132,6 +127,11 @@ describe('incomingEvent', () => { sdkVersion: jobData.headers['openpanel-sdk-version'], }; + (createEvent as Mock).mockReturnValue(event); + + // Execute the job + await incomingEvent(jobData); + expect(spySessionsQueueAdd).toHaveBeenCalledWith( 'session', { @@ -146,7 +146,7 @@ describe('incomingEvent', () => { delay: 200, type: 'exponential', }, - }, + } ); expect((createEvent as Mock).mock.calls[0]![0]).toStrictEqual({ @@ -266,26 +266,6 @@ describe('incomingEvent', () => { uaInfo: uaInfoServer, }; - const mockLastScreenView = { - deviceId: 'last-device-123', - sessionId: 'last-session-456', - country: 'CA', - city: 'Toronto', - region: 'ON', - os: 'iOS', - osVersion: '15.0', - browser: 'Safari', - browserVersion: '15.0', - device: 'mobile', - brand: 'Apple', - model: 'iPhone', - path: '/last-path', - origin: 'https://example.com', - referrer: 'https://google.com', - referrerName: 'Google', - referrerType: 'search', - }; - vi.mocked(sessionBuffer.getExistingSession).mockResolvedValueOnce({ id: 'last-session-456', event_count: 0,