From f4602f8e56c8742137ba87827529662a6e138141 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 27 Feb 2026 15:40:57 +0100 Subject: [PATCH] fix: add session end event for notification funnel --- .../src/jobs/events.create-session-end.ts | 32 +++++++++++-------- .../src/jobs/events.incoming-events.test.ts | 4 ++- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index 067d8b62..95bcc1c8 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -12,6 +12,7 @@ import { profileBackfillBuffer, sessionBuffer, TABLE_NAMES, + transformEvent, transformSessionToEvent, } from '@openpanel/db'; import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue'; @@ -79,17 +80,6 @@ export async function createSessionEnd( throw new Error('Session not found'); } - try { - await handleSessionEndNotifications({ - session, - payload, - }); - } catch (error) { - logger.error('Creating notificatios for session end failed', { - error, - }); - } - const profileId = session.profile_id || payload.profileId; if ( @@ -113,7 +103,7 @@ export async function createSessionEnd( } // Create session end event - return createEvent({ + const { document: sessionEndEvent } = await createEvent({ ...payload, properties: { ...payload.properties, @@ -127,14 +117,30 @@ export async function createSessionEnd( ), profileId, }); + + try { + await handleSessionEndNotifications({ + session, + payload, + sessionEndEvent: transformEvent(sessionEndEvent), + }); + } catch (error) { + logger.error('Creating notificatios for session end failed', { + error, + }); + } + + return sessionEndEvent; } async function handleSessionEndNotifications({ session, payload, + sessionEndEvent, }: { session: IClickhouseSession; payload: IServiceCreateEventPayload; + sessionEndEvent: IServiceEvent; }) { const notificationRules = await getNotificationRulesByProjectId( payload.projectId @@ -152,7 +158,7 @@ async function handleSessionEndNotifications({ }); if (events.length > 0) { - await checkNotificationRulesForSessionEnd(events); + await checkNotificationRulesForSessionEnd([...events, sessionEndEvent]); } } } diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index 121123eb..f1c24feb 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -69,7 +69,9 @@ describe('incomingEvent', () => { }); it('should create a session start and an event', async () => { - const spySessionsQueueAdd = vi.spyOn(sessionsQueue, 'add'); + const spySessionsQueueAdd = vi + .spyOn(sessionsQueue, 'add') + .mockResolvedValue({} as Job); const timestamp = new Date(); // Mock job data const jobData: EventsQueuePayloadIncomingEvent['payload'] = {