diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index acea9861..7dc6dd78 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -61,8 +61,6 @@ export async function postEvent( }, uaInfo, geo, - currentDeviceId: '', - previousDeviceId: '', deviceId, sessionId: sessionId ?? '', }, diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index 4b0de8b8..f7a3c61f 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -217,8 +217,6 @@ async function handleTrack( geo, deviceId, sessionId, - currentDeviceId: '', // TODO: Remove - previousDeviceId: '', // TODO: Remove }, groupId, jobId, diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index a0af841f..21b169c0 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -14,7 +14,8 @@ import { } from '@openpanel/db'; import type { ILogger } from '@openpanel/logger'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; -import * as R from 'ramda'; +import { getLock } from '@openpanel/redis'; +import { anyPass, isEmpty, isNil, mergeDeepRight, omit, reject } from 'ramda'; import { logger as baseLogger } from '@/utils/logger'; import { createSessionEndJob, getSessionEnd } from '@/utils/session-handler'; @@ -24,7 +25,22 @@ const GLOBAL_PROPERTIES = ['__path', '__referrer', '__timestamp', '__revenue']; // First it will strip '' and undefined/null from B // Then it will merge the two objects with a standard ramda merge function const merge = (a: Partial, b: Partial): A & B => - R.mergeDeepRight(a, R.reject(R.anyPass([R.isEmpty, R.isNil]))(b)) as A & B; + mergeDeepRight(a, reject(anyPass([isEmpty, isNil]))(b)) as A & B; + +/** Check if payload matches project-level event exclude filters */ +async function isEventExcludedByProjectFilter( + payload: IServiceCreateEventPayload, + projectId: string +): Promise { + const project = await getProjectByIdCached(projectId); + const eventExcludeFilters = (project?.filters ?? []).filter( + (f) => f.type === 'event' + ); + if (eventExcludeFilters.length === 0) { + return false; + } + return eventExcludeFilters.some((filter) => matchEvent(payload, filter)); +} async function createEventAndNotify( payload: IServiceCreateEventPayload, @@ -32,21 +48,13 @@ async function createEventAndNotify( projectId: string ) { // Check project-level event exclude filters - const project = await getProjectByIdCached(projectId); - const eventExcludeFilters = (project?.filters ?? []).filter( - (f) => f.type === 'event' - ); - if (eventExcludeFilters.length > 0) { - const isExcluded = eventExcludeFilters.some((filter) => - matchEvent(payload, filter) - ); - if (isExcluded) { - logger.info('Event excluded by project filter', { - event: payload.name, - projectId, - }); - return null; - } + const isExcluded = await isEventExcludedByProjectFilter(payload, projectId); + if (isExcluded) { + logger.info('Event excluded by project filter', { + event: payload.name, + projectId, + }); + return null; } logger.info('Creating event', { event: payload }); @@ -83,8 +91,6 @@ export async function incomingEvent( event: body, headers, projectId, - currentDeviceId, - previousDeviceId, deviceId, sessionId, uaInfo: _uaInfo, @@ -125,7 +131,7 @@ export async function incomingEvent( name: body.name, profileId, projectId, - properties: R.omit(GLOBAL_PROPERTIES, { + properties: omit(GLOBAL_PROPERTIES, { ...properties, __hash: hash, __query: query, @@ -194,8 +200,6 @@ export async function incomingEvent( const sessionEnd = await getSessionEnd({ projectId, - currentDeviceId, - previousDeviceId, deviceId, profileId, }); @@ -216,20 +220,44 @@ export async function incomingEvent( origin: baseEvent.origin || activeSession?.exit_origin || '', } as Partial) as IServiceCreateEventPayload; - if (!sessionEnd) { - logger.info('Creating session start event', { event: payload }); - await createEventAndNotify( + // If the triggering event is filtered, do not create session_start or the event (issue #2) + const isExcluded = await isEventExcludedByProjectFilter(payload, projectId); + if (isExcluded) { + logger.info( + 'Skipping session_start and event (excluded by project filter)', { - ...payload, - name: 'session_start', - createdAt: new Date(getTime(payload.createdAt) - 100), - }, - logger, - projectId - ).catch((error) => { - logger.error('Error creating session start event', { event: payload }); - throw error; - }); + event: payload.name, + projectId, + } + ); + return null; + } + + if (!sessionEnd) { + const locked = await getLock( + `session_start:${projectId}:${sessionId}`, + '1', + 1000 + ); + if (locked) { + logger.info('Creating session start event', { event: payload }); + await createEventAndNotify( + { + ...payload, + name: 'session_start', + createdAt: new Date(getTime(payload.createdAt) - 100), + }, + logger, + projectId + ).catch((error) => { + logger.error('Error creating session start event', { event: payload }); + throw error; + }); + } else { + logger.info('Session start already claimed by another worker', { + event: payload, + }); + } } const event = await createEventAndNotify(payload, logger, projectId); diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index b5e5226b..121123eb 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -30,8 +30,7 @@ vi.mock('@openpanel/db', async () => { // 30 minutes const SESSION_TIMEOUT = 30 * 60 * 1000; const projectId = 'test-project'; -const currentDeviceId = 'device-123'; -const previousDeviceId = 'device-456'; +const deviceId = 'device-123'; // Valid UUID used when creating a new session in tests const newSessionId = 'a1b2c3d4-e5f6-4789-a012-345678901234'; const geo = { @@ -90,14 +89,12 @@ describe('incomingEvent', () => { 'openpanel-sdk-version': '1.0.0', }, projectId, - currentDeviceId, - previousDeviceId, - deviceId: currentDeviceId, + deviceId, sessionId: newSessionId, }; const event = { name: 'test_event', - deviceId: currentDeviceId, + deviceId, profileId: '', sessionId: expect.stringMatching( // biome-ignore lint/performance/useTopLevelRegex: test @@ -145,7 +142,7 @@ describe('incomingEvent', () => { }, { delay: SESSION_TIMEOUT, - jobId: `sessionEnd:${projectId}:${currentDeviceId}`, + jobId: `sessionEnd:${projectId}:${deviceId}`, attempts: 3, backoff: { delay: 200, @@ -185,9 +182,7 @@ describe('incomingEvent', () => { }, uaInfo, projectId, - currentDeviceId, - previousDeviceId, - deviceId: currentDeviceId, + deviceId, sessionId: 'session-123', }; @@ -201,9 +196,7 @@ describe('incomingEvent', () => { type: 'createSessionEnd', payload: { sessionId: 'session-123', - deviceId: currentDeviceId, - profileId: currentDeviceId, - projectId, + deviceId, }, }, } as Partial as Job); @@ -212,7 +205,7 @@ describe('incomingEvent', () => { const event = { name: 'test_event', - deviceId: currentDeviceId, + deviceId, profileId: '', sessionId: 'session-123', projectId, @@ -268,8 +261,6 @@ describe('incomingEvent', () => { 'request-id': '123', }, projectId, - currentDeviceId: '', - previousDeviceId: '', deviceId: '', sessionId: '', uaInfo: uaInfoServer, @@ -374,8 +365,6 @@ describe('incomingEvent', () => { 'request-id': '123', }, projectId, - currentDeviceId: '', - previousDeviceId: '', deviceId: '', sessionId: '', uaInfo: uaInfoServer, diff --git a/apps/worker/src/utils/session-handler.ts b/apps/worker/src/utils/session-handler.ts index dcf427fe..01008c1f 100644 --- a/apps/worker/src/utils/session-handler.ts +++ b/apps/worker/src/utils/session-handler.ts @@ -1,5 +1,4 @@ -import { getTime } from '@openpanel/common'; -import { type IServiceCreateEventPayload, createEvent } from '@openpanel/db'; +import type { IServiceCreateEventPayload } from '@openpanel/db'; import { type EventsQueuePayloadCreateSessionEnd, sessionsQueue, @@ -12,7 +11,7 @@ export const SESSION_TIMEOUT = 1000 * 60 * 30; const getSessionEndJobId = (projectId: string, deviceId: string) => `sessionEnd:${projectId}:${deviceId}`; -export async function createSessionEndJob({ +export function createSessionEndJob({ payload, }: { payload: IServiceCreateEventPayload; @@ -31,27 +30,21 @@ export async function createSessionEndJob({ type: 'exponential', delay: 200, }, - }, + } ); } export async function getSessionEnd({ projectId, - currentDeviceId, - previousDeviceId, deviceId, profileId, }: { projectId: string; - currentDeviceId: string; - previousDeviceId: string; deviceId: string; profileId: string; }) { const sessionEnd = await getSessionEndJob({ projectId, - currentDeviceId, - previousDeviceId, deviceId, }); @@ -82,8 +75,6 @@ export async function getSessionEnd({ export async function getSessionEndJob(args: { projectId: string; - currentDeviceId: string; - previousDeviceId: string; deviceId: string; retryCount?: number; }): Promise<{ @@ -98,7 +89,7 @@ export async function getSessionEndJob(args: { async function handleJobStates( job: Job, - deviceId: string, + deviceId: string ): Promise<{ deviceId: string; job: Job; @@ -134,28 +125,9 @@ export async function getSessionEndJob(args: { return null; } - // TODO: Remove this when migrated to deviceId - if (args.currentDeviceId && args.previousDeviceId) { - // Check current device job - const currentJob = await sessionsQueue.getJob( - getSessionEndJobId(args.projectId, args.currentDeviceId), - ); - if (currentJob) { - return await handleJobStates(currentJob, args.currentDeviceId); - } - - // Check previous device job - const previousJob = await sessionsQueue.getJob( - getSessionEndJobId(args.projectId, args.previousDeviceId), - ); - if (previousJob) { - return await handleJobStates(previousJob, args.previousDeviceId); - } - } - // Check current device job const currentJob = await sessionsQueue.getJob( - getSessionEndJobId(args.projectId, args.deviceId), + getSessionEndJobId(args.projectId, args.deviceId) ); if (currentJob) { return await handleJobStates(currentJob, args.deviceId); diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index c4737db3..4ba92b6a 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -1,5 +1,3 @@ -import { Queue, QueueEvents } from 'bullmq'; - import { createHash } from 'node:crypto'; import type { IServiceCreateEventPayload, @@ -8,12 +6,13 @@ import type { } from '@openpanel/db'; import { createLogger } from '@openpanel/logger'; import { getRedisGroupQueue, getRedisQueue } from '@openpanel/redis'; +import { Queue, QueueEvents } from 'bullmq'; import { Queue as GroupQueue } from 'groupmq'; import type { ITrackPayload } from '../../validation'; export const EVENTS_GROUP_QUEUES_SHARDS = Number.parseInt( process.env.EVENTS_GROUP_QUEUES_SHARDS || '1', - 10, + 10 ); export const getQueueName = (name: string) => @@ -65,8 +64,6 @@ export interface EventsQueuePayloadIncomingEvent { latitude: number | undefined; }; headers: Record; - currentDeviceId: string; // TODO: Remove - previousDeviceId: string; // TODO: Remove deviceId: string; sessionId: string; }; @@ -154,12 +151,12 @@ export type CronQueueType = CronQueuePayload['type']; const orderingDelayMs = Number.parseInt( process.env.ORDERING_DELAY_MS || '100', - 10, + 10 ); const autoBatchMaxWaitMs = Number.parseInt( process.env.AUTO_BATCH_MAX_WAIT_MS || '0', - 10, + 10 ); const autoBatchSize = Number.parseInt(process.env.AUTO_BATCH_SIZE || '0', 10); @@ -170,12 +167,12 @@ export const eventsGroupQueues = Array.from({ new GroupQueue({ logger: process.env.NODE_ENV === 'production' ? queueLogger : undefined, namespace: getQueueName( - list.length === 1 ? 'group_events' : `group_events_${index}`, + list.length === 1 ? 'group_events' : `group_events_${index}` ), redis: getRedisGroupQueue(), - keepCompleted: 1_000, + keepCompleted: 1000, keepFailed: 10_000, - orderingDelayMs: orderingDelayMs, + orderingDelayMs, autoBatch: autoBatchMaxWaitMs && autoBatchSize ? { @@ -183,7 +180,7 @@ export const eventsGroupQueues = Array.from({ size: autoBatchSize, } : undefined, - }), + }) ); export const getEventsGroupQueueShard = (groupId: string) => { @@ -202,7 +199,7 @@ export const sessionsQueue = new Queue( defaultJobOptions: { removeOnComplete: 10, }, - }, + } ); export const sessionsQueueEvents = new QueueEvents(getQueueName('sessions'), { connection: getRedisQueue(), @@ -236,7 +233,7 @@ export const notificationQueue = new Queue( defaultJobOptions: { removeOnComplete: 10, }, - }, + } ); export type ImportQueuePayload = { @@ -254,7 +251,7 @@ export const importQueue = new Queue( removeOnComplete: 10, removeOnFail: 50, }, - }, + } ); export type InsightsQueuePayloadProject = { @@ -269,5 +266,5 @@ export const insightsQueue = new Queue( defaultJobOptions: { removeOnComplete: 100, }, - }, + } );