diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 74c2da6c..c4e2c85c 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -92,28 +92,6 @@ const startServer = async () => { const ignoreLog = ['/healthcheck', '/metrics', '/misc']; const ignoreMethods = ['OPTIONS']; - fastify.addHook('onRequest', (request, reply, done) => { - if (ignoreMethods.includes(request.method)) { - return done(); - } - if (ignoreLog.some((path) => request.url.startsWith(path))) { - return done(); - } - if (request.url.includes('trpc')) { - request.log.info('request incoming', { - url: request.url.split('?')[0], - method: request.method, - input: getTrpcInput(request), - }); - } else { - request.log.info('request incoming', { - url: request.url, - method: request.method, - }); - } - done(); - }); - fastify.addHook('onResponse', (request, reply, done) => { if (ignoreMethods.includes(request.method)) { return done(); @@ -133,6 +111,8 @@ const startServer = async () => { url: request.url, method: request.method, responseTime: reply.elapsedTime, + headers: request.headers, + body: request.body, }); } done(); diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index ef3fec5c..bde4bf1d 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -84,12 +84,27 @@ async function start() { }); worker.on('failed', (job) => { - logger.error('job failed', { - worker: worker.name, - data: job?.data, - error: job?.failedReason, - options: job?.opts, - }); + if (job) { + logger.error('job failed', { + worker: worker.name, + data: job.data, + error: job.failedReason, + options: job.opts, + }); + } + }); + + worker.on('completed', (job) => { + if (job) { + logger.info('job completed', { + worker: worker.name, + data: job.data, + duration: + job.processedOn && job.finishedOn + ? job.finishedOn - job.processedOn + : undefined, + }); + } }); worker.on('ioredis:close', () => { diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index a6d8d9b7..2b5e697e 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -1,14 +1,16 @@ import type { Job } from 'bullmq'; import { last } from 'ramda'; +import { logger as baseLogger } from '@/utils/logger'; import { getTime } from '@openpanel/common'; import { + type IServiceEvent, TABLE_NAMES, createEvent, eventBuffer, getEvents, } from '@openpanel/db'; -import { createLogger } from '@openpanel/logger'; +import type { ILogger } from '@openpanel/logger'; import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue'; async function getCompleteSession({ @@ -32,66 +34,63 @@ async function getCompleteSession({ return getEvents(sql); } +async function getCompleteSessionWithSessionStart({ + projectId, + sessionId, + logger, +}: { + projectId: string; + sessionId: string; + logger: ILogger; +}): Promise> { + const intervals = [6, 12, 24, 72]; + + for (const hoursInterval of intervals) { + const events = await getCompleteSession({ + projectId, + sessionId, + hoursInterval, + }); + + if (events.find((event) => event.name === 'session_start')) { + return events; + } + + logger.warn(`Checking last ${hoursInterval} hours for session_start`); + } + + return []; +} + export async function createSessionEnd( job: Job, ) { - const logger = createLogger({ - name: 'job:create-session-end', - }).child({ + const logger = baseLogger.child({ payload: job.data.payload, jobId: job.id, }); const payload = job.data.payload; - const lastScreenView = await eventBuffer.getLastScreenView({ - projectId: payload.projectId, - profileId: payload.profileId || payload.deviceId, - }); - const eventsInBuffer = lastScreenView - ? [lastScreenView] - : await eventBuffer.findMany( - (item) => item.session_id === payload.sessionId, - ); - - if (lastScreenView) { - logger.info('found last screen view in buffer'); - } else if (eventsInBuffer.length > 0) { - logger.info('found events in buffer'); - } else { - logger.info('no events in buffer'); - } - - let eventsInDb = await getCompleteSession({ - projectId: payload.projectId, - sessionId: payload.sessionId, - hoursInterval: 12, - }); - - // If session_start does not exist, try to find it the last 24 hours - if (!eventsInDb.find((event) => event.name === 'session_start')) { - logger.warn('Checking last 24 hours for session_start'); - eventsInDb = await getCompleteSession({ + const [lastScreenView, eventsInDb] = await Promise.all([ + eventBuffer.getLastScreenView({ + projectId: payload.projectId, + profileId: payload.profileId || payload.deviceId, + }), + getCompleteSessionWithSessionStart({ projectId: payload.projectId, sessionId: payload.sessionId, - hoursInterval: 24, - }); - } - - // If session_start does not exist, try to find it the last 72 hours - if (!eventsInDb.find((event) => event.name === 'session_start')) { - logger.warn('Checking last 72 hours for session_start'); - eventsInDb = await getCompleteSession({ - projectId: payload.projectId, - sessionId: payload.sessionId, - hoursInterval: 72, - }); - } + logger, + }), + ]); // sort last inserted first - const events = [...eventsInBuffer, ...eventsInDb].sort( - (a, b) => new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(), - ); + const events = [lastScreenView, ...eventsInDb] + .filter((event): event is IServiceEvent => !!event) + .sort( + (a, b) => + new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(), + ); events.map((event, index) => { job.log( diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 7464e816..45e285ea 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -4,11 +4,11 @@ import type { Job } from 'bullmq'; import { omit } from 'ramda'; import { v4 as uuid } from 'uuid'; +import { logger } from '@/utils/logger'; import { getTime, isSameDomain, parsePath } from '@openpanel/common'; import type { IServiceCreateEventPayload } from '@openpanel/db'; import { createEvent } from '@openpanel/db'; import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service'; -import { createLogger } from '@openpanel/logger'; import { findJobByPrefix, sessionsQueue } from '@openpanel/queue'; import type { EventsQueuePayloadCreateSessionEnd, @@ -19,10 +19,6 @@ import { getRedisQueue } from '@openpanel/redis'; const GLOBAL_PROPERTIES = ['__path', '__referrer']; export const SESSION_TIMEOUT = 1000 * 60 * 30; -const logger = createLogger({ - name: 'job:incoming-event', -}); - const getSessionEndJobId = (projectId: string, deviceId: string) => `sessionEnd:${projectId}:${deviceId}`; diff --git a/packages/logger/index.ts b/packages/logger/index.ts index b25fbaa2..87ae9bcd 100644 --- a/packages/logger/index.ts +++ b/packages/logger/index.ts @@ -3,9 +3,7 @@ import winston from 'winston'; export { winston }; -export type ILogger = winston.Logger & { - noop: (message: string) => (error: unknown) => void; -}; +export type ILogger = winston.Logger; const logLevel = process.env.LOG_LEVEL ?? 'info'; @@ -61,8 +59,5 @@ export function createLogger({ name }: { name: string }): ILogger { ), }); - return Object.assign(logger, { - noop: (message: string) => (error: unknown) => - logger.error(`noop: ${message}`, error), - }); + return logger; }