diff --git a/apps/sdk-api/src/controllers/event.controller.ts b/apps/sdk-api/src/controllers/event.controller.ts index e4af6950..43fdf83a 100644 --- a/apps/sdk-api/src/controllers/event.controller.ts +++ b/apps/sdk-api/src/controllers/event.controller.ts @@ -1,7 +1,8 @@ -import { logger, logInfo } from '@/utils/logger'; +import { logger, logInfo, noop } from '@/utils/logger'; import { getClientIp, parseIp } from '@/utils/parseIp'; import { getReferrerWithQuery, parseReferrer } from '@/utils/parseReferrer'; import { isUserAgentSet, parseUserAgent } from '@/utils/parseUserAgent'; +import { isSameDomain, parsePath } from '@/utils/url'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { omit } from 'ramda'; import { v4 as uuid } from 'uuid'; @@ -10,58 +11,13 @@ import { generateDeviceId, getTime, toISOString } from '@mixan/common'; import type { IServiceCreateEventPayload } from '@mixan/db'; import { createEvent, getEvents, getSalts } from '@mixan/db'; import type { JobsOptions } from '@mixan/queue'; -import { eventsQueue, findJobByPrefix } from '@mixan/queue'; +import { eventsQueue } from '@mixan/queue'; +import { findJobByPrefix } from '@mixan/queue/src/utils'; import type { PostEventPayload } from '@mixan/sdk'; const SESSION_TIMEOUT = 1000 * 60 * 30; const SESSION_END_TIMEOUT = SESSION_TIMEOUT + 1000; -function parseSearchParams( - params: URLSearchParams -): Record | undefined { - const result: Record = {}; - for (const [key, value] of params.entries()) { - result[key] = value; - } - return Object.keys(result).length ? result : undefined; -} - -function parsePath(path?: string): { - query?: Record; - path: string; - hash?: string; -} { - if (!path) { - return { - path: '', - }; - } - - try { - const url = new URL(path); - return { - query: parseSearchParams(url.searchParams), - path: url.pathname, - hash: url.hash || undefined, - }; - } catch (error) { - return { - path, - }; - } -} - -function isSameDomain(url1: string | undefined, url2: string | undefined) { - if (!url1 || !url2) { - return false; - } - try { - return new URL(url1).hostname === new URL(url2).hostname; - } catch (e) { - return false; - } -} - async function withTiming(name: string, promise: T) { try { const start = Date.now(); @@ -77,12 +33,31 @@ async function withTiming(name: string, promise: T) { } } +function createContextLogger(request: FastifyRequest) { + const _log = request.log.child({ + requestId: request.id, + requestUrl: request.url, + headers: request.headers, + projectId: request.projectId, + }); + let obj: Record = {}; + return { + add: (key: string, value: unknown) => (obj[key] = value), + addObject: (key: string, value: Record) => { + obj = { ...obj, ...value }; + }, + send: (message: string, value: Record) => + _log.info({ ...obj, ...value }, message), + }; +} + export async function postEvent( request: FastifyRequest<{ Body: PostEventPayload; }>, reply: FastifyReply ) { + const contextLogger = createContextLogger(request); let deviceId: string | null = null; const { projectId, body } = request; const properties = body.properties ?? {}; @@ -172,36 +147,34 @@ export async function postEvent( return reply.status(200).send(''); } - const [geo, eventsJobs] = await withTiming( - 'Get geo and job from queue', - Promise.all([parseIp(ip), eventsQueue.getJobs(['delayed'])]) - ); - - // find session_end job - const sessionEndJobCurrentDeviceId = findJobByPrefix( - eventsJobs, - `sessionEnd:${projectId}:${currentDeviceId}:` - ); - const sessionEndJobPreviousDeviceId = findJobByPrefix( - eventsJobs, - `sessionEnd:${projectId}:${previousDeviceId}:` - ); + const [geo, sessionEndJobCurrentDeviceId, sessionEndJobPreviousDeviceId] = + await withTiming( + 'Get geo and jobs from queue', + Promise.all([ + parseIp(ip), + findJobByPrefix( + eventsQueue, + `sessionEnd:${projectId}:${currentDeviceId}:` + ), + findJobByPrefix( + eventsQueue, + `sessionEnd:${projectId}:${previousDeviceId}:` + ), + ]) + ); const createSessionStart = !sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId; if (sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId) { - request.log.info({}, 'found session current'); deviceId = currentDeviceId; const diff = Date.now() - sessionEndJobCurrentDeviceId.timestamp; sessionEndJobCurrentDeviceId.changeDelay(diff + SESSION_END_TIMEOUT); } else if (!sessionEndJobCurrentDeviceId && sessionEndJobPreviousDeviceId) { - request.log.info({}, 'found session previous'); deviceId = previousDeviceId; const diff = Date.now() - sessionEndJobPreviousDeviceId.timestamp; sessionEndJobPreviousDeviceId.changeDelay(diff + SESSION_END_TIMEOUT); } else { - request.log.info({}, 'new session with current'); deviceId = currentDeviceId; // Queue session end eventsQueue.add( @@ -219,28 +192,14 @@ export async function postEvent( ); } - const [sessionStartEvent] = await withTiming( + const [[sessionStartEvent], prevEventJob] = await withTiming( 'Get session start event', - getEvents( - `SELECT * FROM events WHERE name = 'session_start' AND device_id = '${deviceId}' AND project_id = '${projectId}' ORDER BY created_at DESC LIMIT 1` - ) - ); - - request.log.info( - { - ip, - origin, - ua, - uaInfo, - referrer, - profileId, - projectId, - deviceId, - geo, - sessionStartEvent, - path, - }, - 'incoming event' + Promise.all([ + getEvents( + `SELECT * FROM events WHERE name = 'session_start' AND device_id = '${deviceId}' AND project_id = '${projectId}' ORDER BY created_at DESC LIMIT 1` + ), + findJobByPrefix(eventsQueue, `event:${projectId}:${deviceId}:`), + ]) ); const payload: Omit = { @@ -274,11 +233,13 @@ export async function postEvent( meta: undefined, }; - const job = findJobByPrefix(eventsJobs, `event:${projectId}:${deviceId}:`); + const isDelayed = prevEventJob ? await prevEventJob?.isDelayed() : false; - if (job?.isDelayed && job.data.type === 'createEvent') { - const prevEvent = job.data.payload; + if (isDelayed && prevEventJob && prevEventJob.data.type === 'createEvent') { + const prevEvent = prevEventJob.data.payload; const duration = getTime(payload.createdAt) - getTime(prevEvent.createdAt); + contextLogger.add('prevEvent', prevEvent); + console.log('HERE?!?!?!'); // Set path from prev screen_view event if current event is not a screen_view if (payload.name != 'screen_view') { @@ -287,19 +248,15 @@ export async function postEvent( if (payload.name === 'screen_view') { if (duration < 0) { - request.log.info( - { - prevEvent, - payload, - }, - 'duration is wrong' - ); + contextLogger.send('duration is wrong', { + payload, + }); } else { // Skip update duration if it's wrong // Seems like request is not in right order await withTiming( 'Update previous job with duration', - job.updateData({ + prevEventJob.updateData({ type: 'createEvent', payload: { ...prevEvent, @@ -309,7 +266,7 @@ export async function postEvent( ); } - await withTiming('Promote previous job', job.promote()); + await withTiming('Promote previous job', prevEventJob.promote()); } } @@ -332,11 +289,24 @@ export async function postEvent( options.jobId = `event:${projectId}:${deviceId}:${Date.now()}`; } - request.log.info(payload, 'queue event'); + contextLogger.send('event is queued', { + ip, + origin, + ua, + uaInfo, + referrer, + profileId, + projectId, + deviceId, + geo, + sessionStartEvent, + path, + payload, + }); + // Queue current event - await withTiming( - 'Add current to event queue', - eventsQueue.add( + eventsQueue + .add( 'event', { type: 'createEvent', @@ -344,7 +314,7 @@ export async function postEvent( }, options ) - ); + .catch(noop('Failed to queue event')); reply.status(202).send(deviceId); } diff --git a/apps/sdk-api/src/utils/logger.ts b/apps/sdk-api/src/utils/logger.ts index 9e3ca2a4..1f03d4b2 100644 --- a/apps/sdk-api/src/utils/logger.ts +++ b/apps/sdk-api/src/utils/logger.ts @@ -1,15 +1,19 @@ import pino from 'pino'; +const ENABLED = process.env.NODE_ENV === 'production'; + const transport = pino.transport({ - targets: [ - { - target: '@logtail/pino', - options: { sourceToken: process.env.BETTERSTACK_TOKEN }, - }, - { - target: 'pino-pretty', - }, - ], + targets: ENABLED + ? [ + { + target: '@logtail/pino', + options: { sourceToken: process.env.BETTERSTACK_TOKEN }, + }, + { + target: 'pino-pretty', + }, + ] + : [], }); export const logger = pino(transport); @@ -17,3 +21,6 @@ export const logger = pino(transport); export function logInfo(msg: string, obj?: unknown) { logger.info(obj, msg); } + +export const noop = (message: string) => (error: unknown) => + logger.error(error, message); diff --git a/apps/sdk-api/src/utils/url.ts b/apps/sdk-api/src/utils/url.ts new file mode 100644 index 00000000..c6d3723d --- /dev/null +++ b/apps/sdk-api/src/utils/url.ts @@ -0,0 +1,48 @@ +export function parseSearchParams( + params: URLSearchParams +): Record | undefined { + const result: Record = {}; + for (const [key, value] of params.entries()) { + result[key] = value; + } + return Object.keys(result).length ? result : undefined; +} + +export function parsePath(path?: string): { + query?: Record; + path: string; + hash?: string; +} { + if (!path) { + return { + path: '', + }; + } + + try { + const url = new URL(path); + return { + query: parseSearchParams(url.searchParams), + path: url.pathname, + hash: url.hash || undefined, + }; + } catch (error) { + return { + path, + }; + } +} + +export function isSameDomain( + url1: string | undefined, + url2: string | undefined +) { + if (!url1 || !url2) { + return false; + } + try { + return new URL(url1).hostname === new URL(url2).hostname; + } catch (e) { + return false; + } +} diff --git a/packages/queue/src/utils.ts b/packages/queue/src/utils.ts index c92a7026..3c2881bf 100644 --- a/packages/queue/src/utils.ts +++ b/packages/queue/src/utils.ts @@ -1,8 +1,13 @@ -import type { Job } from 'bullmq'; +import type { Queue } from 'bullmq'; -export function findJobByPrefix( - jobs: Job[], - prefix: string +import { redis } from '../../redis'; + +export async function findJobByPrefix( + queue: Queue, + matcher: string ) { - return jobs.find((job) => job.opts.jobId?.startsWith(prefix)); + const prefix = `bull:${queue.name}:`; + const keys = await redis.keys(`${prefix}${matcher}*`); + const key = keys.findLast((key) => !key.endsWith(':logs')); + return key ? await queue.getJob(key.replace(prefix, '')) : undefined; }