diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index bc544c93..41304502 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -79,7 +79,7 @@ export async function createSessionEnd( const [lastScreenView, eventsInDb] = await Promise.all([ eventBuffer.getLastScreenView({ projectId: payload.projectId, - profileId: payload.profileId || payload.deviceId, + sessionId: payload.sessionId, }), getCompleteSessionWithSessionStart({ projectId: payload.projectId, diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 270aa166..3c3f86d9 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -7,8 +7,11 @@ import { createSessionEnd, getSessionEnd } from '@/utils/session-handler'; import { isSameDomain, parsePath } from '@openpanel/common'; import { parseUserAgent } from '@openpanel/common/server'; import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db'; -import { checkNotificationRulesForEvent, createEvent } from '@openpanel/db'; -import { getLastScreenViewFromProfileId } from '@openpanel/db'; +import { + checkNotificationRulesForEvent, + createEvent, + eventBuffer, +} from '@openpanel/db'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; import * as R from 'ramda'; @@ -106,7 +109,7 @@ export async function incomingEvent(job: Job) { // if timestamp is from the past we dont want to create a new session if (uaInfo.isServer || isTimestampFromThePast) { const event = profileId - ? await getLastScreenViewFromProfileId({ + ? await eventBuffer.getLastScreenView({ profileId, projectId, }) @@ -124,13 +127,21 @@ export async function incomingEvent(job: Job) { profileId, }); + const lastScreenView = await eventBuffer.getLastScreenView({ + projectId, + sessionId: sessionEnd.payload.sessionId, + }); + const payload: IServiceCreateEventPayload = merge(baseEvent, { deviceId: sessionEnd.payload.deviceId, sessionId: sessionEnd.payload.sessionId, referrer: sessionEnd.payload?.referrer, referrerName: sessionEnd.payload?.referrerName, referrerType: sessionEnd.payload?.referrerType, - }) as IServiceCreateEventPayload; + // if the path is not set, use the last screen view path + path: baseEvent.path || lastScreenView?.path || '', + origin: baseEvent.origin || lastScreenView?.origin || '', + } as Partial) as IServiceCreateEventPayload; if (sessionEnd.notFound) { await createSessionEnd({ payload }); diff --git a/packages/db/src/buffers/event-buffer-redis.ts b/packages/db/src/buffers/event-buffer-redis.ts index 4496cdad..dc3a0abf 100644 --- a/packages/db/src/buffers/event-buffer-redis.ts +++ b/packages/db/src/buffers/event-buffer-redis.ts @@ -386,33 +386,10 @@ return "OK" timer.processSessionEvents = performance.now() - now; now = performance.now(); - // (C) Sort events by creation time. - eventsToClickhouse.sort( - (a, b) => - new Date(a.created_at || 0).getTime() - - new Date(b.created_at || 0).getTime(), - ); - // (B) Process no-session events for (const eventStr of regularQueueEvents) { const event = getSafeJson(eventStr); if (event) { - const sessionEvents = sessions[event.session_id] || []; - const screenView = sessionEvents.findLast((sessionEvent) => { - const isScreenView = sessionEvent.name === 'screen_view'; - const isBeforeEvent = - new Date(sessionEvent.created_at).getTime() < - new Date(event.created_at).getTime(); - - return isScreenView && isBeforeEvent; - }); - - if (screenView) { - event.path = screenView.path; - event.origin = screenView.origin; - event.properties.__inherit_from = screenView.id; - } - eventsToClickhouse.push(event); } } @@ -584,25 +561,45 @@ return "OK" } /** - * Retrieve the latest screen_view event for a given project/profile. + * Retrieve the latest screen_view event for a given project/profile or project/session */ public async getLastScreenView({ projectId, - profileId, - }: { - projectId: string; - profileId: string; - }): Promise { - const redis = getRedisCache(); - const eventStr = await redis.get( - this.getLastEventKey({ projectId, profileId }), - ); - if (eventStr) { - const parsed = getSafeJson(eventStr); - if (parsed) { - return transformEvent(parsed); + ...rest + }: + | { + projectId: string; + profileId: string; + } + | { + projectId: string; + sessionId: string; + }): Promise { + if ('profileId' in rest) { + const redis = getRedisCache(); + const eventStr = await redis.get( + this.getLastEventKey({ projectId, profileId: rest.profileId }), + ); + if (eventStr) { + const parsed = getSafeJson(eventStr); + if (parsed) { + return transformEvent(parsed); + } } } + + if ('sessionId' in rest) { + const redis = getRedisCache(); + const sessionKey = this.getSessionKey(rest.sessionId); + const lastEvent = await redis.lindex(sessionKey, -1); + if (lastEvent) { + const parsed = getSafeJson(lastEvent); + if (parsed) { + return transformEvent(parsed); + } + } + } + return null; } diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index abacd5b2..6dab3d0f 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -579,35 +579,6 @@ export function getConversionEventNames(projectId: string) { }); } -export async function getLastScreenViewFromProfileId({ - profileId, - projectId, -}: { - profileId: string; - projectId: string; -}) { - if (!profileId) { - return null; - } - - const eventInBuffer = await eventBuffer.getLastScreenView({ - projectId, - profileId, - }); - - if (eventInBuffer) { - return eventInBuffer; - } - - const [eventInDb] = profileId - ? await getEvents( - `SELECT * FROM ${TABLE_NAMES.events} WHERE name = 'screen_view' AND profile_id = ${escape(profileId)} AND project_id = ${escape(projectId)} AND created_at >= now() - INTERVAL 30 MINUTE ORDER BY created_at DESC LIMIT 1`, - ) - : []; - - return eventInDb || null; -} - export async function getTopPages({ projectId, cursor,