From 067285797451b9fe6662c950c17b7368dbe4e85f Mon Sep 17 00:00:00 2001 From: zias Date: Mon, 30 Mar 2026 12:04:04 +0200 Subject: [PATCH] feat: add OpenTelemetry device log capture pipeline - ClickHouse `logs` table (migration 13) with OTel columns, bloom filter indices - Zod validation schema for log payloads (severity, body, attributes, trace context) - Redis-backed LogBuffer with micro-batching into ClickHouse - POST /logs API endpoint with client auth, geo + UA enrichment - BullMQ logs queue + worker job - cron flushLogs every 10s wired into existing cron system - SDK captureLog(severity, body, properties) with client-side batching Co-Authored-By: Claude Sonnet 4.6 --- apps/api/src/controllers/logs.controller.ts | 68 +++++++ apps/api/src/index.ts | 2 + apps/api/src/routes/logs.router.ts | 17 ++ apps/worker/src/boot-cron.ts | 5 + apps/worker/src/boot-workers.ts | 17 ++ apps/worker/src/jobs/cron.ts | 5 +- apps/worker/src/jobs/logs.incoming-log.ts | 63 +++++++ packages/db/code-migrations/13-add-logs.ts | 72 ++++++++ packages/db/src/buffers/index.ts | 3 + packages/db/src/buffers/log-buffer.ts | 193 ++++++++++++++++++++ packages/queue/src/queues.ts | 54 +++++- packages/sdks/sdk/src/index.ts | 94 ++++++++++ packages/validation/src/index.ts | 1 + packages/validation/src/log.validation.ts | 60 ++++++ 14 files changed, 652 insertions(+), 2 deletions(-) create mode 100644 apps/api/src/controllers/logs.controller.ts create mode 100644 apps/api/src/routes/logs.router.ts create mode 100644 apps/worker/src/jobs/logs.incoming-log.ts create mode 100644 packages/db/code-migrations/13-add-logs.ts create mode 100644 packages/db/src/buffers/log-buffer.ts create mode 100644 packages/validation/src/log.validation.ts diff --git a/apps/api/src/controllers/logs.controller.ts b/apps/api/src/controllers/logs.controller.ts new file mode 100644 index 00000000..95ea15ff --- /dev/null +++ b/apps/api/src/controllers/logs.controller.ts @@ -0,0 +1,68 @@ +import { parseUserAgent } from '@openpanel/common/server'; +import { getSalts } from '@openpanel/db'; +import { getGeoLocation } from '@openpanel/geo'; +import { type LogsQueuePayload, logsQueue } from '@openpanel/queue'; +import { type ILogBatchPayload, zLogBatchPayload } from '@openpanel/validation'; +import type { FastifyReply, FastifyRequest } from 'fastify'; +import { getDeviceId } from '@/utils/ids'; +import { getStringHeaders } from './track.controller'; + +export async function handler( + request: FastifyRequest<{ Body: ILogBatchPayload }>, + reply: FastifyReply, +) { + const projectId = request.client?.projectId; + if (!projectId) { + return reply.status(400).send({ status: 400, error: 'Missing projectId' }); + } + + const validationResult = zLogBatchPayload.safeParse(request.body); + if (!validationResult.success) { + return reply.status(400).send({ + status: 400, + error: 'Bad Request', + message: 'Validation failed', + errors: validationResult.error.errors, + }); + } + + const { logs } = validationResult.data; + + const ip = request.clientIp; + const ua = request.headers['user-agent'] ?? 'unknown/1.0'; + const headers = getStringHeaders(request.headers); + const receivedAt = new Date().toISOString(); + + const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]); + const { deviceId, sessionId } = await getDeviceId({ projectId, ip, ua, salts }); + const uaInfo = parseUserAgent(ua, undefined); + + const jobs: LogsQueuePayload[] = logs.map((log) => ({ + type: 'incomingLog' as const, + payload: { + projectId, + log: { + ...log, + timestamp: log.timestamp ?? receivedAt, + }, + uaInfo, + geo: { + country: geo.country, + city: geo.city, + region: geo.region, + }, + headers, + deviceId, + sessionId, + }, + })); + + await logsQueue.addBulk( + jobs.map((job) => ({ + name: 'incomingLog', + data: job, + })), + ); + + return reply.status(200).send({ ok: true, count: logs.length }); +} diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index cb5cdece..0502484d 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -44,6 +44,7 @@ import manageRouter from './routes/manage.router'; import miscRouter from './routes/misc.router'; import oauthRouter from './routes/oauth-callback.router'; import profileRouter from './routes/profile.router'; +import logsRouter from './routes/logs.router'; import trackRouter from './routes/track.router'; import webhookRouter from './routes/webhook.router'; import { HttpError } from './utils/errors'; @@ -209,6 +210,7 @@ const startServer = async () => { instance.register(importRouter, { prefix: '/import' }); instance.register(insightsRouter, { prefix: '/insights' }); instance.register(trackRouter, { prefix: '/track' }); + instance.register(logsRouter, { prefix: '/logs' }); instance.register(manageRouter, { prefix: '/manage' }); // Keep existing endpoints for backward compatibility instance.get('/healthcheck', healthcheck); diff --git a/apps/api/src/routes/logs.router.ts b/apps/api/src/routes/logs.router.ts new file mode 100644 index 00000000..35938b82 --- /dev/null +++ b/apps/api/src/routes/logs.router.ts @@ -0,0 +1,17 @@ +import type { FastifyPluginCallback } from 'fastify'; +import { handler } from '@/controllers/logs.controller'; +import { clientHook } from '@/hooks/client.hook'; +import { duplicateHook } from '@/hooks/duplicate.hook'; + +const logsRouter: FastifyPluginCallback = async (fastify) => { + fastify.addHook('preValidation', duplicateHook); + fastify.addHook('preHandler', clientHook); + + fastify.route({ + method: 'POST', + url: '/', + handler, + }); +}; + +export default logsRouter; diff --git a/apps/worker/src/boot-cron.ts b/apps/worker/src/boot-cron.ts index f49a1435..15086b59 100644 --- a/apps/worker/src/boot-cron.ts +++ b/apps/worker/src/boot-cron.ts @@ -73,6 +73,11 @@ export async function bootCron() { type: 'flushGroups', pattern: 1000 * 10, }, + { + name: 'flush', + type: 'flushLogs', + pattern: 1000 * 10, + }, { name: 'insightsDaily', type: 'insightsDaily', diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts index f6395565..1cd5f6f0 100644 --- a/apps/worker/src/boot-workers.ts +++ b/apps/worker/src/boot-workers.ts @@ -8,6 +8,7 @@ import { gscQueue, importQueue, insightsQueue, + logsQueue, miscQueue, notificationQueue, queueLogger, @@ -22,6 +23,7 @@ import { incomingEvent } from './jobs/events.incoming-event'; import { gscJob } from './jobs/gsc'; import { importJob } from './jobs/import'; import { insightsProjectJob } from './jobs/insights'; +import { incomingLog } from './jobs/logs.incoming-log'; import { miscJob } from './jobs/misc'; import { notificationJob } from './jobs/notification'; import { sessionsJob } from './jobs/sessions'; @@ -59,6 +61,7 @@ function getEnabledQueues(): QueueName[] { 'import', 'insights', 'gsc', + 'logs', ]; } @@ -221,6 +224,20 @@ export function bootWorkers() { logger.info('Started worker for gsc', { concurrency }); } + // Start logs worker + if (enabledQueues.includes('logs')) { + const concurrency = getConcurrencyFor('logs', 10); + const logsWorker = new Worker( + logsQueue.name, + async (job) => { + await incomingLog(job.data.payload); + }, + { ...workerOptions, concurrency }, + ); + workers.push(logsWorker); + logger.info('Started worker for logs', { concurrency }); + } + if (workers.length === 0) { logger.warn( 'No workers started. Check ENABLED_QUEUES environment variable.' diff --git a/apps/worker/src/jobs/cron.ts b/apps/worker/src/jobs/cron.ts index c6614d15..d30553c0 100644 --- a/apps/worker/src/jobs/cron.ts +++ b/apps/worker/src/jobs/cron.ts @@ -1,6 +1,6 @@ import type { Job } from 'bullmq'; -import { eventBuffer, groupBuffer, profileBackfillBuffer, profileBuffer, replayBuffer, sessionBuffer } from '@openpanel/db'; +import { eventBuffer, groupBuffer, logBuffer, profileBackfillBuffer, profileBuffer, replayBuffer, sessionBuffer } from '@openpanel/db'; import type { CronQueuePayload } from '@openpanel/queue'; import { jobdeleteProjects } from './cron.delete-projects'; @@ -33,6 +33,9 @@ export async function cronJob(job: Job) { case 'flushGroups': { return await groupBuffer.tryFlush(); } + case 'flushLogs': { + return await logBuffer.tryFlush(); + } case 'ping': { return await ping(); } diff --git a/apps/worker/src/jobs/logs.incoming-log.ts b/apps/worker/src/jobs/logs.incoming-log.ts new file mode 100644 index 00000000..83b9624e --- /dev/null +++ b/apps/worker/src/jobs/logs.incoming-log.ts @@ -0,0 +1,63 @@ +import type { IClickhouseLog } from '@openpanel/db'; +import { logBuffer } from '@openpanel/db'; +import type { LogsQueuePayload } from '@openpanel/queue'; +import { SEVERITY_TEXT_TO_NUMBER } from '@openpanel/validation'; +import { logger as baseLogger } from '@/utils/logger'; + +export async function incomingLog( + payload: LogsQueuePayload['payload'], +): Promise { + const logger = baseLogger.child({ projectId: payload.projectId }); + + try { + const { log, uaInfo, geo, deviceId, sessionId, projectId, headers } = payload; + + const sdkName = headers['openpanel-sdk-name'] ?? ''; + const sdkVersion = headers['openpanel-sdk-version'] ?? ''; + + const severityNumber = + log.severityNumber ?? + SEVERITY_TEXT_TO_NUMBER[log.severity] ?? + 9; // INFO fallback + + const row: IClickhouseLog = { + project_id: projectId, + device_id: deviceId, + profile_id: log.profileId ? String(log.profileId) : '', + session_id: sessionId, + timestamp: log.timestamp, + observed_at: new Date().toISOString(), + severity_number: severityNumber, + severity_text: log.severity, + body: log.body, + trace_id: log.traceId ?? '', + span_id: log.spanId ?? '', + trace_flags: log.traceFlags ?? 0, + logger_name: log.loggerName ?? '', + attributes: log.attributes ?? {}, + resource: log.resource ?? {}, + sdk_name: sdkName, + sdk_version: sdkVersion, + country: geo.country ?? '', + city: geo.city ?? '', + region: geo.region ?? '', + os: uaInfo.os ?? '', + os_version: uaInfo.osVersion ?? '', + browser: uaInfo.isServer ? '' : (uaInfo.browser ?? ''), + browser_version: uaInfo.isServer ? '' : (uaInfo.browserVersion ?? ''), + device: uaInfo.device ?? '', + brand: uaInfo.isServer ? '' : (uaInfo.brand ?? ''), + model: uaInfo.isServer ? '' : (uaInfo.model ?? ''), + }; + + logBuffer.add(row); + + logger.info('Log queued', { + severity: log.severity, + loggerName: log.loggerName, + }); + } catch (error) { + logger.error('Failed to process incoming log', { error }); + throw error; + } +} diff --git a/packages/db/code-migrations/13-add-logs.ts b/packages/db/code-migrations/13-add-logs.ts new file mode 100644 index 00000000..3fdbab6c --- /dev/null +++ b/packages/db/code-migrations/13-add-logs.ts @@ -0,0 +1,72 @@ +import { createTable, runClickhouseMigrationCommands } from '../src/clickhouse/migration'; +import { getIsCluster, getIsSelfHosting, printBoxMessage } from './helpers'; + +export async function up() { + const replicatedVersion = '1'; + const isClustered = getIsCluster(); + + const sqls: string[] = []; + + sqls.push( + ...createTable({ + name: 'logs', + columns: [ + '`id` UUID DEFAULT generateUUIDv4()', + '`project_id` String CODEC(ZSTD(3))', + '`device_id` String CODEC(ZSTD(3))', + '`profile_id` String CODEC(ZSTD(3))', + '`session_id` String CODEC(LZ4)', + // OpenTelemetry log fields + '`timestamp` DateTime64(9) CODEC(DoubleDelta, ZSTD(3))', + '`observed_at` DateTime64(9) CODEC(DoubleDelta, ZSTD(3))', + '`severity_number` UInt8', + '`severity_text` LowCardinality(String)', + '`body` String CODEC(ZSTD(3))', + '`trace_id` String CODEC(ZSTD(3))', + '`span_id` String CODEC(ZSTD(3))', + '`trace_flags` UInt32 DEFAULT 0', + '`logger_name` LowCardinality(String)', + // OTel attributes (log-level key-value pairs) + '`attributes` Map(String, String) CODEC(ZSTD(3))', + // OTel resource attributes (device/app metadata) + '`resource` Map(String, String) CODEC(ZSTD(3))', + // Server-enriched context + '`sdk_name` LowCardinality(String)', + '`sdk_version` LowCardinality(String)', + '`country` LowCardinality(FixedString(2))', + '`city` String', + '`region` LowCardinality(String)', + '`os` LowCardinality(String)', + '`os_version` LowCardinality(String)', + '`browser` LowCardinality(String)', + '`browser_version` LowCardinality(String)', + '`device` LowCardinality(String)', + '`brand` LowCardinality(String)', + '`model` LowCardinality(String)', + ], + indices: [ + 'INDEX idx_severity_number severity_number TYPE minmax GRANULARITY 1', + 'INDEX idx_body body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1', + 'INDEX idx_trace_id trace_id TYPE bloom_filter GRANULARITY 1', + 'INDEX idx_logger_name logger_name TYPE bloom_filter GRANULARITY 1', + ], + orderBy: ['project_id', 'toDate(timestamp)', 'severity_number', 'device_id'], + partitionBy: 'toYYYYMM(timestamp)', + settings: { + index_granularity: 8192, + ttl_only_drop_parts: 1, + }, + distributionHash: 'cityHash64(project_id, toString(toStartOfHour(timestamp)))', + replicatedVersion, + isClustered, + }), + ); + + printBoxMessage('Running migration: 13-add-logs', [ + 'Creates the logs table for OpenTelemetry-compatible device/app log capture.', + ]); + + if (!process.argv.includes('--dry')) { + await runClickhouseMigrationCommands(sqls); + } +} diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts index 86741b54..310ad3a5 100644 --- a/packages/db/src/buffers/index.ts +++ b/packages/db/src/buffers/index.ts @@ -1,6 +1,7 @@ import { BotBuffer as BotBufferRedis } from './bot-buffer'; import { EventBuffer as EventBufferRedis } from './event-buffer'; import { GroupBuffer } from './group-buffer'; +import { LogBuffer } from './log-buffer'; import { ProfileBackfillBuffer } from './profile-backfill-buffer'; import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer'; import { ReplayBuffer } from './replay-buffer'; @@ -13,6 +14,8 @@ export const sessionBuffer = new SessionBuffer(); export const profileBackfillBuffer = new ProfileBackfillBuffer(); export const replayBuffer = new ReplayBuffer(); export const groupBuffer = new GroupBuffer(); +export const logBuffer = new LogBuffer(); export type { ProfileBackfillEntry } from './profile-backfill-buffer'; export type { IClickhouseSessionReplayChunk } from './replay-buffer'; +export type { IClickhouseLog } from './log-buffer'; diff --git a/packages/db/src/buffers/log-buffer.ts b/packages/db/src/buffers/log-buffer.ts new file mode 100644 index 00000000..830af302 --- /dev/null +++ b/packages/db/src/buffers/log-buffer.ts @@ -0,0 +1,193 @@ +import { getSafeJson } from '@openpanel/json'; +import { getRedisCache } from '@openpanel/redis'; +import { ch } from '../clickhouse/client'; +import { BaseBuffer } from './base-buffer'; + +export interface IClickhouseLog { + id?: string; + project_id: string; + device_id: string; + profile_id: string; + session_id: string; + timestamp: string; + observed_at: string; + severity_number: number; + severity_text: string; + body: string; + trace_id: string; + span_id: string; + trace_flags: number; + logger_name: string; + attributes: Record; + resource: Record; + sdk_name: string; + sdk_version: string; + country: string; + city: string; + region: string; + os: string; + os_version: string; + browser: string; + browser_version: string; + device: string; + brand: string; + model: string; +} + +export class LogBuffer extends BaseBuffer { + private batchSize = process.env.LOG_BUFFER_BATCH_SIZE + ? Number.parseInt(process.env.LOG_BUFFER_BATCH_SIZE, 10) + : 4000; + private chunkSize = process.env.LOG_BUFFER_CHUNK_SIZE + ? Number.parseInt(process.env.LOG_BUFFER_CHUNK_SIZE, 10) + : 1000; + private microBatchIntervalMs = process.env.LOG_BUFFER_MICRO_BATCH_MS + ? Number.parseInt(process.env.LOG_BUFFER_MICRO_BATCH_MS, 10) + : 10; + private microBatchMaxSize = process.env.LOG_BUFFER_MICRO_BATCH_SIZE + ? Number.parseInt(process.env.LOG_BUFFER_MICRO_BATCH_SIZE, 10) + : 100; + + private pendingLogs: IClickhouseLog[] = []; + private flushTimer: ReturnType | null = null; + private isFlushing = false; + private flushRetryCount = 0; + + private queueKey = 'log_buffer:queue'; + protected bufferCounterKey = 'log_buffer:total_count'; + + constructor() { + super({ + name: 'log', + onFlush: async () => { + await this.processBuffer(); + }, + }); + } + + add(log: IClickhouseLog) { + this.pendingLogs.push(log); + + if (this.pendingLogs.length >= this.microBatchMaxSize) { + this.flushLocalBuffer(); + return; + } + + if (!this.flushTimer) { + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + this.flushLocalBuffer(); + }, this.microBatchIntervalMs); + } + } + + public async flush() { + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + await this.flushLocalBuffer(); + } + + private async flushLocalBuffer() { + if (this.isFlushing || this.pendingLogs.length === 0) { + return; + } + + this.isFlushing = true; + const logsToFlush = this.pendingLogs; + this.pendingLogs = []; + + try { + const redis = getRedisCache(); + const multi = redis.multi(); + for (const log of logsToFlush) { + multi.rpush(this.queueKey, JSON.stringify(log)); + } + multi.incrby(this.bufferCounterKey, logsToFlush.length); + await multi.exec(); + this.flushRetryCount = 0; + } catch (error) { + this.pendingLogs = logsToFlush.concat(this.pendingLogs); + this.flushRetryCount += 1; + this.logger.warn('Failed to flush log buffer to Redis; logs re-queued', { + error, + logCount: logsToFlush.length, + flushRetryCount: this.flushRetryCount, + }); + } finally { + this.isFlushing = false; + if (this.pendingLogs.length > 0 && !this.flushTimer) { + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + this.flushLocalBuffer(); + }, this.microBatchIntervalMs); + } + } + } + + async processBuffer() { + const redis = getRedisCache(); + + try { + const queueLogs = await redis.lrange(this.queueKey, 0, this.batchSize - 1); + + if (queueLogs.length === 0) { + this.logger.debug('No logs to process'); + return; + } + + const logsToClickhouse: IClickhouseLog[] = []; + for (const logStr of queueLogs) { + const log = getSafeJson(logStr); + if (log) { + logsToClickhouse.push(log); + } + } + + if (logsToClickhouse.length === 0) { + this.logger.debug('No valid logs to process'); + return; + } + + logsToClickhouse.sort( + (a, b) => + new Date(a.timestamp || 0).getTime() - + new Date(b.timestamp || 0).getTime(), + ); + + this.logger.info('Inserting logs into ClickHouse', { + totalLogs: logsToClickhouse.length, + chunks: Math.ceil(logsToClickhouse.length / this.chunkSize), + }); + + for (const chunk of this.chunks(logsToClickhouse, this.chunkSize)) { + await ch.insert({ + table: 'logs', + values: chunk, + format: 'JSONEachRow', + }); + } + + await redis + .multi() + .ltrim(this.queueKey, queueLogs.length, -1) + .decrby(this.bufferCounterKey, queueLogs.length) + .exec(); + + this.logger.info('Processed logs from Redis buffer', { + batchSize: this.batchSize, + logsProcessed: logsToClickhouse.length, + }); + } catch (error) { + this.logger.error('Error processing log Redis buffer', { error }); + } + } + + public getBufferSize() { + return this.getBufferSizeWithCounter(async () => { + const redis = getRedisCache(); + return await redis.llen(this.queueKey); + }); + } +} diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 171e4707..f8395d3e 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -8,7 +8,7 @@ import { createLogger } from '@openpanel/logger'; import { getRedisGroupQueue, getRedisQueue } from '@openpanel/redis'; import { Queue } from 'bullmq'; import { Queue as GroupQueue } from 'groupmq'; -import type { ITrackPayload } from '../../validation'; +import type { ILogPayload, ITrackPayload } from '../../validation'; export const EVENTS_GROUP_QUEUES_SHARDS = Number.parseInt( process.env.EVENTS_GROUP_QUEUES_SHARDS || '1', @@ -138,6 +138,10 @@ export type CronQueuePayloadFlushGroups = { type: 'flushGroups'; payload: undefined; }; +export type CronQueuePayloadFlushLogs = { + type: 'flushLogs'; + payload: undefined; +}; export type CronQueuePayload = | CronQueuePayloadSalt | CronQueuePayloadFlushEvents @@ -146,6 +150,7 @@ export type CronQueuePayload = | CronQueuePayloadFlushProfileBackfill | CronQueuePayloadFlushReplay | CronQueuePayloadFlushGroups + | CronQueuePayloadFlushLogs | CronQueuePayloadPing | CronQueuePayloadProject | CronQueuePayloadInsightsDaily @@ -297,3 +302,50 @@ export const gscQueue = new Queue(getQueueName('gsc'), { removeOnFail: 100, }, }); + +export type LogsQueuePayload = { + type: 'incomingLog'; + payload: { + projectId: string; + log: ILogPayload & { + timestamp: string; + }; + uaInfo: + | { + readonly isServer: true; + readonly device: 'server'; + readonly os: ''; + readonly osVersion: ''; + readonly browser: ''; + readonly browserVersion: ''; + readonly brand: ''; + readonly model: ''; + } + | { + readonly os: string | undefined; + readonly osVersion: string | undefined; + readonly browser: string | undefined; + readonly browserVersion: string | undefined; + readonly device: string; + readonly brand: string | undefined; + readonly model: string | undefined; + readonly isServer: false; + }; + geo: { + country: string | undefined; + city: string | undefined; + region: string | undefined; + }; + headers: Record; + deviceId: string; + sessionId: string; + }; +}; + +export const logsQueue = new Queue(getQueueName('logs'), { + connection: getRedisQueue(), + defaultJobOptions: { + removeOnComplete: 100, + removeOnFail: 1000, + }, +}); diff --git a/packages/sdks/sdk/src/index.ts b/packages/sdks/sdk/src/index.ts index 71789d62..ff601f26 100644 --- a/packages/sdks/sdk/src/index.ts +++ b/packages/sdks/sdk/src/index.ts @@ -7,6 +7,7 @@ import type { IGroupPayload as GroupPayload, IIdentifyPayload as IdentifyPayload, IIncrementPayload as IncrementPayload, + ISeverityText, ITrackHandlerPayload as TrackHandlerPayload, ITrackPayload as TrackPayload, } from '@openpanel/validation'; @@ -19,6 +20,7 @@ export type { GroupPayload, IdentifyPayload, IncrementPayload, + ISeverityText, TrackHandlerPayload, TrackPayload, }; @@ -29,6 +31,33 @@ export interface TrackProperties { groups?: string[]; } +export interface LogProperties { + /** Logger name (e.g. "com.example.MyActivity") */ + loggerName?: string; + traceId?: string; + spanId?: string; + traceFlags?: number; + /** Log-level key-value attributes */ + attributes?: Record; + /** Resource/device attributes */ + resource?: Record; + /** ISO 8601 timestamp; defaults to now */ + timestamp?: string; +} + +interface LogPayloadForQueue { + body: string; + severity: ISeverityText; + loggerName?: string; + traceId?: string; + spanId?: string; + traceFlags?: number; + attributes?: Record; + resource?: Record; + timestamp: string; + profileId?: string; +} + export type UpsertGroupPayload = GroupPayload; export interface OpenPanelOptions { @@ -57,6 +86,10 @@ export class OpenPanel { sessionId?: string; global?: Record; queue: TrackHandlerPayload[] = []; + private logQueue: LogPayloadForQueue[] = []; + private logFlushTimer: ReturnType | null = null; + private logFlushIntervalMs = 2000; + private logFlushMaxSize = 50; constructor(options: OpenPanelOptions) { this.options = options; @@ -327,6 +360,67 @@ export class OpenPanel { this.queue = remaining; } + captureLog( + severity: ISeverityText, + body: string, + properties?: LogProperties, + ) { + if (this.options.disabled) { + return; + } + + const entry: LogPayloadForQueue = { + body, + severity, + timestamp: properties?.timestamp ?? new Date().toISOString(), + ...(this.profileId ? { profileId: this.profileId } : {}), + ...(properties?.loggerName ? { loggerName: properties.loggerName } : {}), + ...(properties?.traceId ? { traceId: properties.traceId } : {}), + ...(properties?.spanId ? { spanId: properties.spanId } : {}), + ...(properties?.traceFlags !== undefined + ? { traceFlags: properties.traceFlags } + : {}), + ...(properties?.attributes ? { attributes: properties.attributes } : {}), + ...(properties?.resource ? { resource: properties.resource } : {}), + }; + + this.logQueue.push(entry); + + if (this.logQueue.length >= this.logFlushMaxSize) { + this.flushLogs(); + return; + } + + if (!this.logFlushTimer) { + this.logFlushTimer = setTimeout(() => { + this.logFlushTimer = null; + this.flushLogs(); + }, this.logFlushIntervalMs); + } + } + + private async flushLogs() { + if (this.logFlushTimer) { + clearTimeout(this.logFlushTimer); + this.logFlushTimer = null; + } + + if (this.logQueue.length === 0) { + return; + } + + const batch = this.logQueue; + this.logQueue = []; + + try { + await this.api.fetch('/logs', { logs: batch }); + } catch (error) { + this.log('Failed to flush logs', error); + // Re-queue on failure + this.logQueue = batch.concat(this.logQueue); + } + } + log(...args: any[]) { if (this.options.debug) { console.log('[OpenPanel.dev]', ...args); diff --git a/packages/validation/src/index.ts b/packages/validation/src/index.ts index 2b27593e..c326aa4c 100644 --- a/packages/validation/src/index.ts +++ b/packages/validation/src/index.ts @@ -626,3 +626,4 @@ export type ICreateImport = z.infer; export * from './types.insights'; export * from './track.validation'; export * from './event-blocklist'; +export * from './log.validation'; diff --git a/packages/validation/src/log.validation.ts b/packages/validation/src/log.validation.ts new file mode 100644 index 00000000..5f25732e --- /dev/null +++ b/packages/validation/src/log.validation.ts @@ -0,0 +1,60 @@ +import { z } from 'zod'; + +/** + * OTel severity number mapping (subset): + * TRACE=1, DEBUG=5, INFO=9, WARN=13, ERROR=17, FATAL=21 + */ +export const SEVERITY_TEXT_TO_NUMBER: Record = { + trace: 1, + debug: 5, + info: 9, + warn: 13, + warning: 13, + error: 17, + fatal: 21, + critical: 21, +}; + +export const zSeverityText = z.enum([ + 'trace', + 'debug', + 'info', + 'warn', + 'warning', + 'error', + 'fatal', + 'critical', +]); + +export type ISeverityText = z.infer; + +export const zLogPayload = z.object({ + /** Log message / body */ + body: z.string().min(1), + /** Severity level as text */ + severity: zSeverityText.default('info'), + /** Optional override for the numeric OTel severity (1-24) */ + severityNumber: z.number().int().min(1).max(24).optional(), + /** ISO 8601 timestamp; defaults to server receive time if omitted */ + timestamp: z.string().datetime({ offset: true }).optional(), + /** Logger name (e.g. "com.example.MyActivity") */ + loggerName: z.string().optional(), + /** W3C trace context */ + traceId: z.string().optional(), + spanId: z.string().optional(), + traceFlags: z.number().int().min(0).optional(), + /** Log-level key-value attributes */ + attributes: z.record(z.string(), z.string()).optional(), + /** Resource/device attributes (app version, runtime, etc.) */ + resource: z.record(z.string(), z.string()).optional(), + /** Profile/user ID to associate with this log */ + profileId: z.union([z.string().min(1), z.number()]).optional(), +}); + +export type ILogPayload = z.infer; + +export const zLogBatchPayload = z.object({ + logs: z.array(zLogPayload).min(1).max(500), +}); + +export type ILogBatchPayload = z.infer;