fix: funnel notifications

This commit is contained in:
Carl-Gerhard Lindesvärd
2026-02-27 10:24:45 +01:00
parent 928c44ef6a
commit 9c6c7bb037
2 changed files with 82 additions and 45 deletions

View File

@@ -1,24 +1,22 @@
import type { Job } from 'bullmq';
import { logger as baseLogger } from '@/utils/logger';
import { import {
type IClickhouseSession,
type IServiceCreateEventPayload,
type IServiceEvent,
TABLE_NAMES,
checkNotificationRulesForSessionEnd, checkNotificationRulesForSessionEnd,
convertClickhouseDateToJs, convertClickhouseDateToJs,
createEvent, createEvent,
eventBuffer,
formatClickhouseDate, formatClickhouseDate,
getEvents, getEvents,
getHasFunnelRules, getHasFunnelRules,
getNotificationRulesByProjectId, getNotificationRulesByProjectId,
type IClickhouseSession,
type IServiceCreateEventPayload,
type IServiceEvent,
profileBackfillBuffer, profileBackfillBuffer,
sessionBuffer, sessionBuffer,
TABLE_NAMES,
transformSessionToEvent, transformSessionToEvent,
} from '@openpanel/db'; } from '@openpanel/db';
import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue'; import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue';
import type { Job } from 'bullmq';
import { logger as baseLogger } from '@/utils/logger';
const MAX_SESSION_EVENTS = 500; const MAX_SESSION_EVENTS = 500;
@@ -39,7 +37,7 @@ async function getSessionEvents({
WHERE WHERE
session_id = '${sessionId}' session_id = '${sessionId}'
AND project_id = '${projectId}' AND project_id = '${projectId}'
AND created_at BETWEEN '${formatClickhouseDate(startAt)}' AND '${formatClickhouseDate(endAt)}' AND created_at BETWEEN '${formatClickhouseDate(new Date(startAt.getTime() - 1000))}' AND '${formatClickhouseDate(new Date(endAt.getTime() + 1000))}'
ORDER BY created_at DESC LIMIT ${MAX_SESSION_EVENTS}; ORDER BY created_at DESC LIMIT ${MAX_SESSION_EVENTS};
`; `;
@@ -58,12 +56,12 @@ async function getSessionEvents({
.flatMap((event) => (event ? [event] : [])) .flatMap((event) => (event ? [event] : []))
.sort( .sort(
(a, b) => (a, b) =>
new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(), new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime()
); );
} }
export async function createSessionEnd( export async function createSessionEnd(
job: Job<EventsQueuePayloadCreateSessionEnd>, job: Job<EventsQueuePayloadCreateSessionEnd>
) { ) {
const { payload } = job.data; const { payload } = job.data;
const logger = baseLogger.child({ const logger = baseLogger.child({
@@ -92,18 +90,24 @@ export async function createSessionEnd(
}); });
} }
const profileId = session.profile_id || payload.profileId const profileId = session.profile_id || payload.profileId;
if ( if (
profileId !== session.device_id profileId !== session.device_id &&
&& process.env.EXPERIMENTAL_PROFILE_BACKFILL === '1' process.env.EXPERIMENTAL_PROFILE_BACKFILL === '1'
) { ) {
const runOnProjects = process.env.EXPERIMENTAL_PROFILE_BACKFILL_PROJECTS?.split(',').filter(Boolean) ?? [] const runOnProjects =
if(runOnProjects.length === 0 || runOnProjects.includes(payload.projectId)) { process.env.EXPERIMENTAL_PROFILE_BACKFILL_PROJECTS?.split(',').filter(
Boolean
) ?? [];
if (
runOnProjects.length === 0 ||
runOnProjects.includes(payload.projectId)
) {
await profileBackfillBuffer.add({ await profileBackfillBuffer.add({
projectId: payload.projectId, projectId: payload.projectId,
sessionId: payload.sessionId, sessionId: payload.sessionId,
profileId: profileId, profileId,
}); });
} }
} }
@@ -119,9 +123,9 @@ export async function createSessionEnd(
duration: session.duration ?? 0, duration: session.duration ?? 0,
path: session.exit_path ?? '', path: session.exit_path ?? '',
createdAt: new Date( createdAt: new Date(
convertClickhouseDateToJs(session.ended_at).getTime() + 1000, convertClickhouseDateToJs(session.ended_at).getTime() + 1000
), ),
profileId: profileId, profileId,
}); });
} }
@@ -133,7 +137,7 @@ async function handleSessionEndNotifications({
payload: IServiceCreateEventPayload; payload: IServiceCreateEventPayload;
}) { }) {
const notificationRules = await getNotificationRulesByProjectId( const notificationRules = await getNotificationRulesByProjectId(
payload.projectId, payload.projectId
); );
const hasFunnelRules = getHasFunnelRules(notificationRules); const hasFunnelRules = getHasFunnelRules(notificationRules);
const isEventCountReasonable = const isEventCountReasonable =
@@ -143,8 +147,8 @@ async function handleSessionEndNotifications({
const events = await getSessionEvents({ const events = await getSessionEvents({
projectId: payload.projectId, projectId: payload.projectId,
sessionId: payload.sessionId, sessionId: payload.sessionId,
startAt: new Date(session.created_at), startAt: convertClickhouseDateToJs(session.created_at),
endAt: new Date(session.ended_at), endAt: convertClickhouseDateToJs(session.ended_at),
}); });
if (events.length > 0) { if (events.length > 0) {

View File

@@ -4,10 +4,10 @@ import { cacheable } from '@openpanel/redis';
import type { IChartEvent, IChartEventFilter } from '@openpanel/validation'; import type { IChartEvent, IChartEventFilter } from '@openpanel/validation';
import { pathOr } from 'ramda'; import { pathOr } from 'ramda';
import { import {
db,
type Integration, type Integration,
type Notification, type Notification,
Prisma, type Prisma,
db,
} from '../prisma-client'; } from '../prisma-client';
import type { import type {
IServiceCreateEventPayload, IServiceCreateEventPayload,
@@ -90,7 +90,7 @@ export const getNotificationRulesByProjectId = cacheable(
}, },
}); });
}, },
60 * 24, 60 * 24
); );
function getIntegration(integrationId: string | null) { function getIntegration(integrationId: string | null) {
@@ -117,12 +117,30 @@ function getIntegration(integrationId: string | null) {
}; };
} }
function stripNullChars<T>(value: T): T {
if (typeof value === 'string') {
return value.split('\u0000').join('') as T;
}
if (value instanceof Date) {
return value;
}
if (Array.isArray(value)) {
return value.map(stripNullChars) as T;
}
if (value !== null && typeof value === 'object') {
return Object.fromEntries(
Object.entries(value).map(([k, v]) => [k, stripNullChars(v)])
) as T;
}
return value;
}
export async function createNotification(notification: ICreateNotification) { export async function createNotification(notification: ICreateNotification) {
const data: Prisma.NotificationUncheckedCreateInput = { const data: Prisma.NotificationUncheckedCreateInput = {
title: notification.title, title: notification.title,
message: notification.message, message: notification.message,
projectId: notification.projectId, projectId: notification.projectId,
payload: notification.payload || Prisma.DbNull, payload: stripNullChars(notification.payload) || undefined,
...getIntegration(notification.integrationId), ...getIntegration(notification.integrationId),
notificationRuleId: notification.notificationRuleId, notificationRuleId: notification.notificationRuleId,
}; };
@@ -138,7 +156,7 @@ export async function createNotification(notification: ICreateNotification) {
} }
export function triggerNotification( export function triggerNotification(
notification: Prisma.NotificationUncheckedCreateInput, notification: Prisma.NotificationUncheckedCreateInput
) { ) {
return notificationQueue.add('sendNotification', { return notificationQueue.add('sendNotification', {
type: 'sendNotification', type: 'sendNotification',
@@ -150,12 +168,14 @@ export function triggerNotification(
function matchEventFilters( function matchEventFilters(
payload: IServiceCreateEventPayload, payload: IServiceCreateEventPayload,
filters: IChartEventFilter[], filters: IChartEventFilter[]
) { ) {
return filters.every((filter) => { return filters.every((filter) => {
const { name, value, operator } = filter; const { name, value, operator } = filter;
if (value.length === 0) return true; if (value.length === 0) {
return true;
}
if (name === 'has_profile') { if (name === 'has_profile') {
if (value.includes('true')) { if (value.includes('true')) {
@@ -214,7 +234,7 @@ function matchEventFilters(
export function matchEvent( export function matchEvent(
payload: IServiceCreateEventPayload, payload: IServiceCreateEventPayload,
chartEvent: IChartEvent, chartEvent: IChartEvent
) { ) {
if (payload.name !== chartEvent.name && chartEvent.name !== '*') { if (payload.name !== chartEvent.name && chartEvent.name !== '*') {
return false; return false;
@@ -234,7 +254,9 @@ function notificationTemplateEvent({
payload: IServiceCreateEventPayload; payload: IServiceCreateEventPayload;
rule: INotificationRuleCached; rule: INotificationRuleCached;
}) { }) {
if (!rule.template) return `You received a new "${payload.name}" event`; if (!rule.template) {
return `You received a new "${payload.name}" event`;
}
let template = rule.template let template = rule.template
.replaceAll('$EVENT_NAME', payload.name) .replaceAll('$EVENT_NAME', payload.name)
.replaceAll('$RULE_NAME', rule.name) .replaceAll('$RULE_NAME', rule.name)
@@ -249,7 +271,7 @@ function notificationTemplateEvent({
if (value) { if (value) {
template = template.replaceAll( template = template.replaceAll(
match, match,
typeof value === 'object' ? JSON.stringify(value) : value, typeof value === 'object' ? JSON.stringify(value) : value
); );
} }
} }
@@ -264,14 +286,17 @@ function notificationTemplateFunnel({
events: IServiceEvent[]; events: IServiceEvent[];
rule: INotificationRuleCached; rule: INotificationRuleCached;
}) { }) {
if (!rule.template) return `Funnel "${rule.name}" completed`; if (!rule.template) {
return `Funnel "${rule.name}" completed`;
}
return rule.template return rule.template
.replaceAll('$EVENT_NAME', events.map((e) => e.name).join(' -> ')) .replaceAll('$EVENT_NAME', events.map((e) => e.name).join(' -> '))
.replaceAll('$RULE_NAME', rule.name); .replaceAll('$RULE_NAME', rule.name);
} }
const PROFILE_TEMPLATE_REGEX = /{{profile\.[^}]*}}/;
export async function checkNotificationRulesForEvent( export async function checkNotificationRulesForEvent(
payload: IServiceCreateEventPayload, payload: IServiceCreateEventPayload
) { ) {
const project = await getProjectByIdCached(payload.projectId); const project = await getProjectByIdCached(payload.projectId);
const rules = await getNotificationRulesByProjectId(payload.projectId); const rules = await getNotificationRulesByProjectId(payload.projectId);
@@ -280,7 +305,7 @@ export async function checkNotificationRulesForEvent(
// so we can use it in the template // so we can use it in the template
if ( if (
payload.profileId && payload.profileId &&
rules.some((rule) => rule.template?.match(/{{profile\.[^}]*}}/)) rules.some((rule) => rule.template?.match(PROFILE_TEMPLATE_REGEX))
) { ) {
const profile = await getProfileById(payload.profileId, payload.projectId); const profile = await getProfileById(payload.profileId, payload.projectId);
if (profile) { if (profile) {
@@ -317,7 +342,7 @@ export async function checkNotificationRulesForEvent(
...notification, ...notification,
integrationId: integration.id, integrationId: integration.id,
notificationRuleId: rule.id, notificationRuleId: rule.id,
}), })
); );
if (rule.sendToApp) { if (rule.sendToApp) {
@@ -326,7 +351,7 @@ export async function checkNotificationRulesForEvent(
...notification, ...notification,
integrationId: APP_NOTIFICATION_INTEGRATION_ID, integrationId: APP_NOTIFICATION_INTEGRATION_ID,
notificationRuleId: rule.id, notificationRuleId: rule.id,
}), })
); );
} }
@@ -336,13 +361,15 @@ export async function checkNotificationRulesForEvent(
...notification, ...notification,
integrationId: EMAIL_NOTIFICATION_INTEGRATION_ID, integrationId: EMAIL_NOTIFICATION_INTEGRATION_ID,
notificationRuleId: rule.id, notificationRuleId: rule.id,
}), })
); );
} }
return promises; return promises;
} }
}),
return [];
})
); );
} }
@@ -358,13 +385,15 @@ export function getFunnelRules(rules: INotificationRuleCached[]) {
} }
export async function checkNotificationRulesForSessionEnd( export async function checkNotificationRulesForSessionEnd(
events: IServiceEvent[], events: IServiceEvent[]
) { ) {
const sortedEvents = events.sort( const sortedEvents = events.sort(
(a, b) => a.createdAt.getTime() - b.createdAt.getTime(), (a, b) => a.createdAt.getTime() - b.createdAt.getTime()
); );
const projectId = sortedEvents[0]?.projectId; const projectId = sortedEvents[0]?.projectId;
if (!projectId) return null; if (!projectId) {
return null;
}
const [project, rules] = await Promise.all([ const [project, rules] = await Promise.all([
getProjectByIdCached(projectId), getProjectByIdCached(projectId),
@@ -380,12 +409,16 @@ export async function checkNotificationRulesForSessionEnd(
if (matchEvent(event, rule.config.events[funnelIndex]!)) { if (matchEvent(event, rule.config.events[funnelIndex]!)) {
matchedEvents.push(event); matchedEvents.push(event);
funnelIndex++; funnelIndex++;
if (funnelIndex === rule.config.events.length) break; if (funnelIndex === rule.config.events.length) {
break;
}
} }
} }
// If funnel not completed, skip this rule // If funnel not completed, skip this rule
if (funnelIndex < rule.config.events.length) return []; if (funnelIndex < rule.config.events.length) {
return [];
}
// Create notification object // Create notification object
const notification = { const notification = {
@@ -405,7 +438,7 @@ export async function checkNotificationRulesForSessionEnd(
...notification, ...notification,
integrationId: integration.id, integrationId: integration.id,
notificationRuleId: rule.id, notificationRuleId: rule.id,
}), })
), ),
...(rule.sendToApp ...(rule.sendToApp
? [ ? [