fix: add session end event for notification funnel

This commit is contained in:
Carl-Gerhard Lindesvärd
2026-02-27 15:40:57 +01:00
parent efb50fafdb
commit f4602f8e56
2 changed files with 22 additions and 14 deletions

View File

@@ -12,6 +12,7 @@ import {
profileBackfillBuffer, profileBackfillBuffer,
sessionBuffer, sessionBuffer,
TABLE_NAMES, TABLE_NAMES,
transformEvent,
transformSessionToEvent, transformSessionToEvent,
} from '@openpanel/db'; } from '@openpanel/db';
import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue'; import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue';
@@ -79,17 +80,6 @@ export async function createSessionEnd(
throw new Error('Session not found'); throw new Error('Session not found');
} }
try {
await handleSessionEndNotifications({
session,
payload,
});
} catch (error) {
logger.error('Creating notificatios for session end failed', {
error,
});
}
const profileId = session.profile_id || payload.profileId; const profileId = session.profile_id || payload.profileId;
if ( if (
@@ -113,7 +103,7 @@ export async function createSessionEnd(
} }
// Create session end event // Create session end event
return createEvent({ const { document: sessionEndEvent } = await createEvent({
...payload, ...payload,
properties: { properties: {
...payload.properties, ...payload.properties,
@@ -127,14 +117,30 @@ export async function createSessionEnd(
), ),
profileId, profileId,
}); });
try {
await handleSessionEndNotifications({
session,
payload,
sessionEndEvent: transformEvent(sessionEndEvent),
});
} catch (error) {
logger.error('Creating notificatios for session end failed', {
error,
});
}
return sessionEndEvent;
} }
async function handleSessionEndNotifications({ async function handleSessionEndNotifications({
session, session,
payload, payload,
sessionEndEvent,
}: { }: {
session: IClickhouseSession; session: IClickhouseSession;
payload: IServiceCreateEventPayload; payload: IServiceCreateEventPayload;
sessionEndEvent: IServiceEvent;
}) { }) {
const notificationRules = await getNotificationRulesByProjectId( const notificationRules = await getNotificationRulesByProjectId(
payload.projectId payload.projectId
@@ -152,7 +158,7 @@ async function handleSessionEndNotifications({
}); });
if (events.length > 0) { if (events.length > 0) {
await checkNotificationRulesForSessionEnd(events); await checkNotificationRulesForSessionEnd([...events, sessionEndEvent]);
} }
} }
} }

View File

@@ -69,7 +69,9 @@ describe('incomingEvent', () => {
}); });
it('should create a session start and an event', async () => { it('should create a session start and an event', async () => {
const spySessionsQueueAdd = vi.spyOn(sessionsQueue, 'add'); const spySessionsQueueAdd = vi
.spyOn(sessionsQueue, 'add')
.mockResolvedValue({} as Job);
const timestamp = new Date(); const timestamp = new Date();
// Mock job data // Mock job data
const jobData: EventsQueuePayloadIncomingEvent['payload'] = { const jobData: EventsQueuePayloadIncomingEvent['payload'] = {