diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index 87b78606..6834891f 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -55,15 +55,6 @@ export async function postEvent( return; } - const isScreenView = request.body.name === 'screen_view'; - // this will ensure that we don't have multiple events creating sessions - const LOCK_DURATION = 1000; - const locked = await getLock( - `request:priority:${currentDeviceId}-${previousDeviceId}:${isScreenView ? 'screen_view' : 'other'}`, - 'locked', - LOCK_DURATION, - ); - await eventsQueue.add( 'event', { @@ -79,7 +70,6 @@ export async function postEvent( geo, currentDeviceId, previousDeviceId, - priority: locked, }, }, { @@ -88,10 +78,6 @@ export async function postEvent( type: 'exponential', delay: 200, }, - // Prioritize 'screen_view' events by setting no delay - // This ensures that session starts are created from 'screen_view' events - // rather than other events, maintaining accurate session tracking - delay: isScreenView ? undefined : LOCK_DURATION - 100, }, ); diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index b42ca334..ac926ee9 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -284,15 +284,6 @@ async function track({ timestamp: string; isTimestampFromThePast: boolean; }) { - const isScreenView = payload.name === 'screen_view'; - // this will ensure that we don't have multiple events creating sessions - const LOCK_DURATION = 1000; - const locked = await getLock( - `request:priority:${currentDeviceId}-${previousDeviceId}:${isScreenView ? 'screen_view' : 'other'}`, - 'locked', - LOCK_DURATION, - ); - await eventsQueue.add( 'event', { @@ -308,7 +299,6 @@ async function track({ geo, currentDeviceId, previousDeviceId, - priority: locked, }, }, { @@ -317,10 +307,6 @@ async function track({ type: 'exponential', delay: 200, }, - // Prioritize 'screen_view' events by setting no delay - // This ensures that session starts are created from 'screen_view' events - // rather than other events, maintaining accurate session tracking - delay: isScreenView ? undefined : LOCK_DURATION - 100, }, ); } diff --git a/apps/api/tsup.config.ts b/apps/api/tsup.config.ts index 39de3440..d1bf3ea8 100644 --- a/apps/api/tsup.config.ts +++ b/apps/api/tsup.config.ts @@ -11,7 +11,7 @@ const options: Options = { '@node-rs/argon2', 'bcrypt', ], - ignoreWatch: ['../../**/{.git,node_modules}/**'], + ignoreWatch: ['../../**/{.git,node_modules,dist}/**'], sourcemap: true, splitting: false, }; diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index c3718603..c8ca7f09 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -1,5 +1,4 @@ import type { Job } from 'bullmq'; -import { last } from 'ramda'; import { logger as baseLogger } from '@/utils/logger'; import { getTime } from '@openpanel/common'; @@ -9,61 +8,56 @@ import { checkNotificationRulesForSessionEnd, createEvent, eventBuffer, + formatClickhouseDate, getEvents, } from '@openpanel/db'; -import type { ILogger } from '@openpanel/logger'; import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue'; -async function getCompleteSession({ +// Grabs session_start and screen_views + the last occured event +async function getNecessarySessionEvents({ projectId, sessionId, - hoursInterval, + createdAt, }: { projectId: string; sessionId: string; - hoursInterval: number; -}) { + createdAt: Date; +}): Promise> { const sql = ` SELECT * FROM ${TABLE_NAMES.events} WHERE session_id = '${sessionId}' AND project_id = '${projectId}' - AND created_at > now() - interval ${hoursInterval} HOUR - ORDER BY created_at DESC + AND created_at >= '${formatClickhouseDate(new Date(new Date(createdAt).getTime() - 1000 * 60 * 5))}' + AND ( + name IN ('screen_view', 'session_start') + OR created_at = ( + SELECT MAX(created_at) + FROM ${TABLE_NAMES.events} + WHERE session_id = '${sessionId}' + AND project_id = '${projectId}' + AND created_at >= '${formatClickhouseDate(new Date(new Date(createdAt).getTime() - 1000 * 60 * 5))}' + AND name NOT IN ('screen_view', 'session_start') + ) + ) + ORDER BY created_at DESC; `; - return getEvents(sql); -} - -async function getCompleteSessionWithSessionStart({ - projectId, - sessionId, - logger, -}: { - projectId: string; - sessionId: string; - logger: ILogger; -}): Promise> { - const intervals = [1, 6, 12, 24, 72]; - let intervalIndex = 0; - for (const hoursInterval of intervals) { - const events = await getCompleteSession({ + const [lastScreenView, eventsInDb] = await Promise.all([ + eventBuffer.getLastScreenView({ projectId, sessionId, - hoursInterval, - }); + }), + getEvents(sql), + ]); - if (events.find((event) => event.name === 'session_start')) { - return events; - } - - const nextHoursInterval = intervals[++intervalIndex]; - if (nextHoursInterval) { - logger.warn(`Checking last ${nextHoursInterval} hours for session_start`); - } - } - - return []; + // sort last inserted first + return [lastScreenView, ...eventsInDb] + .filter((event): event is IServiceEvent => !!event) + .sort( + (a, b) => + new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(), + ); } export async function createSessionEnd( @@ -77,56 +71,27 @@ export async function createSessionEnd( const payload = job.data.payload; - const [lastScreenView, eventsInDb] = await Promise.all([ - eventBuffer.getLastScreenView({ - projectId: payload.projectId, - sessionId: payload.sessionId, - }), - getCompleteSessionWithSessionStart({ - projectId: payload.projectId, - sessionId: payload.sessionId, - logger, - }), - ]); + const events = await getNecessarySessionEvents({ + projectId: payload.projectId, + sessionId: payload.sessionId, + createdAt: payload.createdAt, + }); - // sort last inserted first - const events = [lastScreenView, ...eventsInDb] - .filter((event): event is IServiceEvent => !!event) - .sort( - (a, b) => - new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(), - ); - - const sessionDuration = events.reduce((acc, event) => { - return acc + event.duration; - }, 0); - - let sessionStart = events.find((event) => event.name === 'session_start'); - const lastEvent = events[0]; + const sessionStart = events.find((event) => event.name === 'session_start'); const screenViews = events.filter((event) => event.name === 'screen_view'); + const lastEvent = events[0]; if (!sessionStart) { - const firstScreenView = last(screenViews); - - if (!firstScreenView) { - throw new Error('Could not found session_start or any screen_view'); - } - - logger.warn('Creating session_start since it was not found'); - - sessionStart = { - ...firstScreenView, - name: 'session_start', - createdAt: new Date(getTime(firstScreenView.createdAt) - 100), - }; - - await createEvent(sessionStart); + throw new Error('No session_start found'); } if (!lastEvent) { throw new Error('No last event found'); } + const sessionDuration = + lastEvent.createdAt.getTime() - sessionStart.createdAt.getTime(); + await checkNotificationRulesForSessionEnd(events); logger.info('Creating session_end', { @@ -135,7 +100,6 @@ export async function createSessionEnd( screenViews, sessionDuration, events, - lastScreenView: lastScreenView ? lastScreenView : 'none', }); return createEvent({ diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 9825e334..adc2272f 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -1,9 +1,10 @@ -import { getReferrerWithQuery, parseReferrer } from '@/utils/parse-referrer'; -import type { Job } from 'bullmq'; -import { omit } from 'ramda'; - import { logger as baseLogger } from '@/utils/logger'; -import { createSessionEnd, getSessionEnd } from '@/utils/session-handler'; +import { getReferrerWithQuery, parseReferrer } from '@/utils/parse-referrer'; +import { + createSessionEndJob, + createSessionStart, + getSessionEnd, +} from '@/utils/session-handler'; import { isSameDomain, parsePath } from '@openpanel/common'; import { parseUserAgent } from '@openpanel/common/server'; import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db'; @@ -14,7 +15,11 @@ import { } from '@openpanel/db'; import type { ILogger } from '@openpanel/logger'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; +import { getLock } from '@openpanel/redis'; +import { DelayedError, type Job } from 'bullmq'; +import { omit } from 'ramda'; import * as R from 'ramda'; +import { v4 as uuid } from 'uuid'; const GLOBAL_PROPERTIES = ['__path', '__referrer']; @@ -29,16 +34,18 @@ async function createEventAndNotify( jobData: Job['data']['payload'], logger: ILogger, ) { - await checkNotificationRulesForEvent(payload).catch((e) => { - logger.error('Error checking notification rules', { error: e }); - }); - logger.info('Creating event', { event: payload, jobData }); - - return createEvent(payload); + const [event] = await Promise.all([ + createEvent(payload), + checkNotificationRulesForEvent(payload), + ]); + return event; } -export async function incomingEvent(job: Job) { +export async function incomingEvent( + job: Job, + token?: string, +) { const { geo, event: body, @@ -46,7 +53,6 @@ export async function incomingEvent(job: Job) { projectId, currentDeviceId, previousDeviceId, - priority, } = job.data.payload; const properties = body.properties ?? {}; const reqId = headers['request-id'] ?? 'unknown'; @@ -131,32 +137,50 @@ export async function incomingEvent(job: Job) { } const sessionEnd = await getSessionEnd({ - priority, projectId, currentDeviceId, previousDeviceId, profileId, }); - const lastScreenView = await eventBuffer.getLastScreenView({ - projectId, - sessionId: sessionEnd.payload.sessionId, - }); + const lastScreenView = sessionEnd + ? await eventBuffer.getLastScreenView({ + projectId, + sessionId: sessionEnd.sessionId, + }) + : null; const payload: IServiceCreateEventPayload = merge(baseEvent, { - deviceId: sessionEnd.payload.deviceId, - sessionId: sessionEnd.payload.sessionId, - referrer: sessionEnd.payload?.referrer, - referrerName: sessionEnd.payload?.referrerName, - referrerType: sessionEnd.payload?.referrerType, + deviceId: sessionEnd?.deviceId ?? currentDeviceId, + sessionId: sessionEnd?.sessionId ?? uuid(), + referrer: sessionEnd?.referrer ?? baseEvent.referrer, + referrerName: sessionEnd?.referrerName ?? baseEvent.referrerName, + referrerType: sessionEnd?.referrerType ?? baseEvent.referrerType, // if the path is not set, use the last screen view path path: baseEvent.path || lastScreenView?.path || '', origin: baseEvent.origin || lastScreenView?.origin || '', } as Partial) as IServiceCreateEventPayload; - if (sessionEnd.notFound) { - await createSessionEnd({ payload }); + if (!sessionEnd) { + // Too avoid several created sessions we just throw if a lock exists + // This will than retry the job + const lock = await getLock( + `create-session-end:${currentDeviceId}`, + 'locked', + 1000, + ); + + if (!lock) { + logger.warn('Move incoming event to delayed'); + await job.moveToDelayed(Date.now() + 50, token); + throw new DelayedError(); + } + await createSessionStart({ payload }); } - return createEventAndNotify(payload, job.data.payload, logger); + const event = await createEventAndNotify(payload, job.data.payload, logger); + + await createSessionEndJob({ payload }); + + return event; } diff --git a/apps/worker/src/jobs/events.ts b/apps/worker/src/jobs/events.ts index ec573872..51298e29 100644 --- a/apps/worker/src/jobs/events.ts +++ b/apps/worker/src/jobs/events.ts @@ -7,6 +7,9 @@ import type { import { incomingEvent } from './events.incoming-event'; -export async function eventsJob(job: Job) { - return await incomingEvent(job as Job); +export async function eventsJob(job: Job, token?: string) { + return await incomingEvent( + job as Job, + token, + ); } diff --git a/apps/worker/src/utils/session-handler.ts b/apps/worker/src/utils/session-handler.ts index d09c7b68..acb858f7 100644 --- a/apps/worker/src/utils/session-handler.ts +++ b/apps/worker/src/utils/session-handler.ts @@ -3,22 +3,33 @@ import { type IServiceCreateEventPayload, createEvent } from '@openpanel/db'; import { type EventsQueuePayloadCreateSessionEnd, sessionsQueue, - sessionsQueueEvents, } from '@openpanel/queue'; import type { Job } from 'bullmq'; -import { v4 as uuid } from 'uuid'; +import { logger } from './logger'; export const SESSION_TIMEOUT = 1000 * 60 * 30; const getSessionEndJobId = (projectId: string, deviceId: string) => `sessionEnd:${projectId}:${deviceId}`; -export async function createSessionEnd({ +export async function createSessionStart({ payload, }: { payload: IServiceCreateEventPayload; }) { - await sessionsQueue.add( + return createEvent({ + ...payload, + name: 'session_start', + createdAt: new Date(getTime(payload.createdAt) - 100), + }); +} + +export async function createSessionEndJob({ + payload, +}: { + payload: IServiceCreateEventPayload; +}) { + return sessionsQueue.add( 'session', { type: 'createSessionEnd', @@ -34,12 +45,6 @@ export async function createSessionEnd({ }, }, ); - - await createEvent({ - ...payload, - name: 'session_start', - createdAt: new Date(getTime(payload.createdAt) - 100), - }); } export async function getSessionEnd({ @@ -47,42 +52,33 @@ export async function getSessionEnd({ currentDeviceId, previousDeviceId, profileId, - priority, }: { projectId: string; currentDeviceId: string; previousDeviceId: string; profileId: string; - priority: boolean; }) { const sessionEnd = await getSessionEndJob({ - priority, projectId, currentDeviceId, previousDeviceId, }); - const sessionEndPayload = - sessionEnd?.job.data.payload || - ({ - sessionId: uuid(), - deviceId: currentDeviceId, - profileId, - projectId, - } satisfies EventsQueuePayloadCreateSessionEnd['payload']); - if (sessionEnd) { - // If for some reason we have a session end job that is not a createSessionEnd job - if (sessionEnd.job.data.type !== 'createSessionEnd') { - throw new Error('Invalid session end job'); + // Hack: if session end job just got created, we want to give it a chance to complete + // So the order is correct + if (sessionEnd.job.timestamp > Date.now() - 50) { + await new Promise((resolve) => setTimeout(resolve, 100)); } - // If the profile_id is set and it's different from the device_id, we need to update the profile_id - if ( - sessionEnd.job.data.payload.profileId !== profileId && + const existingSessionIsAnonymous = sessionEnd.job.data.payload.profileId === - sessionEnd.job.data.payload.deviceId - ) { + sessionEnd.job.data.payload.deviceId; + + const eventIsIdentified = + sessionEnd.job.data.payload.profileId !== profileId; + + if (existingSessionIsAnonymous && eventIsIdentified) { await sessionEnd.job.updateData({ ...sessionEnd.job.data, payload: { @@ -93,25 +89,22 @@ export async function getSessionEnd({ } await sessionEnd.job.changeDelay(SESSION_TIMEOUT); + return sessionEnd.job.data.payload; } - return { - payload: sessionEndPayload, - notFound: !sessionEnd, - }; + return null; } export async function getSessionEndJob(args: { projectId: string; currentDeviceId: string; previousDeviceId: string; - priority: boolean; retryCount?: number; }): Promise<{ deviceId: string; job: Job; } | null> { - const { priority, retryCount = 0 } = args; + const { retryCount = 0 } = args; if (retryCount >= 6) { throw new Error('Failed to get session end'); @@ -125,46 +118,32 @@ export async function getSessionEndJob(args: { job: Job; } | null> { const state = await job.getState(); - if (state === 'delayed') { + if (state !== 'delayed') { + logger.info(`[session-handler] Session end job is in "${state}" state`, { + state, + retryCount, + jobTimestamp: new Date(job.timestamp).toISOString(), + jobDelta: Date.now() - job.timestamp, + jobId: job.id, + reqId: job.data.payload.properties?.__reqId ?? 'unknown', + payload: job.data.payload, + }); + } + + if (state === 'delayed' || state === 'waiting') { return { deviceId, job }; } - if (state === 'failed') { - await job.retry(); - await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10); + if (state === 'active') { + await new Promise((resolve) => setTimeout(resolve, 100)); return getSessionEndJob({ ...args, - priority, - retryCount, + retryCount: retryCount + 1, }); } if (state === 'completed') { await job.remove(); - return getSessionEndJob({ - ...args, - priority, - retryCount, - }); - } - - if (state === 'active' || state === 'waiting') { - await job.waitUntilFinished(sessionsQueueEvents, 1000 * 10); - return getSessionEndJob({ - ...args, - priority, - retryCount, - }); - } - - // Shady state here, just remove it and retry - if (state === 'unknown') { - await job.remove(); - return getSessionEndJob({ - ...args, - priority, - retryCount, - }); } return null; @@ -175,8 +154,7 @@ export async function getSessionEndJob(args: { getSessionEndJobId(args.projectId, args.currentDeviceId), ); if (currentJob) { - const res = await handleJobStates(currentJob, args.currentDeviceId); - if (res) return res; + return await handleJobStates(currentJob, args.currentDeviceId); } // Check previous device job @@ -184,15 +162,7 @@ export async function getSessionEndJob(args: { getSessionEndJobId(args.projectId, args.previousDeviceId), ); if (previousJob) { - const res = await handleJobStates(previousJob, args.previousDeviceId); - if (res) return res; - } - - // If no job found and not priority, retry - if (!priority) { - const backoffDelay = 50 * 2 ** retryCount; - await new Promise((resolve) => setTimeout(resolve, backoffDelay)); - return getSessionEndJob({ ...args, priority, retryCount: retryCount + 1 }); + return await handleJobStates(previousJob, args.previousDeviceId); } // Create session diff --git a/apps/worker/tsup.config.ts b/apps/worker/tsup.config.ts index 464d750a..9e1c93dd 100644 --- a/apps/worker/tsup.config.ts +++ b/apps/worker/tsup.config.ts @@ -6,7 +6,7 @@ const options: Options = { entry: ['src/index.ts'], noExternal: [/^@openpanel\/.*$/u, /^@\/.*$/u], external: ['@hyperdx/node-opentelemetry', 'winston'], - ignoreWatch: ['../../**/{.git,node_modules}/**'], + ignoreWatch: ['../../**/{.git,node_modules,dist}/**'], sourcemap: true, splitting: false, }; diff --git a/packages/db/src/buffers/base-buffer.ts b/packages/db/src/buffers/base-buffer.ts index 01974f5b..83aaca59 100644 --- a/packages/db/src/buffers/base-buffer.ts +++ b/packages/db/src/buffers/base-buffer.ts @@ -67,8 +67,6 @@ export class BaseBuffer { lockId, }); } - } else { - this.logger.warn('Failed to acquire lock. Skipping flush.', { lockId }); } } } diff --git a/packages/db/src/buffers/session-buffer.ts b/packages/db/src/buffers/session-buffer.ts index df791d6b..70ebd48b 100644 --- a/packages/db/src/buffers/session-buffer.ts +++ b/packages/db/src/buffers/session-buffer.ts @@ -58,14 +58,17 @@ export class SessionBuffer extends BaseBuffer { if (event.origin) { newSession.exit_origin = event.origin; } - newSession.duration = + const duration = new Date(newSession.ended_at).getTime() - new Date(newSession.created_at).getTime(); - if (newSession.duration < 0) { + if (duration > 0) { + newSession.duration = duration; + } else { this.logger.warn('Session duration is negative', { + duration, + event, session: newSession, }); - newSession.duration = 0; } newSession.properties = toDots({ ...(event.properties || {}), @@ -73,7 +76,7 @@ export class SessionBuffer extends BaseBuffer { }); // newSession.revenue += event.properties?.__revenue ?? 0; - if (event.name === 'screen_view') { + if (event.name === 'screen_view' && event.path) { newSession.screen_views.push(event.path); newSession.screen_view_count += 1; } else { @@ -161,8 +164,6 @@ export class SessionBuffer extends BaseBuffer { const sessions = await this.getSession(event); const [newSession] = sessions; - console.log(`Adding sessions ${sessions.length}`); - const multi = this.redis.multi(); multi.set( `session:${newSession.id}`, diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index 753ce640..0c9eefdd 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -1,8 +1,8 @@ -import { path, assocPath, last, mergeDeepRight, pick, uniq } from 'ramda'; +import { path, assocPath, last, mergeDeepRight } from 'ramda'; import { escape } from 'sqlstring'; import { v4 as uuid } from 'uuid'; -import { toDots } from '@openpanel/common'; +import { DateTime, toDots } from '@openpanel/common'; import { cacheable, getCache } from '@openpanel/redis'; import type { IChartEventFilter } from '@openpanel/validation'; @@ -19,13 +19,8 @@ import type { EventMeta, Prisma } from '../prisma-client'; import { db } from '../prisma-client'; import { createSqlBuilder } from '../sql-builder'; import { getEventFiltersWhereClause } from './chart.service'; -import type { IClickhouseProfile, IServiceProfile } from './profile.service'; -import { - getProfileById, - getProfiles, - transformProfile, - upsertProfile, -} from './profile.service'; +import type { IServiceProfile } from './profile.service'; +import { getProfileById, getProfiles, upsertProfile } from './profile.service'; export type IImportedEvent = Omit< IClickhouseEvent, @@ -293,6 +288,42 @@ export async function createEvent(payload: IServiceCreateEventPayload) { payload.profileId = payload.deviceId; } + const event: IClickhouseEvent = { + id: uuid(), + name: payload.name, + device_id: payload.deviceId, + profile_id: payload.profileId ? String(payload.profileId) : '', + project_id: payload.projectId, + session_id: payload.sessionId, + properties: toDots(payload.properties), + path: payload.path ?? '', + origin: payload.origin ?? '', + created_at: DateTime.fromJSDate(payload.createdAt) + .setZone('UTC') + .toFormat('yyyy-MM-dd HH:mm:ss.SSS'), + country: payload.country ?? '', + city: payload.city ?? '', + region: payload.region ?? '', + longitude: payload.longitude ?? null, + latitude: payload.latitude ?? null, + os: payload.os ?? '', + os_version: payload.osVersion ?? '', + browser: payload.browser ?? '', + browser_version: payload.browserVersion ?? '', + device: payload.device ?? '', + brand: payload.brand ?? '', + model: payload.model ?? '', + duration: payload.duration, + referrer: payload.referrer ?? '', + referrer_name: payload.referrerName ?? '', + referrer_type: payload.referrerType ?? '', + imported_at: null, + sdk_name: payload.sdkName ?? '', + sdk_version: payload.sdkVersion ?? '', + }; + + await Promise.all([sessionBuffer.add(event), eventBuffer.add(event)]); + if (payload.profileId) { const profile = { id: String(payload.profileId), @@ -326,40 +357,6 @@ export async function createEvent(payload: IServiceCreateEventPayload) { } } - const event: IClickhouseEvent = { - id: uuid(), - name: payload.name, - device_id: payload.deviceId, - profile_id: payload.profileId ? String(payload.profileId) : '', - project_id: payload.projectId, - session_id: payload.sessionId, - properties: toDots(payload.properties), - path: payload.path ?? '', - origin: payload.origin ?? '', - created_at: formatClickhouseDate(payload.createdAt), - country: payload.country ?? '', - city: payload.city ?? '', - region: payload.region ?? '', - longitude: payload.longitude ?? null, - latitude: payload.latitude ?? null, - os: payload.os ?? '', - os_version: payload.osVersion ?? '', - browser: payload.browser ?? '', - browser_version: payload.browserVersion ?? '', - device: payload.device ?? '', - brand: payload.brand ?? '', - model: payload.model ?? '', - duration: payload.duration, - referrer: payload.referrer ?? '', - referrer_name: payload.referrerName ?? '', - referrer_type: payload.referrerType ?? '', - imported_at: null, - sdk_name: payload.sdkName ?? '', - sdk_version: payload.sdkVersion ?? '', - }; - - await Promise.all([sessionBuffer.add(event), eventBuffer.add(event)]); - return { document: event, }; diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 668d2fc5..672f02d8 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -22,14 +22,18 @@ export interface EventsQueuePayloadIncomingEvent { headers: Record; currentDeviceId: string; previousDeviceId: string; - priority: boolean; }; } export interface EventsQueuePayloadCreateEvent { type: 'createEvent'; payload: Omit; } -type SessionEndRequired = 'sessionId' | 'deviceId' | 'profileId' | 'projectId'; +type SessionEndRequired = + | 'sessionId' + | 'deviceId' + | 'profileId' + | 'projectId' + | 'createdAt'; export interface EventsQueuePayloadCreateSessionEnd { type: 'createSessionEnd'; payload: Partial> & diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 67e94404..d02df427 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -4,6 +4,12 @@ settings: autoInstallPeers: true excludeLinksFromLockfile: false +catalogs: + default: + zod: + specifier: ^3.24.2 + version: 3.24.2 + importers: .: @@ -1066,6 +1072,40 @@ importers: specifier: ^5.2.2 version: 5.6.3 + packages/fire: + dependencies: + '@faker-js/faker': + specifier: ^9.0.1 + version: 9.0.1 + '@openpanel/common': + specifier: workspace:* + version: link:../common + '@openpanel/db': + specifier: workspace:* + version: link:../db + csv-parse: + specifier: ^5.6.0 + version: 5.6.0 + date-fns: + specifier: ^3.3.1 + version: 3.3.1 + devDependencies: + '@openpanel/tsconfig': + specifier: workspace:* + version: link:../../tooling/typescript + '@openpanel/validation': + specifier: workspace:* + version: link:../validation + '@types/node': + specifier: 20.14.8 + version: 20.14.8 + tsup: + specifier: ^7.2.0 + version: 7.3.0(postcss@8.5.3)(typescript@5.6.3) + typescript: + specifier: ^5.2.2 + version: 5.6.3 + packages/integrations: dependencies: '@slack/bolt': @@ -3155,6 +3195,7 @@ packages: '@faker-js/faker@9.0.1': resolution: {integrity: sha512-4mDeYIgM3By7X6t5E6eYwLAa+2h4DeZDF7thhzIg6XB76jeEvMwadYAMCFJL/R4AnEBcAUO9+gL0vhy3s+qvZA==} engines: {node: '>=18.0.0', npm: '>=9.0.0'} + deprecated: Please update to a newer version '@fastify/accept-negotiator@2.0.1': resolution: {integrity: sha512-/c/TW2bO/v9JeEgoD/g1G5GxGeCF1Hafdf79WPmUlgYiBXummY0oX3VVq4yFkKKVBKDNlaDUYoab7g38RpPqCQ==} @@ -7485,6 +7526,9 @@ packages: csstype@3.1.3: resolution: {integrity: sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw==} + csv-parse@5.6.0: + resolution: {integrity: sha512-l3nz3euub2QMg5ouu5U09Ew9Wf6/wQ8I++ch1loQ0ljmzhmfZYrH9fflS22i/PQEvsPvxCwxgz5q7UB8K1JO4Q==} + d3-array@2.12.1: resolution: {integrity: sha512-B0ErZK/66mHtEsR1TkPEEkwdy+WDesimkM5gpZr5Dsg54BiTA5RXtYW5qTLIAcekaS9xfZrzBLF/OAkB3Qn1YQ==} @@ -19905,6 +19949,8 @@ snapshots: csstype@3.1.3: {} + csv-parse@5.6.0: {} + d3-array@2.12.1: dependencies: internmap: 1.0.1