try fix event.controller

This commit is contained in:
Carl-Gerhard Lindesvärd
2024-03-07 22:18:50 +01:00
parent 6d5bfa4cbe
commit 4d17a29c2e
4 changed files with 147 additions and 117 deletions

View File

@@ -1,7 +1,8 @@
import { logger, logInfo } from '@/utils/logger'; import { logger, logInfo, noop } from '@/utils/logger';
import { getClientIp, parseIp } from '@/utils/parseIp'; import { getClientIp, parseIp } from '@/utils/parseIp';
import { getReferrerWithQuery, parseReferrer } from '@/utils/parseReferrer'; import { getReferrerWithQuery, parseReferrer } from '@/utils/parseReferrer';
import { isUserAgentSet, parseUserAgent } from '@/utils/parseUserAgent'; import { isUserAgentSet, parseUserAgent } from '@/utils/parseUserAgent';
import { isSameDomain, parsePath } from '@/utils/url';
import type { FastifyReply, FastifyRequest } from 'fastify'; import type { FastifyReply, FastifyRequest } from 'fastify';
import { omit } from 'ramda'; import { omit } from 'ramda';
import { v4 as uuid } from 'uuid'; import { v4 as uuid } from 'uuid';
@@ -10,58 +11,13 @@ import { generateDeviceId, getTime, toISOString } from '@mixan/common';
import type { IServiceCreateEventPayload } from '@mixan/db'; import type { IServiceCreateEventPayload } from '@mixan/db';
import { createEvent, getEvents, getSalts } from '@mixan/db'; import { createEvent, getEvents, getSalts } from '@mixan/db';
import type { JobsOptions } from '@mixan/queue'; import type { JobsOptions } from '@mixan/queue';
import { eventsQueue, findJobByPrefix } from '@mixan/queue'; import { eventsQueue } from '@mixan/queue';
import { findJobByPrefix } from '@mixan/queue/src/utils';
import type { PostEventPayload } from '@mixan/sdk'; import type { PostEventPayload } from '@mixan/sdk';
const SESSION_TIMEOUT = 1000 * 60 * 30; const SESSION_TIMEOUT = 1000 * 60 * 30;
const SESSION_END_TIMEOUT = SESSION_TIMEOUT + 1000; const SESSION_END_TIMEOUT = SESSION_TIMEOUT + 1000;
function parseSearchParams(
params: URLSearchParams
): Record<string, string> | undefined {
const result: Record<string, string> = {};
for (const [key, value] of params.entries()) {
result[key] = value;
}
return Object.keys(result).length ? result : undefined;
}
function parsePath(path?: string): {
query?: Record<string, string>;
path: string;
hash?: string;
} {
if (!path) {
return {
path: '',
};
}
try {
const url = new URL(path);
return {
query: parseSearchParams(url.searchParams),
path: url.pathname,
hash: url.hash || undefined,
};
} catch (error) {
return {
path,
};
}
}
function isSameDomain(url1: string | undefined, url2: string | undefined) {
if (!url1 || !url2) {
return false;
}
try {
return new URL(url1).hostname === new URL(url2).hostname;
} catch (e) {
return false;
}
}
async function withTiming<T>(name: string, promise: T) { async function withTiming<T>(name: string, promise: T) {
try { try {
const start = Date.now(); const start = Date.now();
@@ -77,12 +33,31 @@ async function withTiming<T>(name: string, promise: T) {
} }
} }
function createContextLogger(request: FastifyRequest) {
const _log = request.log.child({
requestId: request.id,
requestUrl: request.url,
headers: request.headers,
projectId: request.projectId,
});
let obj: Record<string, unknown> = {};
return {
add: (key: string, value: unknown) => (obj[key] = value),
addObject: (key: string, value: Record<string, unknown>) => {
obj = { ...obj, ...value };
},
send: (message: string, value: Record<string, unknown>) =>
_log.info({ ...obj, ...value }, message),
};
}
export async function postEvent( export async function postEvent(
request: FastifyRequest<{ request: FastifyRequest<{
Body: PostEventPayload; Body: PostEventPayload;
}>, }>,
reply: FastifyReply reply: FastifyReply
) { ) {
const contextLogger = createContextLogger(request);
let deviceId: string | null = null; let deviceId: string | null = null;
const { projectId, body } = request; const { projectId, body } = request;
const properties = body.properties ?? {}; const properties = body.properties ?? {};
@@ -172,36 +147,34 @@ export async function postEvent(
return reply.status(200).send(''); return reply.status(200).send('');
} }
const [geo, eventsJobs] = await withTiming( const [geo, sessionEndJobCurrentDeviceId, sessionEndJobPreviousDeviceId] =
'Get geo and job from queue', await withTiming(
Promise.all([parseIp(ip), eventsQueue.getJobs(['delayed'])]) 'Get geo and jobs from queue',
); Promise.all([
parseIp(ip),
// find session_end job findJobByPrefix(
const sessionEndJobCurrentDeviceId = findJobByPrefix( eventsQueue,
eventsJobs,
`sessionEnd:${projectId}:${currentDeviceId}:` `sessionEnd:${projectId}:${currentDeviceId}:`
); ),
const sessionEndJobPreviousDeviceId = findJobByPrefix( findJobByPrefix(
eventsJobs, eventsQueue,
`sessionEnd:${projectId}:${previousDeviceId}:` `sessionEnd:${projectId}:${previousDeviceId}:`
),
])
); );
const createSessionStart = const createSessionStart =
!sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId; !sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId;
if (sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId) { if (sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId) {
request.log.info({}, 'found session current');
deviceId = currentDeviceId; deviceId = currentDeviceId;
const diff = Date.now() - sessionEndJobCurrentDeviceId.timestamp; const diff = Date.now() - sessionEndJobCurrentDeviceId.timestamp;
sessionEndJobCurrentDeviceId.changeDelay(diff + SESSION_END_TIMEOUT); sessionEndJobCurrentDeviceId.changeDelay(diff + SESSION_END_TIMEOUT);
} else if (!sessionEndJobCurrentDeviceId && sessionEndJobPreviousDeviceId) { } else if (!sessionEndJobCurrentDeviceId && sessionEndJobPreviousDeviceId) {
request.log.info({}, 'found session previous');
deviceId = previousDeviceId; deviceId = previousDeviceId;
const diff = Date.now() - sessionEndJobPreviousDeviceId.timestamp; const diff = Date.now() - sessionEndJobPreviousDeviceId.timestamp;
sessionEndJobPreviousDeviceId.changeDelay(diff + SESSION_END_TIMEOUT); sessionEndJobPreviousDeviceId.changeDelay(diff + SESSION_END_TIMEOUT);
} else { } else {
request.log.info({}, 'new session with current');
deviceId = currentDeviceId; deviceId = currentDeviceId;
// Queue session end // Queue session end
eventsQueue.add( eventsQueue.add(
@@ -219,28 +192,14 @@ export async function postEvent(
); );
} }
const [sessionStartEvent] = await withTiming( const [[sessionStartEvent], prevEventJob] = await withTiming(
'Get session start event', 'Get session start event',
Promise.all([
getEvents( getEvents(
`SELECT * FROM events WHERE name = 'session_start' AND device_id = '${deviceId}' AND project_id = '${projectId}' ORDER BY created_at DESC LIMIT 1` `SELECT * FROM events WHERE name = 'session_start' AND device_id = '${deviceId}' AND project_id = '${projectId}' ORDER BY created_at DESC LIMIT 1`
) ),
); findJobByPrefix(eventsQueue, `event:${projectId}:${deviceId}:`),
])
request.log.info(
{
ip,
origin,
ua,
uaInfo,
referrer,
profileId,
projectId,
deviceId,
geo,
sessionStartEvent,
path,
},
'incoming event'
); );
const payload: Omit<IServiceCreateEventPayload, 'id'> = { const payload: Omit<IServiceCreateEventPayload, 'id'> = {
@@ -274,11 +233,13 @@ export async function postEvent(
meta: undefined, meta: undefined,
}; };
const job = findJobByPrefix(eventsJobs, `event:${projectId}:${deviceId}:`); const isDelayed = prevEventJob ? await prevEventJob?.isDelayed() : false;
if (job?.isDelayed && job.data.type === 'createEvent') { if (isDelayed && prevEventJob && prevEventJob.data.type === 'createEvent') {
const prevEvent = job.data.payload; const prevEvent = prevEventJob.data.payload;
const duration = getTime(payload.createdAt) - getTime(prevEvent.createdAt); const duration = getTime(payload.createdAt) - getTime(prevEvent.createdAt);
contextLogger.add('prevEvent', prevEvent);
console.log('HERE?!?!?!');
// Set path from prev screen_view event if current event is not a screen_view // Set path from prev screen_view event if current event is not a screen_view
if (payload.name != 'screen_view') { if (payload.name != 'screen_view') {
@@ -287,19 +248,15 @@ export async function postEvent(
if (payload.name === 'screen_view') { if (payload.name === 'screen_view') {
if (duration < 0) { if (duration < 0) {
request.log.info( contextLogger.send('duration is wrong', {
{
prevEvent,
payload, payload,
}, });
'duration is wrong'
);
} else { } else {
// Skip update duration if it's wrong // Skip update duration if it's wrong
// Seems like request is not in right order // Seems like request is not in right order
await withTiming( await withTiming(
'Update previous job with duration', 'Update previous job with duration',
job.updateData({ prevEventJob.updateData({
type: 'createEvent', type: 'createEvent',
payload: { payload: {
...prevEvent, ...prevEvent,
@@ -309,7 +266,7 @@ export async function postEvent(
); );
} }
await withTiming('Promote previous job', job.promote()); await withTiming('Promote previous job', prevEventJob.promote());
} }
} }
@@ -332,11 +289,24 @@ export async function postEvent(
options.jobId = `event:${projectId}:${deviceId}:${Date.now()}`; options.jobId = `event:${projectId}:${deviceId}:${Date.now()}`;
} }
request.log.info(payload, 'queue event'); contextLogger.send('event is queued', {
ip,
origin,
ua,
uaInfo,
referrer,
profileId,
projectId,
deviceId,
geo,
sessionStartEvent,
path,
payload,
});
// Queue current event // Queue current event
await withTiming( eventsQueue
'Add current to event queue', .add(
eventsQueue.add(
'event', 'event',
{ {
type: 'createEvent', type: 'createEvent',
@@ -344,7 +314,7 @@ export async function postEvent(
}, },
options options
) )
); .catch(noop('Failed to queue event'));
reply.status(202).send(deviceId); reply.status(202).send(deviceId);
} }

View File

@@ -1,7 +1,10 @@
import pino from 'pino'; import pino from 'pino';
const ENABLED = process.env.NODE_ENV === 'production';
const transport = pino.transport({ const transport = pino.transport({
targets: [ targets: ENABLED
? [
{ {
target: '@logtail/pino', target: '@logtail/pino',
options: { sourceToken: process.env.BETTERSTACK_TOKEN }, options: { sourceToken: process.env.BETTERSTACK_TOKEN },
@@ -9,7 +12,8 @@ const transport = pino.transport({
{ {
target: 'pino-pretty', target: 'pino-pretty',
}, },
], ]
: [],
}); });
export const logger = pino(transport); export const logger = pino(transport);
@@ -17,3 +21,6 @@ export const logger = pino(transport);
export function logInfo(msg: string, obj?: unknown) { export function logInfo(msg: string, obj?: unknown) {
logger.info(obj, msg); logger.info(obj, msg);
} }
export const noop = (message: string) => (error: unknown) =>
logger.error(error, message);

View File

@@ -0,0 +1,48 @@
export function parseSearchParams(
params: URLSearchParams
): Record<string, string> | undefined {
const result: Record<string, string> = {};
for (const [key, value] of params.entries()) {
result[key] = value;
}
return Object.keys(result).length ? result : undefined;
}
export function parsePath(path?: string): {
query?: Record<string, string>;
path: string;
hash?: string;
} {
if (!path) {
return {
path: '',
};
}
try {
const url = new URL(path);
return {
query: parseSearchParams(url.searchParams),
path: url.pathname,
hash: url.hash || undefined,
};
} catch (error) {
return {
path,
};
}
}
export function isSameDomain(
url1: string | undefined,
url2: string | undefined
) {
if (!url1 || !url2) {
return false;
}
try {
return new URL(url1).hostname === new URL(url2).hostname;
} catch (e) {
return false;
}
}

View File

@@ -1,8 +1,13 @@
import type { Job } from 'bullmq'; import type { Queue } from 'bullmq';
export function findJobByPrefix<T>( import { redis } from '../../redis';
jobs: Job<T, any, string>[],
prefix: string export async function findJobByPrefix<T>(
queue: Queue<T, any, string>,
matcher: string
) { ) {
return jobs.find((job) => job.opts.jobId?.startsWith(prefix)); const prefix = `bull:${queue.name}:`;
const keys = await redis.keys(`${prefix}${matcher}*`);
const key = keys.findLast((key) => !key.endsWith(':logs'));
return key ? await queue.getJob(key.replace(prefix, '')) : undefined;
} }