fix(buffer): merge events in queue and get last screen view from buffer instead of cache
This commit is contained in:
@@ -79,7 +79,7 @@ export async function createSessionEnd(
|
|||||||
const [lastScreenView, eventsInDb] = await Promise.all([
|
const [lastScreenView, eventsInDb] = await Promise.all([
|
||||||
eventBuffer.getLastScreenView({
|
eventBuffer.getLastScreenView({
|
||||||
projectId: payload.projectId,
|
projectId: payload.projectId,
|
||||||
profileId: payload.profileId || payload.deviceId,
|
sessionId: payload.sessionId,
|
||||||
}),
|
}),
|
||||||
getCompleteSessionWithSessionStart({
|
getCompleteSessionWithSessionStart({
|
||||||
projectId: payload.projectId,
|
projectId: payload.projectId,
|
||||||
|
|||||||
@@ -7,8 +7,11 @@ import { createSessionEnd, getSessionEnd } from '@/utils/session-handler';
|
|||||||
import { isSameDomain, parsePath } from '@openpanel/common';
|
import { isSameDomain, parsePath } from '@openpanel/common';
|
||||||
import { parseUserAgent } from '@openpanel/common/server';
|
import { parseUserAgent } from '@openpanel/common/server';
|
||||||
import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db';
|
import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db';
|
||||||
import { checkNotificationRulesForEvent, createEvent } from '@openpanel/db';
|
import {
|
||||||
import { getLastScreenViewFromProfileId } from '@openpanel/db';
|
checkNotificationRulesForEvent,
|
||||||
|
createEvent,
|
||||||
|
eventBuffer,
|
||||||
|
} from '@openpanel/db';
|
||||||
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue';
|
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue';
|
||||||
import * as R from 'ramda';
|
import * as R from 'ramda';
|
||||||
|
|
||||||
@@ -106,7 +109,7 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
|
|||||||
// if timestamp is from the past we dont want to create a new session
|
// if timestamp is from the past we dont want to create a new session
|
||||||
if (uaInfo.isServer || isTimestampFromThePast) {
|
if (uaInfo.isServer || isTimestampFromThePast) {
|
||||||
const event = profileId
|
const event = profileId
|
||||||
? await getLastScreenViewFromProfileId({
|
? await eventBuffer.getLastScreenView({
|
||||||
profileId,
|
profileId,
|
||||||
projectId,
|
projectId,
|
||||||
})
|
})
|
||||||
@@ -124,13 +127,21 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
|
|||||||
profileId,
|
profileId,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const lastScreenView = await eventBuffer.getLastScreenView({
|
||||||
|
projectId,
|
||||||
|
sessionId: sessionEnd.payload.sessionId,
|
||||||
|
});
|
||||||
|
|
||||||
const payload: IServiceCreateEventPayload = merge(baseEvent, {
|
const payload: IServiceCreateEventPayload = merge(baseEvent, {
|
||||||
deviceId: sessionEnd.payload.deviceId,
|
deviceId: sessionEnd.payload.deviceId,
|
||||||
sessionId: sessionEnd.payload.sessionId,
|
sessionId: sessionEnd.payload.sessionId,
|
||||||
referrer: sessionEnd.payload?.referrer,
|
referrer: sessionEnd.payload?.referrer,
|
||||||
referrerName: sessionEnd.payload?.referrerName,
|
referrerName: sessionEnd.payload?.referrerName,
|
||||||
referrerType: sessionEnd.payload?.referrerType,
|
referrerType: sessionEnd.payload?.referrerType,
|
||||||
}) as IServiceCreateEventPayload;
|
// if the path is not set, use the last screen view path
|
||||||
|
path: baseEvent.path || lastScreenView?.path || '',
|
||||||
|
origin: baseEvent.origin || lastScreenView?.origin || '',
|
||||||
|
} as Partial<IServiceCreateEventPayload>) as IServiceCreateEventPayload;
|
||||||
|
|
||||||
if (sessionEnd.notFound) {
|
if (sessionEnd.notFound) {
|
||||||
await createSessionEnd({ payload });
|
await createSessionEnd({ payload });
|
||||||
|
|||||||
@@ -386,33 +386,10 @@ return "OK"
|
|||||||
timer.processSessionEvents = performance.now() - now;
|
timer.processSessionEvents = performance.now() - now;
|
||||||
now = performance.now();
|
now = performance.now();
|
||||||
|
|
||||||
// (C) Sort events by creation time.
|
|
||||||
eventsToClickhouse.sort(
|
|
||||||
(a, b) =>
|
|
||||||
new Date(a.created_at || 0).getTime() -
|
|
||||||
new Date(b.created_at || 0).getTime(),
|
|
||||||
);
|
|
||||||
|
|
||||||
// (B) Process no-session events
|
// (B) Process no-session events
|
||||||
for (const eventStr of regularQueueEvents) {
|
for (const eventStr of regularQueueEvents) {
|
||||||
const event = getSafeJson<IClickhouseEvent>(eventStr);
|
const event = getSafeJson<IClickhouseEvent>(eventStr);
|
||||||
if (event) {
|
if (event) {
|
||||||
const sessionEvents = sessions[event.session_id] || [];
|
|
||||||
const screenView = sessionEvents.findLast((sessionEvent) => {
|
|
||||||
const isScreenView = sessionEvent.name === 'screen_view';
|
|
||||||
const isBeforeEvent =
|
|
||||||
new Date(sessionEvent.created_at).getTime() <
|
|
||||||
new Date(event.created_at).getTime();
|
|
||||||
|
|
||||||
return isScreenView && isBeforeEvent;
|
|
||||||
});
|
|
||||||
|
|
||||||
if (screenView) {
|
|
||||||
event.path = screenView.path;
|
|
||||||
event.origin = screenView.origin;
|
|
||||||
event.properties.__inherit_from = screenView.id;
|
|
||||||
}
|
|
||||||
|
|
||||||
eventsToClickhouse.push(event);
|
eventsToClickhouse.push(event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -584,25 +561,45 @@ return "OK"
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve the latest screen_view event for a given project/profile.
|
* Retrieve the latest screen_view event for a given project/profile or project/session
|
||||||
*/
|
*/
|
||||||
public async getLastScreenView({
|
public async getLastScreenView({
|
||||||
projectId,
|
projectId,
|
||||||
profileId,
|
...rest
|
||||||
}: {
|
}:
|
||||||
projectId: string;
|
| {
|
||||||
profileId: string;
|
projectId: string;
|
||||||
}): Promise<IServiceEvent | null> {
|
profileId: string;
|
||||||
const redis = getRedisCache();
|
}
|
||||||
const eventStr = await redis.get(
|
| {
|
||||||
this.getLastEventKey({ projectId, profileId }),
|
projectId: string;
|
||||||
);
|
sessionId: string;
|
||||||
if (eventStr) {
|
}): Promise<IServiceEvent | null> {
|
||||||
const parsed = getSafeJson<IClickhouseEvent>(eventStr);
|
if ('profileId' in rest) {
|
||||||
if (parsed) {
|
const redis = getRedisCache();
|
||||||
return transformEvent(parsed);
|
const eventStr = await redis.get(
|
||||||
|
this.getLastEventKey({ projectId, profileId: rest.profileId }),
|
||||||
|
);
|
||||||
|
if (eventStr) {
|
||||||
|
const parsed = getSafeJson<IClickhouseEvent>(eventStr);
|
||||||
|
if (parsed) {
|
||||||
|
return transformEvent(parsed);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ('sessionId' in rest) {
|
||||||
|
const redis = getRedisCache();
|
||||||
|
const sessionKey = this.getSessionKey(rest.sessionId);
|
||||||
|
const lastEvent = await redis.lindex(sessionKey, -1);
|
||||||
|
if (lastEvent) {
|
||||||
|
const parsed = getSafeJson<IClickhouseEvent>(lastEvent);
|
||||||
|
if (parsed) {
|
||||||
|
return transformEvent(parsed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -579,35 +579,6 @@ export function getConversionEventNames(projectId: string) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getLastScreenViewFromProfileId({
|
|
||||||
profileId,
|
|
||||||
projectId,
|
|
||||||
}: {
|
|
||||||
profileId: string;
|
|
||||||
projectId: string;
|
|
||||||
}) {
|
|
||||||
if (!profileId) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
const eventInBuffer = await eventBuffer.getLastScreenView({
|
|
||||||
projectId,
|
|
||||||
profileId,
|
|
||||||
});
|
|
||||||
|
|
||||||
if (eventInBuffer) {
|
|
||||||
return eventInBuffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
const [eventInDb] = profileId
|
|
||||||
? await getEvents(
|
|
||||||
`SELECT * FROM ${TABLE_NAMES.events} WHERE name = 'screen_view' AND profile_id = ${escape(profileId)} AND project_id = ${escape(projectId)} AND created_at >= now() - INTERVAL 30 MINUTE ORDER BY created_at DESC LIMIT 1`,
|
|
||||||
)
|
|
||||||
: [];
|
|
||||||
|
|
||||||
return eventInDb || null;
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function getTopPages({
|
export async function getTopPages({
|
||||||
projectId,
|
projectId,
|
||||||
cursor,
|
cursor,
|
||||||
|
|||||||
Reference in New Issue
Block a user