fix(buffer): debug stalled events
This commit is contained in:
@@ -128,6 +128,14 @@ export async function createSessionEnd(
|
|||||||
|
|
||||||
await checkNotificationRulesForSessionEnd(events);
|
await checkNotificationRulesForSessionEnd(events);
|
||||||
|
|
||||||
|
logger.info('Creating session_end', {
|
||||||
|
sessionStart,
|
||||||
|
lastEvent,
|
||||||
|
screenViews,
|
||||||
|
sessionDuration,
|
||||||
|
events,
|
||||||
|
});
|
||||||
|
|
||||||
return createEvent({
|
return createEvent({
|
||||||
...sessionStart,
|
...sessionStart,
|
||||||
properties: {
|
properties: {
|
||||||
|
|||||||
@@ -20,10 +20,16 @@ const GLOBAL_PROPERTIES = ['__path', '__referrer'];
|
|||||||
const merge = <A, B>(a: Partial<A>, b: Partial<B>): A & B =>
|
const merge = <A, B>(a: Partial<A>, b: Partial<B>): A & B =>
|
||||||
R.mergeDeepRight(a, R.reject(R.anyPass([R.isEmpty, R.isNil]))(b)) as A & B;
|
R.mergeDeepRight(a, R.reject(R.anyPass([R.isEmpty, R.isNil]))(b)) as A & B;
|
||||||
|
|
||||||
async function createEventAndNotify(payload: IServiceCreateEventPayload) {
|
async function createEventAndNotify(
|
||||||
|
payload: IServiceCreateEventPayload,
|
||||||
|
jobData: Job<EventsQueuePayloadIncomingEvent>['data']['payload'],
|
||||||
|
) {
|
||||||
await checkNotificationRulesForEvent(payload).catch((e) => {
|
await checkNotificationRulesForEvent(payload).catch((e) => {
|
||||||
logger.error('Error checking notification rules', { error: e });
|
logger.error('Error checking notification rules', { error: e });
|
||||||
});
|
});
|
||||||
|
|
||||||
|
logger.info('Creating event', { event: payload, jobData });
|
||||||
|
|
||||||
return createEvent(payload);
|
return createEvent(payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -107,7 +113,7 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
|
|||||||
: null;
|
: null;
|
||||||
|
|
||||||
const payload = merge(omit(['properties'], event ?? {}), baseEvent);
|
const payload = merge(omit(['properties'], event ?? {}), baseEvent);
|
||||||
return createEventAndNotify(payload as IServiceEvent);
|
return createEventAndNotify(payload as IServiceEvent, job.data.payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
const sessionEnd = await getSessionEnd({
|
const sessionEnd = await getSessionEnd({
|
||||||
@@ -130,5 +136,5 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
|
|||||||
await createSessionEnd({ payload });
|
await createSessionEnd({ payload });
|
||||||
}
|
}
|
||||||
|
|
||||||
return createEventAndNotify(payload);
|
return createEventAndNotify(payload, job.data.payload);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -220,7 +220,7 @@ return "OK"
|
|||||||
}),
|
}),
|
||||||
eventJson,
|
eventJson,
|
||||||
'EX',
|
'EX',
|
||||||
60 * 31,
|
60 * 60,
|
||||||
);
|
);
|
||||||
|
|
||||||
addEventToSession();
|
addEventToSession();
|
||||||
@@ -511,11 +511,13 @@ return "OK"
|
|||||||
|
|
||||||
const flush: IClickhouseEvent[] = [];
|
const flush: IClickhouseEvent[] = [];
|
||||||
const pending: IClickhouseEvent[] = [];
|
const pending: IClickhouseEvent[] = [];
|
||||||
|
let hasSessionEnd = false;
|
||||||
|
|
||||||
for (let i = 0; i < events.length; i++) {
|
for (let i = 0; i < events.length; i++) {
|
||||||
const event = events[i]!;
|
const event = events[i]!;
|
||||||
|
|
||||||
if (event.name === 'session_end') {
|
if (event.name === 'session_end') {
|
||||||
|
hasSessionEnd = true;
|
||||||
flush.push(event);
|
flush.push(event);
|
||||||
} else {
|
} else {
|
||||||
// For screen_view events, look for next event
|
// For screen_view events, look for next event
|
||||||
@@ -527,6 +529,8 @@ return "OK"
|
|||||||
new Date(event.created_at).getTime();
|
new Date(event.created_at).getTime();
|
||||||
}
|
}
|
||||||
flush.push(event);
|
flush.push(event);
|
||||||
|
} else if (hasSessionEnd) {
|
||||||
|
flush.push(event);
|
||||||
} else {
|
} else {
|
||||||
pending.push(event);
|
pending.push(event);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user