diff --git a/apps/api/scripts/mock-minimal.json b/apps/api/scripts/mock-minimal.json new file mode 100644 index 00000000..6eb5c6e2 --- /dev/null +++ b/apps/api/scripts/mock-minimal.json @@ -0,0 +1,49 @@ +[ + { + "headers": { + "openpanel-client-id": "5b679c47-9ec0-470a-8944-a9ab8f42b14f", + "x-client-ip": "221.145.77.175", + "user-agent": "Opera/13.66 (Macintosh; Intel Mac OS X 10.8.3 U; GV Presto/2.9.183 Version/11.00)", + "origin": "https://classic-hovel.info" + }, + "track": { + "type": "track", + "payload": { + "name": "click_button" + } + } + }, + { + "headers": { + "openpanel-client-id": "5b679c47-9ec0-470a-8944-a9ab8f42b14f", + "x-client-ip": "221.145.77.175", + "user-agent": "Opera/13.66 (Macintosh; Intel Mac OS X 10.8.3 U; GV Presto/2.9.183 Version/11.00)", + "origin": "https://classic-hovel.info" + }, + "track": { + "type": "track", + "payload": { + "name": "click_button_2" + } + } + }, + { + "headers": { + "openpanel-client-id": "5b679c47-9ec0-470a-8944-a9ab8f42b14f", + "x-client-ip": "221.145.77.175", + "user-agent": "Opera/13.66 (Macintosh; Intel Mac OS X 10.8.3 U; GV Presto/2.9.183 Version/11.00)", + "origin": "https://classic-hovel.info" + }, + "track": { + "type": "track", + "payload": { + "name": "screen_view", + "properties": { + "__referrer": "https://www.google.com", + "__path": "https://classic-hovel.info/beneficium-arcesso-quisquam", + "__title": "Hic thesis laboriosam copiose admoveo sufficio." + } + } + } + } +] diff --git a/apps/api/scripts/mock.ts b/apps/api/scripts/mock.ts index 248a592b..38bfc18e 100644 --- a/apps/api/scripts/mock.ts +++ b/apps/api/scripts/mock.ts @@ -140,8 +140,9 @@ function scrambleEvents(events: Event[]) { return events.sort(() => Math.random() - 0.5); } -// Distribute events over 6 minutes -const SIX_MINUTES_MS = 3 * 60 * 1000; +// Distribute events over X minutes +const MINUTES = 3; +const SIX_MINUTES_MS = 1000 * 60 * MINUTES; const startTime = Date.now(); let lastTriggeredIndex = 0; @@ -155,13 +156,14 @@ async function triggerEvents(file: string) { return; } - const eventsToTrigger = Math.floor( - generatedEvents.length * (elapsedTime / SIX_MINUTES_MS), + const eventsToTrigger = Math.min( + Math.ceil(generatedEvents.length * (elapsedTime / SIX_MINUTES_MS)), + generatedEvents.length, ); // Send events that haven't been triggered yet for (let i = lastTriggeredIndex; i < eventsToTrigger; i++) { - console.log('asbout to send'); + console.log('about to send'); const event = generatedEvents[i]!; try { @@ -171,7 +173,7 @@ async function triggerEvents(file: string) { console.error(`Failed to send event ${i + 1}:`, error); } console.log( - `sending ${event.track.payload.properties.__path} from user ${event.headers['user-agent']}`, + `sending ${event.track.payload?.properties?.__path} from user ${event.headers['user-agent']}`, ); } @@ -184,6 +186,8 @@ async function triggerEvents(file: string) { if (remainingEvents > 0) { setTimeout(() => triggerEvents(file), 50); // Check every 50ms + } else { + console.log('All events triggered.'); } console.log(`Total events to trigger: ${generatedEvents.length}`); diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index b3189fcb..1b82e418 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -43,8 +43,8 @@ export async function postEvent( const locked = await getRedisCache().set( `request:priority:${currentDeviceId}-${previousDeviceId}:${isScreenView ? 'screen_view' : 'other'}`, 'locked', - 'EX', - 5, + 'PX', + 950, // a bit under the delay below 'NX', ); @@ -72,7 +72,7 @@ export async function postEvent( // Prioritize 'screen_view' events by setting no delay // This ensures that session starts are created from 'screen_view' events // rather than other events, maintaining accurate session tracking - delay: request.body.name === 'screen_view' ? 0 : 1000, + delay: request.body.name === 'screen_view' ? undefined : 1000, }, ); diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index f73a0074..e326cf06 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -201,8 +201,8 @@ async function track({ const locked = await getRedisCache().set( `request:priority:${currentDeviceId}-${previousDeviceId}:${isScreenView ? 'screen_view' : 'other'}`, 'locked', - 'EX', - 5, + 'PX', + 950, // a bit under the delay below 'NX', ); @@ -228,7 +228,7 @@ async function track({ // Prioritize 'screen_view' events by setting no delay // This ensures that session starts are created from 'screen_view' events // rather than other events, maintaining accurate session tracking - delay: payload.name === 'screen_view' ? 0 : 1000, + delay: payload.name === 'screen_view' ? undefined : 1000, }, ); } diff --git a/apps/api/src/utils/parseIp.ts b/apps/api/src/utils/parseIp.ts index 44b01af1..81e2e6e4 100644 --- a/apps/api/src/utils/parseIp.ts +++ b/apps/api/src/utils/parseIp.ts @@ -39,17 +39,22 @@ export async function parseIp(ip?: string): Promise { } try { - const geo = await fetch(`${process.env.GEO_IP_HOST}/${ip}`, { + const res = await fetch(`${process.env.GEO_IP_HOST}/${ip}`, { signal: AbortSignal.timeout(2000), }); - const res = (await geo.json()) as RemoteIpLookupResponse; + + if (!res.ok) { + return geo; + } + + const json = (await res.json()) as RemoteIpLookupResponse; return { - country: res.country, - city: res.city, - region: res.stateprov, - longitude: res.longitude, - latitude: res.latitude, + country: json.country, + city: json.city, + region: json.stateprov, + longitude: json.longitude, + latitude: json.latitude, }; } catch (error) { logger.error('Failed to fetch geo location for ip', { error }); diff --git a/apps/api/src/utils/parseUserAgent.ts b/apps/api/src/utils/parseUserAgent.ts index b7e8666d..42ddc8bb 100644 --- a/apps/api/src/utils/parseUserAgent.ts +++ b/apps/api/src/utils/parseUserAgent.ts @@ -9,7 +9,9 @@ export function parseUserAgent(ua?: string | null) { if (!ua) return parsedServerUa; const res = new UAParser(ua).getResult(); - if (isServer(ua)) return parsedServerUa; + if (isServer(ua)) { + return parsedServerUa; + } return { os: res.os.name, @@ -77,7 +79,9 @@ function isServer(userAgent: string) { return true; } - return !!userAgent.match(/^([^\s]+\/[\d.]+\s*)+$/); + // Matches user agents like "Go-http-client/1.0" or "Go Http Client/1.0" + // It should just match the first name (with optional spaces) and version + return !!userAgent.match(/^[^\/]+\/[\d.]+$/); } export function getDevice(ua: string) { diff --git a/apps/worker/src/jobs/events.create-session-end.ts b/apps/worker/src/jobs/events.create-session-end.ts index ee1b5f7d..8fa4ca2f 100644 --- a/apps/worker/src/jobs/events.create-session-end.ts +++ b/apps/worker/src/jobs/events.create-session-end.ts @@ -8,11 +8,19 @@ import { eventBuffer, getEvents, } from '@openpanel/db'; +import { createLogger } from '@openpanel/logger'; import type { EventsQueuePayloadCreateSessionEnd } from '@openpanel/queue'; export async function createSessionEnd( job: Job, ) { + const logger = createLogger({ + name: 'job:create-session-end', + }).child({ + payload: job.data.payload, + jobId: job.id, + }); + const payload = job.data.payload; const eventsInBuffer = await eventBuffer.findMany( (item) => item.session_id === payload.sessionId, @@ -71,7 +79,7 @@ export async function createSessionEnd( throw new Error('Could not found session_start or any screen_view'); } - job.log('Creating session_start since it was not found'); + logger.warn('Creating session_start since it was not found'); sessionStart = { ...firstScreenView, diff --git a/apps/worker/src/jobs/events.ts b/apps/worker/src/jobs/events.ts index 549b77bc..40fac0c1 100644 --- a/apps/worker/src/jobs/events.ts +++ b/apps/worker/src/jobs/events.ts @@ -8,6 +8,7 @@ import type { EventsQueuePayloadIncomingEvent, } from '@openpanel/queue'; +import { cacheable } from '@openpanel/redis'; import { createSessionEnd } from './events.create-session-end'; import { incomingEvent } from './events.incoming-event'; @@ -30,11 +31,17 @@ export async function eventsJob(job: Job) { } } -async function updateEventsCount(projectId: string) { +const getProjectEventsCount = cacheable(async function getProjectEventsCount( + projectId: string, +) { const res = await chQuery<{ count: number }>( `SELECT count(*) as count FROM ${TABLE_NAMES.events} WHERE project_id = ${escape(projectId)}`, ); - const count = res[0]?.count; + return res[0]?.count; +}, 60 * 60); + +async function updateEventsCount(projectId: string) { + const count = await getProjectEventsCount(projectId); if (count) { await db.project.update({ where: { diff --git a/packages/db/src/buffers/buffer.ts b/packages/db/src/buffers/buffer.ts index 2035223c..2de11423 100644 --- a/packages/db/src/buffers/buffer.ts +++ b/packages/db/src/buffers/buffer.ts @@ -1,8 +1,10 @@ import { v4 as uuidv4 } from 'uuid'; +import { getSafeJson } from '@openpanel/common'; import type { ILogger } from '@openpanel/logger'; import { createLogger } from '@openpanel/logger'; import { getRedisCache } from '@openpanel/redis'; +import { pathOr } from 'ramda'; export type Find = ( callback: (item: T) => boolean, @@ -42,7 +44,9 @@ export class RedisBuffer { await getRedisCache().rpush(this.getKey(), JSON.stringify(item)); const bufferSize = await getRedisCache().llen(this.getKey()); - this.logger.debug(`Item added. Current size: ${bufferSize}`); + this.logger.debug( + `Item added (${pathOr('unknown', ['id'], item)}) Current size: ${bufferSize}`, + ); if (this.maxBufferSize && bufferSize >= this.maxBufferSize) { await this.tryFlush(); @@ -66,6 +70,8 @@ export class RedisBuffer { this.logger.debug('Lock acquired. Attempting to flush.'); try { await this.flush(); + } catch (error) { + this.logger.error('Failed to flush buffer', { error }); } finally { await this.releaseLock(lockId); } @@ -117,18 +123,26 @@ export class RedisBuffer { .exec(); if (!result) { + this.logger.error('No result from redis transaction', { + result, + }); throw new Error('Redis transaction failed'); } const lrange = result[0]; if (!lrange || lrange[0] instanceof Error) { + this.logger.error('Error from lrange', { + result, + }); throw new Error('Redis transaction failed'); } const items = lrange[1] as string[]; - const parsedItems = items.map((item) => JSON.parse(item) as T); + const parsedItems = items + .map((item) => getSafeJson(item) as T | null) + .filter((item): item is T => item !== null); if (parsedItems.length === 0) { this.logger.debug('No items to flush'); @@ -163,15 +177,15 @@ export class RedisBuffer { } catch (error) { this.logger.error('Failed to process queue while flushing buffer', { error, - queueSize: items.length, + queueSize: parsedItems.length, }); - if (items.length > 0) { + if (parsedItems.length > 0) { // Add back items to keep this.logger.info('Adding all items back to buffer'); await getRedisCache().lpush( this.getKey(), - ...items.map((item) => JSON.stringify(item)), + ...parsedItems.map((item) => JSON.stringify(item)), ); } } @@ -190,8 +204,15 @@ export class RedisBuffer { } protected async getQueue(count?: number): Promise { - const items = await getRedisCache().lrange(this.getKey(), 0, count ?? -1); - return items.map((item) => JSON.parse(item) as T); + try { + const items = await getRedisCache().lrange(this.getKey(), 0, count ?? -1); + return items + .map((item) => getSafeJson(item) as T | null) + .filter((item): item is T => item !== null); + } catch (error) { + this.logger.error('Failed to get queue', { error }); + return []; + } } protected processItems(items: T[]): Promise<{ toInsert: T[]; toKeep: T[] }> { diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 3920c589..4b836793 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -4,7 +4,11 @@ import SuperJSON from 'superjson'; import { deepMergeObjects } from '@openpanel/common'; import { getRedisCache, getRedisPub } from '@openpanel/redis'; -import { TABLE_NAMES, ch } from '../clickhouse-client'; +import { + TABLE_NAMES, + ch, + convertClickhouseDateToJs, +} from '../clickhouse-client'; import { transformEvent } from '../services/event.service'; import type { IClickhouseEvent, @@ -13,6 +17,8 @@ import type { import type { Find, FindMany } from './buffer'; import { RedisBuffer } from './buffer'; +const STALLED_QUEUE_TIMEOUT = 1000 * 60 * 60 * 24; + type BufferType = IClickhouseEvent; export class EventBuffer extends RedisBuffer { constructor() { @@ -46,8 +52,8 @@ export class EventBuffer extends RedisBuffer { protected async processItems( queue: BufferType[], ): Promise<{ toInsert: BufferType[]; toKeep: BufferType[] }> { - const toInsert = new Set(); - const itemsToStalled = new Set(); + const toInsert: BufferType[] = []; + const toStalled: BufferType[] = []; // Sort data by created_at // oldest first @@ -85,7 +91,7 @@ export class EventBuffer extends RedisBuffer { event.properties.__properties_from = lastEventWithData.id; } - return toInsert.add(event); + return toInsert.push(event); }); // Group screen_view events by session_id @@ -125,10 +131,10 @@ export class EventBuffer extends RedisBuffer { }, duration, }; - toInsert.add(event); + toInsert.push(event); } else if (hasSessionEnd) { // push last event in session if we have a session_end event - toInsert.add(item); + toInsert.push(item); } }); } // for of end @@ -137,28 +143,37 @@ export class EventBuffer extends RedisBuffer { // This should not theoretically happen but if it does we should move them to stalled queue.forEach((item) => { if ( - !toInsert.has(item) && - new Date(item.created_at).getTime() < - new Date().getTime() - 1000 * 60 * 60 * 24 + !toInsert.find((i) => i.id === item.id) && + convertClickhouseDateToJs(item.created_at).getTime() < + new Date().getTime() - STALLED_QUEUE_TIMEOUT ) { - itemsToStalled.add(item); + toStalled.push(item); } }); - if (itemsToStalled.size > 0) { - const multi = getRedisCache().multi(); - for (const item of itemsToStalled) { - multi.rpush(this.getKey('stalled'), JSON.stringify(item)); + if (toStalled.length > 0) { + try { + this.logger.info(`Pushing to stalled queue (${toStalled.length})`, { + items: toStalled, + count: toStalled.length, + }); + await getRedisCache().rpush( + this.getKey('stalled'), + ...toStalled.map((item) => JSON.stringify(item)), + ); + } catch (error) { + toStalled.length = 0; + this.logger.error('Failed to push to stalled queue', { error }); } - await multi.exec(); } - const toInsertArray = Array.from(toInsert); return { - toInsert: toInsertArray, - toKeep: queue.filter( - (item) => !toInsertArray.find((i) => i.id === item.id), - ), + toInsert, + toKeep: queue.filter((item) => { + const willBeInserted = toInsert.find((i) => i.id === item.id); + const willBeStalled = toStalled.find((i) => i.id === item.id); + return willBeInserted === undefined && willBeStalled === undefined; + }), }; } diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index b0992b3f..5d15dc5f 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -276,9 +276,6 @@ export async function createEvent(payload: IServiceCreateEventPayload) { if (!payload.profileId) { payload.profileId = payload.deviceId; } - console.log( - `create event ${payload.name} for [deviceId]: ${payload.deviceId} [profileId]: ${payload.profileId} [projectId]: ${payload.projectId} [path]: ${payload.path}`, - ); if (payload.profileId !== '') { await upsertProfile({