fix(worker): add reqId to logger for better traceability

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-02-19 10:33:42 +01:00
parent c4258bbccd
commit 59012526e2
5 changed files with 25 additions and 5 deletions

View File

@@ -29,6 +29,7 @@ export function getStringHeaders(headers: FastifyRequest['headers']) {
'openpanel-sdk-name', 'openpanel-sdk-name',
'openpanel-sdk-version', 'openpanel-sdk-version',
'openpanel-client-id', 'openpanel-client-id',
'request-id',
], ],
headers, headers,
), ),

View File

@@ -40,7 +40,12 @@ export async function parseIp(ip?: string): Promise<GeoLocation> {
} }
const hash = crypto.createHash('sha256').update(ip).digest('hex'); const hash = crypto.createHash('sha256').update(ip).digest('hex');
const cached = await getRedisCache().get(`geo:${hash}`); const cached = await getRedisCache()
.get(`geo:${hash}`)
.catch(() => {
logger.warn('Failed to get geo location from cache', { hash });
return null;
});
if (cached) { if (cached) {
return JSON.parse(cached); return JSON.parse(cached);
@@ -69,7 +74,7 @@ export async function parseIp(ip?: string): Promise<GeoLocation> {
`geo:${hash}`, `geo:${hash}`,
JSON.stringify(geo), JSON.stringify(geo),
'EX', 'EX',
60 * 30, 60 * 60 * 24,
); );
return geo; return geo;

View File

@@ -72,6 +72,7 @@ export async function createSessionEnd(
const logger = baseLogger.child({ const logger = baseLogger.child({
payload: job.data.payload, payload: job.data.payload,
jobId: job.id, jobId: job.id,
reqId: job.data.payload.properties?.__reqId ?? 'unknown',
}); });
const payload = job.data.payload; const payload = job.data.payload;

View File

@@ -2,7 +2,7 @@ import { getReferrerWithQuery, parseReferrer } from '@/utils/parse-referrer';
import type { Job } from 'bullmq'; import type { Job } from 'bullmq';
import { omit } from 'ramda'; import { omit } from 'ramda';
import { logger } from '@/utils/logger'; import { logger as baseLogger } from '@/utils/logger';
import { createSessionEnd, getSessionEnd } from '@/utils/session-handler'; import { createSessionEnd, getSessionEnd } from '@/utils/session-handler';
import { isSameDomain, parsePath } from '@openpanel/common'; import { isSameDomain, parsePath } from '@openpanel/common';
import { parseUserAgent } from '@openpanel/common/server'; import { parseUserAgent } from '@openpanel/common/server';
@@ -12,6 +12,7 @@ import {
createEvent, createEvent,
eventBuffer, eventBuffer,
} from '@openpanel/db'; } from '@openpanel/db';
import type { ILogger } from '@openpanel/logger';
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue';
import * as R from 'ramda'; import * as R from 'ramda';
@@ -26,6 +27,7 @@ const merge = <A, B>(a: Partial<A>, b: Partial<B>): A & B =>
async function createEventAndNotify( async function createEventAndNotify(
payload: IServiceCreateEventPayload, payload: IServiceCreateEventPayload,
jobData: Job<EventsQueuePayloadIncomingEvent>['data']['payload'], jobData: Job<EventsQueuePayloadIncomingEvent>['data']['payload'],
logger: ILogger,
) { ) {
await checkNotificationRulesForEvent(payload).catch((e) => { await checkNotificationRulesForEvent(payload).catch((e) => {
logger.error('Error checking notification rules', { error: e }); logger.error('Error checking notification rules', { error: e });
@@ -47,6 +49,10 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
priority, priority,
} = job.data.payload; } = job.data.payload;
const properties = body.properties ?? {}; const properties = body.properties ?? {};
const reqId = headers['request-id'] ?? 'unknown';
const logger = baseLogger.child({
reqId,
});
const getProperty = (name: string): string | undefined => { const getProperty = (name: string): string | undefined => {
// replace thing is just for older sdks when we didn't have `__` // replace thing is just for older sdks when we didn't have `__`
// remove when kiddokitchen app (24.09.02) is not used anymore // remove when kiddokitchen app (24.09.02) is not used anymore
@@ -82,6 +88,7 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
__user_agent: userAgent, __user_agent: userAgent,
__hash: hash, __hash: hash,
__query: query, __query: query,
__reqId: reqId,
}), }),
createdAt, createdAt,
duration: 0, duration: 0,
@@ -116,7 +123,11 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
: null; : null;
const payload = merge(omit(['properties'], event ?? {}), baseEvent); const payload = merge(omit(['properties'], event ?? {}), baseEvent);
return createEventAndNotify(payload as IServiceEvent, job.data.payload); return createEventAndNotify(
payload as IServiceEvent,
job.data.payload,
logger,
);
} }
const sessionEnd = await getSessionEnd({ const sessionEnd = await getSessionEnd({
@@ -147,5 +158,5 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
await createSessionEnd({ payload }); await createSessionEnd({ payload });
} }
return createEventAndNotify(payload, job.data.payload); return createEventAndNotify(payload, job.data.payload, logger);
} }

View File

@@ -143,6 +143,8 @@ export interface IServiceEvent {
properties: Record<string, unknown> & { properties: Record<string, unknown> & {
hash?: string; hash?: string;
query?: Record<string, unknown>; query?: Record<string, unknown>;
__reqId?: string;
__user_agent?: string;
}; };
createdAt: Date; createdAt: Date;
country?: string | undefined; country?: string | undefined;