diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index ea0eb2f0..067d8b62 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -1,24 +1,22 @@ -import type { Job } from 'bullmq'; - -import { logger as baseLogger } from '@/utils/logger'; import { - type IClickhouseSession, - type IServiceCreateEventPayload, - type IServiceEvent, - TABLE_NAMES, checkNotificationRulesForSessionEnd, convertClickhouseDateToJs, createEvent, - eventBuffer, formatClickhouseDate, getEvents, getHasFunnelRules, getNotificationRulesByProjectId, + type IClickhouseSession, + type IServiceCreateEventPayload, + type IServiceEvent, profileBackfillBuffer, sessionBuffer, + TABLE_NAMES, transformSessionToEvent, } from '@openpanel/db'; import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue'; +import type { Job } from 'bullmq'; +import { logger as baseLogger } from '@/utils/logger'; const MAX_SESSION_EVENTS = 500; @@ -39,7 +37,7 @@ async function getSessionEvents({ WHERE session_id = '${sessionId}' AND project_id = '${projectId}' - AND created_at BETWEEN '${formatClickhouseDate(startAt)}' AND '${formatClickhouseDate(endAt)}' + AND created_at BETWEEN '${formatClickhouseDate(new Date(startAt.getTime() - 1000))}' AND '${formatClickhouseDate(new Date(endAt.getTime() + 1000))}' ORDER BY created_at DESC LIMIT ${MAX_SESSION_EVENTS}; `; @@ -58,12 +56,12 @@ async function getSessionEvents({ .flatMap((event) => (event ? [event] : [])) .sort( (a, b) => - new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(), + new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime() ); } export async function createSessionEnd( - job: Job, + job: Job ) { const { payload } = job.data; const logger = baseLogger.child({ @@ -92,18 +90,24 @@ export async function createSessionEnd( }); } - const profileId = session.profile_id || payload.profileId + const profileId = session.profile_id || payload.profileId; if ( - profileId !== session.device_id - && process.env.EXPERIMENTAL_PROFILE_BACKFILL === '1' + profileId !== session.device_id && + process.env.EXPERIMENTAL_PROFILE_BACKFILL === '1' ) { - const runOnProjects = process.env.EXPERIMENTAL_PROFILE_BACKFILL_PROJECTS?.split(',').filter(Boolean) ?? [] - if(runOnProjects.length === 0 || runOnProjects.includes(payload.projectId)) { + const runOnProjects = + process.env.EXPERIMENTAL_PROFILE_BACKFILL_PROJECTS?.split(',').filter( + Boolean + ) ?? []; + if ( + runOnProjects.length === 0 || + runOnProjects.includes(payload.projectId) + ) { await profileBackfillBuffer.add({ projectId: payload.projectId, sessionId: payload.sessionId, - profileId: profileId, + profileId, }); } } @@ -119,9 +123,9 @@ export async function createSessionEnd( duration: session.duration ?? 0, path: session.exit_path ?? '', createdAt: new Date( - convertClickhouseDateToJs(session.ended_at).getTime() + 1000, + convertClickhouseDateToJs(session.ended_at).getTime() + 1000 ), - profileId: profileId, + profileId, }); } @@ -133,7 +137,7 @@ async function handleSessionEndNotifications({ payload: IServiceCreateEventPayload; }) { const notificationRules = await getNotificationRulesByProjectId( - payload.projectId, + payload.projectId ); const hasFunnelRules = getHasFunnelRules(notificationRules); const isEventCountReasonable = @@ -143,8 +147,8 @@ async function handleSessionEndNotifications({ const events = await getSessionEvents({ projectId: payload.projectId, sessionId: payload.sessionId, - startAt: new Date(session.created_at), - endAt: new Date(session.ended_at), + startAt: convertClickhouseDateToJs(session.created_at), + endAt: convertClickhouseDateToJs(session.ended_at), }); if (events.length > 0) { diff --git a/packages/db/src/services/notification.service.ts b/packages/db/src/services/notification.service.ts index f935b0e2..57d988f7 100644 --- a/packages/db/src/services/notification.service.ts +++ b/packages/db/src/services/notification.service.ts @@ -4,10 +4,10 @@ import { cacheable } from '@openpanel/redis'; import type { IChartEvent, IChartEventFilter } from '@openpanel/validation'; import { pathOr } from 'ramda'; import { + db, type Integration, type Notification, - Prisma, - db, + type Prisma, } from '../prisma-client'; import type { IServiceCreateEventPayload, @@ -90,7 +90,7 @@ export const getNotificationRulesByProjectId = cacheable( }, }); }, - 60 * 24, + 60 * 24 ); function getIntegration(integrationId: string | null) { @@ -117,12 +117,30 @@ function getIntegration(integrationId: string | null) { }; } +function stripNullChars(value: T): T { + if (typeof value === 'string') { + return value.split('\u0000').join('') as T; + } + if (value instanceof Date) { + return value; + } + if (Array.isArray(value)) { + return value.map(stripNullChars) as T; + } + if (value !== null && typeof value === 'object') { + return Object.fromEntries( + Object.entries(value).map(([k, v]) => [k, stripNullChars(v)]) + ) as T; + } + return value; +} + export async function createNotification(notification: ICreateNotification) { const data: Prisma.NotificationUncheckedCreateInput = { title: notification.title, message: notification.message, projectId: notification.projectId, - payload: notification.payload || Prisma.DbNull, + payload: stripNullChars(notification.payload) || undefined, ...getIntegration(notification.integrationId), notificationRuleId: notification.notificationRuleId, }; @@ -138,7 +156,7 @@ export async function createNotification(notification: ICreateNotification) { } export function triggerNotification( - notification: Prisma.NotificationUncheckedCreateInput, + notification: Prisma.NotificationUncheckedCreateInput ) { return notificationQueue.add('sendNotification', { type: 'sendNotification', @@ -150,12 +168,14 @@ export function triggerNotification( function matchEventFilters( payload: IServiceCreateEventPayload, - filters: IChartEventFilter[], + filters: IChartEventFilter[] ) { return filters.every((filter) => { const { name, value, operator } = filter; - if (value.length === 0) return true; + if (value.length === 0) { + return true; + } if (name === 'has_profile') { if (value.includes('true')) { @@ -214,7 +234,7 @@ function matchEventFilters( export function matchEvent( payload: IServiceCreateEventPayload, - chartEvent: IChartEvent, + chartEvent: IChartEvent ) { if (payload.name !== chartEvent.name && chartEvent.name !== '*') { return false; @@ -234,7 +254,9 @@ function notificationTemplateEvent({ payload: IServiceCreateEventPayload; rule: INotificationRuleCached; }) { - if (!rule.template) return `You received a new "${payload.name}" event`; + if (!rule.template) { + return `You received a new "${payload.name}" event`; + } let template = rule.template .replaceAll('$EVENT_NAME', payload.name) .replaceAll('$RULE_NAME', rule.name) @@ -249,7 +271,7 @@ function notificationTemplateEvent({ if (value) { template = template.replaceAll( match, - typeof value === 'object' ? JSON.stringify(value) : value, + typeof value === 'object' ? JSON.stringify(value) : value ); } } @@ -264,14 +286,17 @@ function notificationTemplateFunnel({ events: IServiceEvent[]; rule: INotificationRuleCached; }) { - if (!rule.template) return `Funnel "${rule.name}" completed`; + if (!rule.template) { + return `Funnel "${rule.name}" completed`; + } return rule.template .replaceAll('$EVENT_NAME', events.map((e) => e.name).join(' -> ')) .replaceAll('$RULE_NAME', rule.name); } +const PROFILE_TEMPLATE_REGEX = /{{profile\.[^}]*}}/; export async function checkNotificationRulesForEvent( - payload: IServiceCreateEventPayload, + payload: IServiceCreateEventPayload ) { const project = await getProjectByIdCached(payload.projectId); const rules = await getNotificationRulesByProjectId(payload.projectId); @@ -280,7 +305,7 @@ export async function checkNotificationRulesForEvent( // so we can use it in the template if ( payload.profileId && - rules.some((rule) => rule.template?.match(/{{profile\.[^}]*}}/)) + rules.some((rule) => rule.template?.match(PROFILE_TEMPLATE_REGEX)) ) { const profile = await getProfileById(payload.profileId, payload.projectId); if (profile) { @@ -317,7 +342,7 @@ export async function checkNotificationRulesForEvent( ...notification, integrationId: integration.id, notificationRuleId: rule.id, - }), + }) ); if (rule.sendToApp) { @@ -326,7 +351,7 @@ export async function checkNotificationRulesForEvent( ...notification, integrationId: APP_NOTIFICATION_INTEGRATION_ID, notificationRuleId: rule.id, - }), + }) ); } @@ -336,13 +361,15 @@ export async function checkNotificationRulesForEvent( ...notification, integrationId: EMAIL_NOTIFICATION_INTEGRATION_ID, notificationRuleId: rule.id, - }), + }) ); } return promises; } - }), + + return []; + }) ); } @@ -358,13 +385,15 @@ export function getFunnelRules(rules: INotificationRuleCached[]) { } export async function checkNotificationRulesForSessionEnd( - events: IServiceEvent[], + events: IServiceEvent[] ) { const sortedEvents = events.sort( - (a, b) => a.createdAt.getTime() - b.createdAt.getTime(), + (a, b) => a.createdAt.getTime() - b.createdAt.getTime() ); const projectId = sortedEvents[0]?.projectId; - if (!projectId) return null; + if (!projectId) { + return null; + } const [project, rules] = await Promise.all([ getProjectByIdCached(projectId), @@ -380,12 +409,16 @@ export async function checkNotificationRulesForSessionEnd( if (matchEvent(event, rule.config.events[funnelIndex]!)) { matchedEvents.push(event); funnelIndex++; - if (funnelIndex === rule.config.events.length) break; + if (funnelIndex === rule.config.events.length) { + break; + } } } // If funnel not completed, skip this rule - if (funnelIndex < rule.config.events.length) return []; + if (funnelIndex < rule.config.events.length) { + return []; + } // Create notification object const notification = { @@ -405,7 +438,7 @@ export async function checkNotificationRulesForSessionEnd( ...notification, integrationId: integration.id, notificationRuleId: rule.id, - }), + }) ), ...(rule.sendToApp ? [