add funnels
This commit is contained in:
@@ -4,10 +4,11 @@ import { getReferrerWithQuery, parseReferrer } from '@/utils/parseReferrer';
|
||||
import { isUserAgentSet, parseUserAgent } from '@/utils/parseUserAgent';
|
||||
import type { FastifyReply, FastifyRequest } from 'fastify';
|
||||
import { omit } from 'ramda';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import { generateDeviceId, getTime, toISOString } from '@mixan/common';
|
||||
import type { IServiceCreateEventPayload } from '@mixan/db';
|
||||
import { createBotEvent, getEvents, getSalts } from '@mixan/db';
|
||||
import { createBotEvent, createEvent, getEvents, getSalts } from '@mixan/db';
|
||||
import type { JobsOptions } from '@mixan/queue';
|
||||
import { eventsQueue, findJobByPrefix } from '@mixan/queue';
|
||||
import type { PostEventPayload } from '@mixan/types';
|
||||
@@ -108,6 +109,7 @@ export async function postEvent(
|
||||
payload: {
|
||||
name: body.name,
|
||||
deviceId: event?.deviceId || '',
|
||||
sessionId: event?.sessionId || '',
|
||||
profileId,
|
||||
projectId,
|
||||
properties: body.properties ?? {},
|
||||
@@ -145,11 +147,16 @@ export async function postEvent(
|
||||
return reply.status(200).send('');
|
||||
}
|
||||
|
||||
const [geo, eventsJobs] = await Promise.all([
|
||||
const [geo, eventsJobs, events] = await Promise.all([
|
||||
parseIp(ip),
|
||||
eventsQueue.getJobs(['delayed']),
|
||||
getEvents(
|
||||
`SELECT * FROM events WHERE name = 'session_start' AND profile_id = '${profileId}' AND project_id = '${projectId}' ORDER BY created_at DESC LIMIT 1`
|
||||
),
|
||||
]);
|
||||
|
||||
const sessionStartEvent = events[0];
|
||||
|
||||
// find session_end job
|
||||
const sessionEndJobCurrentDeviceId = findJobByPrefix(
|
||||
eventsJobs,
|
||||
@@ -197,6 +204,7 @@ export async function postEvent(
|
||||
deviceId,
|
||||
profileId,
|
||||
projectId,
|
||||
sessionId: createSessionStart ? uuid() : sessionStartEvent?.sessionId ?? '',
|
||||
properties: Object.assign({}, omit(['path', 'referrer'], body.properties), {
|
||||
hash,
|
||||
query,
|
||||
@@ -246,14 +254,12 @@ export async function postEvent(
|
||||
}
|
||||
|
||||
if (createSessionStart) {
|
||||
eventsQueue.add('event', {
|
||||
type: 'createEvent',
|
||||
payload: {
|
||||
...payload,
|
||||
name: 'session_start',
|
||||
// @ts-expect-error
|
||||
createdAt: toISOString(getTime(payload.createdAt) - 10),
|
||||
},
|
||||
// We do not need to queue session_start
|
||||
await createEvent({
|
||||
...payload,
|
||||
name: 'session_start',
|
||||
// @ts-expect-error
|
||||
createdAt: toISOString(getTime(payload.createdAt) - 10),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user