From 655ea1f87e601726826728db8dcdd69629d4e455 Mon Sep 17 00:00:00 2001 From: zias Date: Tue, 31 Mar 2026 16:45:05 +0200 Subject: [PATCH] feat:add otel logging --- apps/api/src/controllers/logs.controller.ts | 68 ++++ apps/api/src/index.ts | 2 + apps/api/src/routes/logs.router.ts | 6 + .../src/components/sidebar-project-menu.tsx | 2 + apps/start/src/routeTree.gen.ts | 22 + .../_app.$organizationId.$projectId.logs.tsx | 385 ++++++++++++++++++ apps/start/src/utils/title.ts | 2 + apps/worker/package.json | 1 + apps/worker/src/boot-cron.ts | 5 + apps/worker/src/boot-workers.ts | 19 + apps/worker/src/jobs/cron.ts | 4 + apps/worker/src/jobs/logs.incoming-log.ts | 63 +++ packages/db/code-migrations/13-add-logs.ts | 72 ++++ packages/db/src/buffers/index.ts | 3 + packages/db/src/buffers/log-buffer.ts | 269 ++++++++++++ packages/queue/src/queues.ts | 49 ++- packages/sdks/sdk/src/index.ts | 84 ++++ packages/trpc/index.ts | 1 + packages/trpc/src/root.ts | 2 + packages/trpc/src/routers/log.ts | 212 ++++++++++ packages/validation/src/index.ts | 1 + packages/validation/src/log.validation.ts | 60 +++ pnpm-lock.yaml | 3 + 23 files changed, 1334 insertions(+), 1 deletion(-) create mode 100644 apps/api/src/controllers/logs.controller.ts create mode 100644 apps/api/src/routes/logs.router.ts create mode 100644 apps/start/src/routes/_app.$organizationId.$projectId.logs.tsx create mode 100644 apps/worker/src/jobs/logs.incoming-log.ts create mode 100644 packages/db/code-migrations/13-add-logs.ts create mode 100644 packages/db/src/buffers/log-buffer.ts create mode 100644 packages/trpc/src/routers/log.ts create mode 100644 packages/validation/src/log.validation.ts diff --git a/apps/api/src/controllers/logs.controller.ts b/apps/api/src/controllers/logs.controller.ts new file mode 100644 index 00000000..95ea15ff --- /dev/null +++ b/apps/api/src/controllers/logs.controller.ts @@ -0,0 +1,68 @@ +import { parseUserAgent } from '@openpanel/common/server'; +import { getSalts } from '@openpanel/db'; +import { getGeoLocation } from '@openpanel/geo'; +import { type LogsQueuePayload, logsQueue } from '@openpanel/queue'; +import { type ILogBatchPayload, zLogBatchPayload } from '@openpanel/validation'; +import type { FastifyReply, FastifyRequest } from 'fastify'; +import { getDeviceId } from '@/utils/ids'; +import { getStringHeaders } from './track.controller'; + +export async function handler( + request: FastifyRequest<{ Body: ILogBatchPayload }>, + reply: FastifyReply, +) { + const projectId = request.client?.projectId; + if (!projectId) { + return reply.status(400).send({ status: 400, error: 'Missing projectId' }); + } + + const validationResult = zLogBatchPayload.safeParse(request.body); + if (!validationResult.success) { + return reply.status(400).send({ + status: 400, + error: 'Bad Request', + message: 'Validation failed', + errors: validationResult.error.errors, + }); + } + + const { logs } = validationResult.data; + + const ip = request.clientIp; + const ua = request.headers['user-agent'] ?? 'unknown/1.0'; + const headers = getStringHeaders(request.headers); + const receivedAt = new Date().toISOString(); + + const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]); + const { deviceId, sessionId } = await getDeviceId({ projectId, ip, ua, salts }); + const uaInfo = parseUserAgent(ua, undefined); + + const jobs: LogsQueuePayload[] = logs.map((log) => ({ + type: 'incomingLog' as const, + payload: { + projectId, + log: { + ...log, + timestamp: log.timestamp ?? receivedAt, + }, + uaInfo, + geo: { + country: geo.country, + city: geo.city, + region: geo.region, + }, + headers, + deviceId, + sessionId, + }, + })); + + await logsQueue.addBulk( + jobs.map((job) => ({ + name: 'incomingLog', + data: job, + })), + ); + + return reply.status(200).send({ ok: true, count: logs.length }); +} diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index cb5cdece..e905c59e 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -40,6 +40,7 @@ import gscCallbackRouter from './routes/gsc-callback.router'; import importRouter from './routes/import.router'; import insightsRouter from './routes/insights.router'; import liveRouter from './routes/live.router'; +import logsRouter from './routes/logs.router'; import manageRouter from './routes/manage.router'; import miscRouter from './routes/misc.router'; import oauthRouter from './routes/oauth-callback.router'; @@ -198,6 +199,7 @@ const startServer = async () => { instance.register(gscCallbackRouter, { prefix: '/gsc' }); instance.register(miscRouter, { prefix: '/misc' }); instance.register(aiRouter, { prefix: '/ai' }); + instance.register(logsRouter, { prefix: '/logs' }); }); // Public API diff --git a/apps/api/src/routes/logs.router.ts b/apps/api/src/routes/logs.router.ts new file mode 100644 index 00000000..c33310b5 --- /dev/null +++ b/apps/api/src/routes/logs.router.ts @@ -0,0 +1,6 @@ +import { handler } from '@/controllers/logs.controller'; +import type { FastifyInstance } from 'fastify'; + +export default async function (fastify: FastifyInstance) { + fastify.post('/', handler); +} diff --git a/apps/start/src/components/sidebar-project-menu.tsx b/apps/start/src/components/sidebar-project-menu.tsx index 1f381605..d4df5a51 100644 --- a/apps/start/src/components/sidebar-project-menu.tsx +++ b/apps/start/src/components/sidebar-project-menu.tsx @@ -15,6 +15,7 @@ import { LayoutDashboardIcon, LayoutPanelTopIcon, PlusIcon, + ScrollTextIcon, SearchIcon, SparklesIcon, TrendingUpDownIcon, @@ -61,6 +62,7 @@ export default function SidebarProjectMenu({ + diff --git a/apps/start/src/routeTree.gen.ts b/apps/start/src/routeTree.gen.ts index 3b22c2a1..4709651f 100644 --- a/apps/start/src/routeTree.gen.ts +++ b/apps/start/src/routeTree.gen.ts @@ -47,6 +47,7 @@ import { Route as AppOrganizationIdProjectIdReportsRouteImport } from './routes/ import { Route as AppOrganizationIdProjectIdReferencesRouteImport } from './routes/_app.$organizationId.$projectId.references' import { Route as AppOrganizationIdProjectIdRealtimeRouteImport } from './routes/_app.$organizationId.$projectId.realtime' import { Route as AppOrganizationIdProjectIdPagesRouteImport } from './routes/_app.$organizationId.$projectId.pages' +import { Route as AppOrganizationIdProjectIdLogsRouteImport } from './routes/_app.$organizationId.$projectId.logs' import { Route as AppOrganizationIdProjectIdInsightsRouteImport } from './routes/_app.$organizationId.$projectId.insights' import { Route as AppOrganizationIdProjectIdGroupsRouteImport } from './routes/_app.$organizationId.$projectId.groups' import { Route as AppOrganizationIdProjectIdDashboardsRouteImport } from './routes/_app.$organizationId.$projectId.dashboards' @@ -352,6 +353,12 @@ const AppOrganizationIdProjectIdPagesRoute = path: '/pages', getParentRoute: () => AppOrganizationIdProjectIdRoute, } as any) +const AppOrganizationIdProjectIdLogsRoute = + AppOrganizationIdProjectIdLogsRouteImport.update({ + id: '/logs', + path: '/logs', + getParentRoute: () => AppOrganizationIdProjectIdRoute, + } as any) const AppOrganizationIdProjectIdInsightsRoute = AppOrganizationIdProjectIdInsightsRouteImport.update({ id: '/insights', @@ -660,6 +667,7 @@ export interface FileRoutesByFullPath { '/$organizationId/$projectId/dashboards': typeof AppOrganizationIdProjectIdDashboardsRoute '/$organizationId/$projectId/groups': typeof AppOrganizationIdProjectIdGroupsRoute '/$organizationId/$projectId/insights': typeof AppOrganizationIdProjectIdInsightsRoute + '/$organizationId/$projectId/logs': typeof AppOrganizationIdProjectIdLogsRoute '/$organizationId/$projectId/pages': typeof AppOrganizationIdProjectIdPagesRoute '/$organizationId/$projectId/realtime': typeof AppOrganizationIdProjectIdRealtimeRoute '/$organizationId/$projectId/references': typeof AppOrganizationIdProjectIdReferencesRoute @@ -738,6 +746,7 @@ export interface FileRoutesByTo { '/$organizationId/$projectId/dashboards': typeof AppOrganizationIdProjectIdDashboardsRoute '/$organizationId/$projectId/groups': typeof AppOrganizationIdProjectIdGroupsRoute '/$organizationId/$projectId/insights': typeof AppOrganizationIdProjectIdInsightsRoute + '/$organizationId/$projectId/logs': typeof AppOrganizationIdProjectIdLogsRoute '/$organizationId/$projectId/pages': typeof AppOrganizationIdProjectIdPagesRoute '/$organizationId/$projectId/realtime': typeof AppOrganizationIdProjectIdRealtimeRoute '/$organizationId/$projectId/references': typeof AppOrganizationIdProjectIdReferencesRoute @@ -814,6 +823,7 @@ export interface FileRoutesById { '/_app/$organizationId/$projectId/dashboards': typeof AppOrganizationIdProjectIdDashboardsRoute '/_app/$organizationId/$projectId/groups': typeof AppOrganizationIdProjectIdGroupsRoute '/_app/$organizationId/$projectId/insights': typeof AppOrganizationIdProjectIdInsightsRoute + '/_app/$organizationId/$projectId/logs': typeof AppOrganizationIdProjectIdLogsRoute '/_app/$organizationId/$projectId/pages': typeof AppOrganizationIdProjectIdPagesRoute '/_app/$organizationId/$projectId/realtime': typeof AppOrganizationIdProjectIdRealtimeRoute '/_app/$organizationId/$projectId/references': typeof AppOrganizationIdProjectIdReferencesRoute @@ -905,6 +915,7 @@ export interface FileRouteTypes { | '/$organizationId/$projectId/dashboards' | '/$organizationId/$projectId/groups' | '/$organizationId/$projectId/insights' + | '/$organizationId/$projectId/logs' | '/$organizationId/$projectId/pages' | '/$organizationId/$projectId/realtime' | '/$organizationId/$projectId/references' @@ -983,6 +994,7 @@ export interface FileRouteTypes { | '/$organizationId/$projectId/dashboards' | '/$organizationId/$projectId/groups' | '/$organizationId/$projectId/insights' + | '/$organizationId/$projectId/logs' | '/$organizationId/$projectId/pages' | '/$organizationId/$projectId/realtime' | '/$organizationId/$projectId/references' @@ -1058,6 +1070,7 @@ export interface FileRouteTypes { | '/_app/$organizationId/$projectId/dashboards' | '/_app/$organizationId/$projectId/groups' | '/_app/$organizationId/$projectId/insights' + | '/_app/$organizationId/$projectId/logs' | '/_app/$organizationId/$projectId/pages' | '/_app/$organizationId/$projectId/realtime' | '/_app/$organizationId/$projectId/references' @@ -1444,6 +1457,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof AppOrganizationIdProjectIdPagesRouteImport parentRoute: typeof AppOrganizationIdProjectIdRoute } + '/_app/$organizationId/$projectId/logs': { + id: '/_app/$organizationId/$projectId/logs' + path: '/logs' + fullPath: '/$organizationId/$projectId/logs' + preLoaderRoute: typeof AppOrganizationIdProjectIdLogsRouteImport + parentRoute: typeof AppOrganizationIdProjectIdRoute + } '/_app/$organizationId/$projectId/insights': { id: '/_app/$organizationId/$projectId/insights' path: '/insights' @@ -2028,6 +2048,7 @@ interface AppOrganizationIdProjectIdRouteChildren { AppOrganizationIdProjectIdDashboardsRoute: typeof AppOrganizationIdProjectIdDashboardsRoute AppOrganizationIdProjectIdGroupsRoute: typeof AppOrganizationIdProjectIdGroupsRoute AppOrganizationIdProjectIdInsightsRoute: typeof AppOrganizationIdProjectIdInsightsRoute + AppOrganizationIdProjectIdLogsRoute: typeof AppOrganizationIdProjectIdLogsRoute AppOrganizationIdProjectIdPagesRoute: typeof AppOrganizationIdProjectIdPagesRoute AppOrganizationIdProjectIdRealtimeRoute: typeof AppOrganizationIdProjectIdRealtimeRoute AppOrganizationIdProjectIdReferencesRoute: typeof AppOrganizationIdProjectIdReferencesRoute @@ -2054,6 +2075,7 @@ const AppOrganizationIdProjectIdRouteChildren: AppOrganizationIdProjectIdRouteCh AppOrganizationIdProjectIdGroupsRoute, AppOrganizationIdProjectIdInsightsRoute: AppOrganizationIdProjectIdInsightsRoute, + AppOrganizationIdProjectIdLogsRoute: AppOrganizationIdProjectIdLogsRoute, AppOrganizationIdProjectIdPagesRoute: AppOrganizationIdProjectIdPagesRoute, AppOrganizationIdProjectIdRealtimeRoute: AppOrganizationIdProjectIdRealtimeRoute, diff --git a/apps/start/src/routes/_app.$organizationId.$projectId.logs.tsx b/apps/start/src/routes/_app.$organizationId.$projectId.logs.tsx new file mode 100644 index 00000000..6f3976dd --- /dev/null +++ b/apps/start/src/routes/_app.$organizationId.$projectId.logs.tsx @@ -0,0 +1,385 @@ +import { useInfiniteQuery, useQuery } from '@tanstack/react-query'; +import { createFileRoute } from '@tanstack/react-router'; +import { format } from 'date-fns'; +import { AnimatePresence, motion } from 'framer-motion'; +import { + AlertCircleIcon, + AlertTriangleIcon, + BugIcon, + ChevronDownIcon, + ChevronRightIcon, + InfoIcon, + ScrollTextIcon, + SearchIcon, + SkullIcon, + TerminalIcon, + XIcon, +} from 'lucide-react'; +import { useMemo, useState } from 'react'; +import { PageContainer } from '@/components/page-container'; +import { PageHeader } from '@/components/page-header'; +import { Badge } from '@/components/ui/badge'; +import { Button } from '@/components/ui/button'; +import { Input } from '@/components/ui/input'; +import { useSearchQueryState } from '@/hooks/use-search-query-state'; +import { useTRPC } from '@/integrations/trpc/react'; +import { createProjectTitle, PAGE_TITLES } from '@/utils/title'; +import type { ISeverityText } from '@openpanel/validation'; + +export const Route = createFileRoute( + '/_app/$organizationId/$projectId/logs' +)({ + component: Component, + head: () => { + return { + meta: [ + { + title: createProjectTitle(PAGE_TITLES.LOGS), + }, + ], + }; + }, +}); + +const SEVERITY_ICONS: Record = { + trace: TerminalIcon, + debug: BugIcon, + info: InfoIcon, + warn: AlertTriangleIcon, + warning: AlertTriangleIcon, + error: AlertCircleIcon, + fatal: SkullIcon, + critical: SkullIcon, +}; + +const SEVERITY_COLORS: Record = { + trace: 'text-gray-400', + debug: 'text-blue-400', + info: 'text-green-400', + warn: 'text-yellow-400', + warning: 'text-yellow-400', + error: 'text-red-400', + fatal: 'text-red-600', + critical: 'text-red-600', +}; + +const SEVERITY_BG_COLORS: Record = { + trace: 'bg-gray-500/10 hover:bg-gray-500/20', + debug: 'bg-blue-500/10 hover:bg-blue-500/20', + info: 'bg-green-500/10 hover:bg-green-500/20', + warn: 'bg-yellow-500/10 hover:bg-yellow-500/20', + warning: 'bg-yellow-500/10 hover:bg-yellow-500/20', + error: 'bg-red-500/10 hover:bg-red-500/20', + fatal: 'bg-red-600/10 hover:bg-red-600/20', + critical: 'bg-red-600/10 hover:bg-red-600/20', +}; + +function Component() { + const { projectId } = Route.useParams(); + const trpc = useTRPC(); + const { search, setSearch, debouncedSearch } = useSearchQueryState(); + const [selectedSeverity, setSelectedSeverity] = useState([]); + const [expandedLog, setExpandedLog] = useState(null); + + const severityCountsQuery = useQuery( + trpc.log.severityCounts.queryOptions({ projectId }, { enabled: !!projectId }) + ); + + const logsQuery = useInfiniteQuery( + trpc.log.list.infiniteQueryOptions( + { + projectId, + take: 50, + search: debouncedSearch || undefined, + severity: selectedSeverity.length > 0 ? selectedSeverity : undefined, + }, + { + getNextPageParam: (lastPage) => lastPage.meta.next, + } + ) + ); + + const logs = useMemo(() => { + return logsQuery.data?.pages.flatMap((page) => page.data) ?? []; + }, [logsQuery.data]); + + const severityCounts = severityCountsQuery.data ?? {}; + const severityOptions: ISeverityText[] = ['trace', 'debug', 'info', 'warn', 'error', 'fatal']; + + const toggleSeverity = (severity: ISeverityText) => { + setSelectedSeverity((prev) => + prev.includes(severity) + ? prev.filter((s) => s !== severity) + : [...prev, severity] + ); + }; + + return ( + + + + {/* Severity Filter Chips */} +
+ {severityOptions.map((severity) => { + const count = severityCounts[severity] ?? 0; + const isSelected = selectedSeverity.includes(severity); + const Icon = SEVERITY_ICONS[severity]; + + return ( + + ); + })} + + {selectedSeverity.length > 0 && ( + + )} +
+ + {/* Search */} +
+ + setSearch(e.target.value)} + placeholder="Search logs..." + value={search} + /> +
+ + {/* Logs List */} +
+ {logs.map((log) => { + const isExpanded = expandedLog === log.id; + const Icon = SEVERITY_ICONS[log.severityText as ISeverityText] ?? InfoIcon; + const severityColor = SEVERITY_COLORS[log.severityText as ISeverityText] ?? 'text-gray-400'; + const bgColor = SEVERITY_BG_COLORS[log.severityText as ISeverityText] ?? 'bg-gray-500/10'; + + return ( + + + + + {isExpanded && ( + +
+ {/* Full Message */} +
+

Message

+
+                          {log.body}
+                        
+
+ + {/* Attributes */} + {Object.keys(log.attributes).length > 0 && ( +
+

Attributes

+
+ {Object.entries(log.attributes).map(([key, value]) => ( +
+ + {key} + + {value} +
+ ))} +
+
+ )} + + {/* Resource */} + {Object.keys(log.resource).length > 0 && ( +
+

Resource

+
+ {Object.entries(log.resource).map(([key, value]) => ( +
+ + {key} + + {value} +
+ ))} +
+
+ )} + + {/* Trace Context */} + {(log.traceId || log.spanId) && ( +
+

Trace Context

+
+ {log.traceId && ( +
+ + Trace ID + + {log.traceId} +
+ )} + {log.spanId && ( +
+ + Span ID + + {log.spanId} +
+ )} +
+
+ )} + + {/* Device Info */} +
+

Device

+
+
+ Device ID + {log.deviceId} +
+ {log.profileId && ( +
+ Profile ID + {log.profileId} +
+ )} +
+ OS + + {log.os} {log.osVersion} + +
+
+ Browser + + {log.browser} {log.browserVersion} + +
+
+ Location + + {log.city}, {log.region}, {log.country} + +
+
+
+ + {/* SDK Info */} +
+

SDK

+
+
+ Name + {log.sdkName || 'unknown'} +
+
+ Version + {log.sdkVersion || 'unknown'} +
+
+
+ + {/* Observed At */} +
+ Observed at: {format(new Date(log.observedAt), 'MMM d, HH:mm:ss.SSS')} +
+
+
+ )} +
+
+ ); + })} +
+ + {/* Load More */} + {logsQuery.hasNextPage && ( +
+ +
+ )} +
+ ); +} diff --git a/apps/start/src/utils/title.ts b/apps/start/src/utils/title.ts index 1d310735..3acf20e4 100644 --- a/apps/start/src/utils/title.ts +++ b/apps/start/src/utils/title.ts @@ -97,6 +97,8 @@ export const PAGE_TITLES = { PROFILE_DETAILS: 'Profile details', // Groups GROUPS: 'Groups', + // Logs + LOGS: 'Logs', GROUP_DETAILS: 'Group details', // Sub-sections diff --git a/apps/worker/package.json b/apps/worker/package.json index 2d2b2b6c..3f0435ab 100644 --- a/apps/worker/package.json +++ b/apps/worker/package.json @@ -24,6 +24,7 @@ "@openpanel/payments": "workspace:*", "@openpanel/queue": "workspace:*", "@openpanel/redis": "workspace:*", + "@openpanel/validation": "workspace:*", "bullmq": "^5.63.0", "date-fns": "^3.3.1", "express": "^4.18.2", diff --git a/apps/worker/src/boot-cron.ts b/apps/worker/src/boot-cron.ts index 7ad6d34a..537742d4 100644 --- a/apps/worker/src/boot-cron.ts +++ b/apps/worker/src/boot-cron.ts @@ -72,6 +72,11 @@ export async function bootCron() { type: 'flushGroups', pattern: 1000 * 10, }, + { + name: 'flush', + type: 'flushLogs', + pattern: 1000 * 10, + }, { name: 'insightsDaily', type: 'insightsDaily', diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts index f6395565..8cdf2610 100644 --- a/apps/worker/src/boot-workers.ts +++ b/apps/worker/src/boot-workers.ts @@ -8,6 +8,7 @@ import { gscQueue, importQueue, insightsQueue, + logsQueue, miscQueue, notificationQueue, queueLogger, @@ -22,6 +23,7 @@ import { incomingEvent } from './jobs/events.incoming-event'; import { gscJob } from './jobs/gsc'; import { importJob } from './jobs/import'; import { insightsProjectJob } from './jobs/insights'; +import { incomingLog } from './jobs/logs.incoming-log'; import { miscJob } from './jobs/misc'; import { notificationJob } from './jobs/notification'; import { sessionsJob } from './jobs/sessions'; @@ -59,6 +61,7 @@ function getEnabledQueues(): QueueName[] { 'import', 'insights', 'gsc', + 'logs', ]; } @@ -221,6 +224,22 @@ export function bootWorkers() { logger.info('Started worker for gsc', { concurrency }); } + // Start logs worker + if (enabledQueues.includes('logs')) { + const concurrency = getConcurrencyFor('logs', 10); + const logsWorker = new Worker(logsQueue.name, async (job) => { + const { type, payload } = job.data; + if (type === 'incomingLog') { + return await incomingLog(payload); + } + }, { + ...workerOptions, + concurrency, + }); + workers.push(logsWorker); + logger.info('Started worker for logs', { concurrency }); + } + if (workers.length === 0) { logger.warn( 'No workers started. Check ENABLED_QUEUES environment variable.' diff --git a/apps/worker/src/jobs/cron.ts b/apps/worker/src/jobs/cron.ts index 4d500c62..93d7bc77 100644 --- a/apps/worker/src/jobs/cron.ts +++ b/apps/worker/src/jobs/cron.ts @@ -1,6 +1,7 @@ import { eventBuffer, groupBuffer, + logBuffer, profileBackfillBuffer, profileBuffer, replayBuffer, @@ -38,6 +39,9 @@ export async function cronJob(job: Job) { case 'flushGroups': { return await groupBuffer.tryFlush(); } + case 'flushLogs': { + return await logBuffer.tryFlush(); + } case 'ping': { return await ping(); } diff --git a/apps/worker/src/jobs/logs.incoming-log.ts b/apps/worker/src/jobs/logs.incoming-log.ts new file mode 100644 index 00000000..83b9624e --- /dev/null +++ b/apps/worker/src/jobs/logs.incoming-log.ts @@ -0,0 +1,63 @@ +import type { IClickhouseLog } from '@openpanel/db'; +import { logBuffer } from '@openpanel/db'; +import type { LogsQueuePayload } from '@openpanel/queue'; +import { SEVERITY_TEXT_TO_NUMBER } from '@openpanel/validation'; +import { logger as baseLogger } from '@/utils/logger'; + +export async function incomingLog( + payload: LogsQueuePayload['payload'], +): Promise { + const logger = baseLogger.child({ projectId: payload.projectId }); + + try { + const { log, uaInfo, geo, deviceId, sessionId, projectId, headers } = payload; + + const sdkName = headers['openpanel-sdk-name'] ?? ''; + const sdkVersion = headers['openpanel-sdk-version'] ?? ''; + + const severityNumber = + log.severityNumber ?? + SEVERITY_TEXT_TO_NUMBER[log.severity] ?? + 9; // INFO fallback + + const row: IClickhouseLog = { + project_id: projectId, + device_id: deviceId, + profile_id: log.profileId ? String(log.profileId) : '', + session_id: sessionId, + timestamp: log.timestamp, + observed_at: new Date().toISOString(), + severity_number: severityNumber, + severity_text: log.severity, + body: log.body, + trace_id: log.traceId ?? '', + span_id: log.spanId ?? '', + trace_flags: log.traceFlags ?? 0, + logger_name: log.loggerName ?? '', + attributes: log.attributes ?? {}, + resource: log.resource ?? {}, + sdk_name: sdkName, + sdk_version: sdkVersion, + country: geo.country ?? '', + city: geo.city ?? '', + region: geo.region ?? '', + os: uaInfo.os ?? '', + os_version: uaInfo.osVersion ?? '', + browser: uaInfo.isServer ? '' : (uaInfo.browser ?? ''), + browser_version: uaInfo.isServer ? '' : (uaInfo.browserVersion ?? ''), + device: uaInfo.device ?? '', + brand: uaInfo.isServer ? '' : (uaInfo.brand ?? ''), + model: uaInfo.isServer ? '' : (uaInfo.model ?? ''), + }; + + logBuffer.add(row); + + logger.info('Log queued', { + severity: log.severity, + loggerName: log.loggerName, + }); + } catch (error) { + logger.error('Failed to process incoming log', { error }); + throw error; + } +} diff --git a/packages/db/code-migrations/13-add-logs.ts b/packages/db/code-migrations/13-add-logs.ts new file mode 100644 index 00000000..34af6829 --- /dev/null +++ b/packages/db/code-migrations/13-add-logs.ts @@ -0,0 +1,72 @@ +import { createTable, runClickhouseMigrationCommands } from '../src/clickhouse/migration'; +import { getIsCluster, printBoxMessage } from './helpers'; + +export async function up() { + const replicatedVersion = '1'; + const isClustered = getIsCluster(); + + const sqls: string[] = []; + + sqls.push( + ...createTable({ + name: 'logs', + columns: [ + '`id` UUID DEFAULT generateUUIDv4()', + '`project_id` String CODEC(ZSTD(3))', + '`device_id` String CODEC(ZSTD(3))', + '`profile_id` String CODEC(ZSTD(3))', + '`session_id` String CODEC(LZ4)', + // OpenTelemetry log fields + '`timestamp` DateTime64(9) CODEC(DoubleDelta, ZSTD(3))', + '`observed_at` DateTime64(9) CODEC(DoubleDelta, ZSTD(3))', + '`severity_number` UInt8', + '`severity_text` LowCardinality(String)', + '`body` String CODEC(ZSTD(3))', + '`trace_id` String CODEC(ZSTD(3))', + '`span_id` String CODEC(ZSTD(3))', + '`trace_flags` UInt32 DEFAULT 0', + '`logger_name` LowCardinality(String)', + // OTel attributes (log-level key-value pairs) + '`attributes` Map(String, String) CODEC(ZSTD(3))', + // OTel resource attributes (device/app metadata) + '`resource` Map(String, String) CODEC(ZSTD(3))', + // Server-enriched context + '`sdk_name` LowCardinality(String)', + '`sdk_version` LowCardinality(String)', + '`country` LowCardinality(FixedString(2))', + '`city` String', + '`region` LowCardinality(String)', + '`os` LowCardinality(String)', + '`os_version` LowCardinality(String)', + '`browser` LowCardinality(String)', + '`browser_version` LowCardinality(String)', + '`device` LowCardinality(String)', + '`brand` LowCardinality(String)', + '`model` LowCardinality(String)', + ], + indices: [ + 'INDEX idx_severity_number severity_number TYPE minmax GRANULARITY 1', + 'INDEX idx_body body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1', + 'INDEX idx_trace_id trace_id TYPE bloom_filter GRANULARITY 1', + 'INDEX idx_logger_name logger_name TYPE bloom_filter GRANULARITY 1', + ], + orderBy: ['project_id', 'toDate(timestamp)', 'severity_number', 'device_id'], + partitionBy: 'toYYYYMM(timestamp)', + settings: { + index_granularity: 8192, + ttl_only_drop_parts: 1, + }, + distributionHash: 'cityHash64(project_id, toString(toStartOfHour(timestamp)))', + replicatedVersion, + isClustered, + }), + ); + + printBoxMessage('Running migration: 13-add-logs', [ + 'Creates the logs table for OpenTelemetry-compatible device/app log capture.', + ]); + + if (!process.argv.includes('--dry')) { + await runClickhouseMigrationCommands(sqls); + } +} diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts index 86741b54..310ad3a5 100644 --- a/packages/db/src/buffers/index.ts +++ b/packages/db/src/buffers/index.ts @@ -1,6 +1,7 @@ import { BotBuffer as BotBufferRedis } from './bot-buffer'; import { EventBuffer as EventBufferRedis } from './event-buffer'; import { GroupBuffer } from './group-buffer'; +import { LogBuffer } from './log-buffer'; import { ProfileBackfillBuffer } from './profile-backfill-buffer'; import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer'; import { ReplayBuffer } from './replay-buffer'; @@ -13,6 +14,8 @@ export const sessionBuffer = new SessionBuffer(); export const profileBackfillBuffer = new ProfileBackfillBuffer(); export const replayBuffer = new ReplayBuffer(); export const groupBuffer = new GroupBuffer(); +export const logBuffer = new LogBuffer(); export type { ProfileBackfillEntry } from './profile-backfill-buffer'; export type { IClickhouseSessionReplayChunk } from './replay-buffer'; +export type { IClickhouseLog } from './log-buffer'; diff --git a/packages/db/src/buffers/log-buffer.ts b/packages/db/src/buffers/log-buffer.ts new file mode 100644 index 00000000..0bec4ab5 --- /dev/null +++ b/packages/db/src/buffers/log-buffer.ts @@ -0,0 +1,269 @@ +import { getSafeJson } from '@openpanel/json'; +import { getRedisCache } from '@openpanel/redis'; +import { ch } from '../clickhouse/client'; +import { BaseBuffer } from './base-buffer'; + +export interface IClickhouseLog { + id?: string; + project_id: string; + device_id: string; + profile_id: string; + session_id: string; + timestamp: string; + observed_at: string; + severity_number: number; + severity_text: string; + body: string; + trace_id: string; + span_id: string; + trace_flags: number; + logger_name: string; + attributes: Record; + resource: Record; + sdk_name: string; + sdk_version: string; + country: string; + city: string; + region: string; + os: string; + os_version: string; + browser: string; + browser_version: string; + device: string; + brand: string; + model: string; +} + +export class LogBuffer extends BaseBuffer { + private batchSize = process.env.LOG_BUFFER_BATCH_SIZE + ? Number.parseInt(process.env.LOG_BUFFER_BATCH_SIZE, 10) + : 4000; + private chunkSize = process.env.LOG_BUFFER_CHUNK_SIZE + ? Number.parseInt(process.env.LOG_BUFFER_CHUNK_SIZE, 10) + : 1000; + private microBatchIntervalMs = process.env.LOG_BUFFER_MICRO_BATCH_MS + ? Number.parseInt(process.env.LOG_BUFFER_MICRO_BATCH_MS, 10) + : 10; + private microBatchMaxSize = process.env.LOG_BUFFER_MICRO_BATCH_SIZE + ? Number.parseInt(process.env.LOG_BUFFER_MICRO_BATCH_SIZE, 10) + : 100; + + private pendingLogs: IClickhouseLog[] = []; + private flushTimer: ReturnType | null = null; + private isFlushing = false; + private flushRetryCount = 0; + + private queueKey = 'log_buffer:queue'; + protected bufferCounterKey = 'log_buffer:total_count'; + + constructor() { + super({ + name: 'log', + onFlush: async () => { + await this.processBuffer(); + }, + }); + } + + add(log: IClickhouseLog) { + this.pendingLogs.push(log); + + if (this.pendingLogs.length >= this.microBatchMaxSize) { + this.flushLocalBuffer(); + return; + } + + if (!this.flushTimer) { + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + this.flushLocalBuffer(); + }, this.microBatchIntervalMs); + } + } + + public async flush() { + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + await this.flushLocalBuffer(); + } + + private async flushLocalBuffer() { + if (this.isFlushing || this.pendingLogs.length === 0) { + return; + } + + this.isFlushing = true; + const logsToFlush = this.pendingLogs; + this.pendingLogs = []; + + try { + // Push to Redis queue for processing + const pipeline = getRedisCache().pipeline(); + for (const log of logsToFlush) { + pipeline.lpush(this.queueKey, JSON.stringify(log)); + } + await pipeline.exec(); + + // Increment counter + await getRedisCache().incrby(this.bufferCounterKey, logsToFlush.length); + + this.flushRetryCount = 0; + } catch (error) { + this.logger.error('Failed to push logs to Redis queue', { error }); + // Re-queue locally on failure + this.pendingLogs = logsToFlush.concat(this.pendingLogs); + this.flushRetryCount++; + + // If max retries exceeded, log and drop + if (this.flushRetryCount >= 3) { + this.logger.error('Max retries exceeded, dropping logs', { + droppedCount: this.pendingLogs.length, + }); + this.pendingLogs = []; + this.flushRetryCount = 0; + } + } finally { + this.isFlushing = false; + } + } + + private async processBuffer() { + const startTime = Date.now(); + const redis = getRedisCache(); + + try { + // Get batch of logs from Redis + const batch: string[] = []; + const pipeline = redis.pipeline(); + + for (let i = 0; i < this.batchSize; i++) { + pipeline.rpop(this.queueKey); + } + + const results = await pipeline.exec(); + if (!results) { + return; + } + + for (const result of results) { + if (result[1]) { + batch.push(result[1] as string); + } + } + + if (batch.length === 0) { + return; + } + + this.logger.info(`Processing ${batch.length} logs`); + + // Parse logs + const logs: IClickhouseLog[] = []; + for (const item of batch) { + try { + const parsed = getSafeJson(item); + if (parsed) { + logs.push(parsed); + } + } catch (error) { + this.logger.error('Failed to parse log', { error, item }); + } + } + + if (logs.length === 0) { + return; + } + + // Insert into ClickHouse in chunks + const chunks = this.chunks(logs, this.chunkSize); + for (const chunk of chunks) { + await this.insertChunk(chunk); + } + + // Decrement counter + await redis.decrby(this.bufferCounterKey, batch.length); + + this.logger.info('Logs processed successfully', { + count: logs.length, + elapsed: Date.now() - startTime, + }); + } catch (error) { + this.logger.error('Failed to process logs', { error }); + throw error; + } + } + + private async insertChunk(logs: IClickhouseLog[]) { + const query = ` + INSERT INTO logs ( + id, project_id, device_id, profile_id, session_id, + timestamp, observed_at, severity_number, severity_text, body, + trace_id, span_id, trace_flags, logger_name, attributes, resource, + sdk_name, sdk_version, country, city, region, + os, os_version, browser, browser_version, device, brand, model + ) + VALUES + `; + + const values = logs + .map((log) => { + return `( + generateUUIDv4(), + ${escape(log.project_id)}, + ${escape(log.device_id)}, + ${escape(log.profile_id)}, + ${escape(log.session_id)}, + ${escape(log.timestamp)}, + ${escape(log.observed_at)}, + ${log.severity_number}, + ${escape(log.severity_text)}, + ${escape(log.body)}, + ${escape(log.trace_id)}, + ${escape(log.span_id)}, + ${log.trace_flags}, + ${escape(log.logger_name)}, + ${mapToSql(log.attributes)}, + ${mapToSql(log.resource)}, + ${escape(log.sdk_name)}, + ${escape(log.sdk_version)}, + ${escape(log.country)}, + ${escape(log.city)}, + ${escape(log.region)}, + ${escape(log.os)}, + ${escape(log.os_version)}, + ${escape(log.browser)}, + ${escape(log.browser_version)}, + ${escape(log.device)}, + ${escape(log.brand)}, + ${escape(log.model)} + )`; + }) + .join(','); + + await ch.query({ + query: `${query} ${values}`, + clickhouse_settings: { + wait_end_of_query: 1, + }, + }); + } +} + +function escape(value: string): string { + if (value === null || value === undefined) { + return "''"; + } + return `'${value.replace(/'/g, "\\'").replace(/\\/g, '\\\\')}'`; +} + +function mapToSql(map: Record): string { + if (!map || Object.keys(map).length === 0) { + return '{}'; + } + const entries = Object.entries(map) + .map(([k, v]) => `${escape(k)}: ${escape(v)}`) + .join(', '); + return `{${entries}}`; +} diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 171e4707..898f1259 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -8,7 +8,7 @@ import { createLogger } from '@openpanel/logger'; import { getRedisGroupQueue, getRedisQueue } from '@openpanel/redis'; import { Queue } from 'bullmq'; import { Queue as GroupQueue } from 'groupmq'; -import type { ITrackPayload } from '../../validation'; +import type { ILogPayload, ITrackPayload } from '../../validation'; export const EVENTS_GROUP_QUEUES_SHARDS = Number.parseInt( process.env.EVENTS_GROUP_QUEUES_SHARDS || '1', @@ -297,3 +297,50 @@ export const gscQueue = new Queue(getQueueName('gsc'), { removeOnFail: 100, }, }); + +export type LogsQueuePayload = { + type: 'incomingLog'; + payload: { + projectId: string; + log: ILogPayload & { + timestamp: string; + }; + uaInfo: + | { + readonly isServer: true; + readonly device: 'server'; + readonly os: ''; + readonly osVersion: ''; + readonly browser: ''; + readonly browserVersion: ''; + readonly brand: ''; + readonly model: ''; + } + | { + readonly os: string | undefined; + readonly osVersion: string | undefined; + readonly browser: string | undefined; + readonly browserVersion: string | undefined; + readonly device: string; + readonly brand: string | undefined; + readonly model: string | undefined; + readonly isServer: false; + }; + geo: { + country: string | undefined; + city: string | undefined; + region: string | undefined; + }; + headers: Record; + deviceId: string; + sessionId: string; + }; +}; + +export const logsQueue = new Queue(getQueueName('logs'), { + connection: getRedisQueue(), + defaultJobOptions: { + removeOnComplete: 100, + removeOnFail: 1000, + }, +}); diff --git a/packages/sdks/sdk/src/index.ts b/packages/sdks/sdk/src/index.ts index 71789d62..8f4c41ef 100644 --- a/packages/sdks/sdk/src/index.ts +++ b/packages/sdks/sdk/src/index.ts @@ -7,6 +7,8 @@ import type { IGroupPayload as GroupPayload, IIdentifyPayload as IdentifyPayload, IIncrementPayload as IncrementPayload, + ILogPayload, + ISeverityText, ITrackHandlerPayload as TrackHandlerPayload, ITrackPayload as TrackPayload, } from '@openpanel/validation'; @@ -23,6 +25,8 @@ export type { TrackPayload, }; +export type LogProperties = Omit; + export interface TrackProperties { [key: string]: unknown; profileId?: string; @@ -48,6 +52,19 @@ export interface OpenPanelOptions { debug?: boolean; } +interface LogPayloadForQueue { + body: string; + severity: ISeverityText; + timestamp: string; + profileId?: string | number; + loggerName?: string; + traceId?: string; + spanId?: string; + traceFlags?: number; + attributes?: Record; + resource?: Record; +} + export class OpenPanel { api: Api; options: OpenPanelOptions; @@ -58,6 +75,12 @@ export class OpenPanel { global?: Record; queue: TrackHandlerPayload[] = []; + // Log queue for batching + private logQueue: LogPayloadForQueue[] = []; + private logFlushTimer: ReturnType | null = null; + private logFlushIntervalMs = 1000; + private logFlushMaxSize = 100; + constructor(options: OpenPanelOptions) { this.options = options; @@ -327,6 +350,67 @@ export class OpenPanel { this.queue = remaining; } + captureLog( + severity: ISeverityText, + body: string, + properties?: LogProperties, + ) { + if (this.options.disabled) { + return; + } + + const entry: LogPayloadForQueue = { + body, + severity, + timestamp: properties?.timestamp ?? new Date().toISOString(), + ...(this.profileId ? { profileId: this.profileId } : {}), + ...(properties?.loggerName ? { loggerName: properties.loggerName } : {}), + ...(properties?.traceId ? { traceId: properties.traceId } : {}), + ...(properties?.spanId ? { spanId: properties.spanId } : {}), + ...(properties?.traceFlags !== undefined + ? { traceFlags: properties.traceFlags } + : {}), + ...(properties?.attributes ? { attributes: properties.attributes } : {}), + ...(properties?.resource ? { resource: properties.resource } : {}), + }; + + this.logQueue.push(entry); + + if (this.logQueue.length >= this.logFlushMaxSize) { + this.flushLogs(); + return; + } + + if (!this.logFlushTimer) { + this.logFlushTimer = setTimeout(() => { + this.logFlushTimer = null; + this.flushLogs(); + }, this.logFlushIntervalMs); + } + } + + private async flushLogs() { + if (this.logFlushTimer) { + clearTimeout(this.logFlushTimer); + this.logFlushTimer = null; + } + + if (this.logQueue.length === 0) { + return; + } + + const batch = this.logQueue; + this.logQueue = []; + + try { + await this.api.fetch('/logs', { logs: batch }); + } catch (error) { + this.log('Failed to flush logs', error); + // Re-queue on failure + this.logQueue = batch.concat(this.logQueue); + } + } + log(...args: any[]) { if (this.options.debug) { console.log('[OpenPanel.dev]', ...args); diff --git a/packages/trpc/index.ts b/packages/trpc/index.ts index aa3f4ca1..8ff472c2 100644 --- a/packages/trpc/index.ts +++ b/packages/trpc/index.ts @@ -1,3 +1,4 @@ export { getProjectAccess } from './src/access'; export * from './src/root'; export * from './src/trpc'; +export type { IServiceLog } from './src/routers/log'; diff --git a/packages/trpc/src/root.ts b/packages/trpc/src/root.ts index 808de8c6..8f517cd8 100644 --- a/packages/trpc/src/root.ts +++ b/packages/trpc/src/root.ts @@ -10,6 +10,7 @@ import { gscRouter } from './routers/gsc'; import { importRouter } from './routers/import'; import { insightRouter } from './routers/insight'; import { integrationRouter } from './routers/integration'; +import { logRouter } from './routers/log'; import { notificationRouter } from './routers/notification'; import { onboardingRouter } from './routers/onboarding'; import { organizationRouter } from './routers/organization'; @@ -57,6 +58,7 @@ export const appRouter = createTRPCRouter({ email: emailRouter, gsc: gscRouter, group: groupRouter, + log: logRouter, }); // export type definition of API diff --git a/packages/trpc/src/routers/log.ts b/packages/trpc/src/routers/log.ts new file mode 100644 index 00000000..4664aeb0 --- /dev/null +++ b/packages/trpc/src/routers/log.ts @@ -0,0 +1,212 @@ +import { chQuery, convertClickhouseDateToJs } from '@openpanel/db'; +import { zSeverityText } from '@openpanel/validation'; +import sqlstring from 'sqlstring'; +import { z } from 'zod'; +import { createTRPCRouter, protectedProcedure } from '../trpc'; + +export interface IServiceLog { + id: string; + projectId: string; + deviceId: string; + profileId: string; + sessionId: string; + timestamp: Date; + severityNumber: number; + severityText: string; + body: string; + traceId: string; + spanId: string; + traceFlags: number; + loggerName: string; + attributes: Record; + resource: Record; + sdkName: string; + sdkVersion: string; + country: string; + city: string; + region: string; + os: string; + osVersion: string; + browser: string; + browserVersion: string; + device: string; + brand: string; + model: string; +} + +interface IClickhouseLog { + id: string; + project_id: string; + device_id: string; + profile_id: string; + session_id: string; + timestamp: string; + severity_number: number; + severity_text: string; + body: string; + trace_id: string; + span_id: string; + trace_flags: number; + logger_name: string; + attributes: Record; + resource: Record; + sdk_name: string; + sdk_version: string; + country: string; + city: string; + region: string; + os: string; + os_version: string; + browser: string; + browser_version: string; + device: string; + brand: string; + model: string; +} + +function toServiceLog(row: IClickhouseLog): IServiceLog { + return { + id: row.id, + projectId: row.project_id, + deviceId: row.device_id, + profileId: row.profile_id, + sessionId: row.session_id, + timestamp: convertClickhouseDateToJs(row.timestamp), + severityNumber: row.severity_number, + severityText: row.severity_text, + body: row.body, + traceId: row.trace_id, + spanId: row.span_id, + traceFlags: row.trace_flags, + loggerName: row.logger_name, + attributes: row.attributes, + resource: row.resource, + sdkName: row.sdk_name, + sdkVersion: row.sdk_version, + country: row.country, + city: row.city, + region: row.region, + os: row.os, + osVersion: row.os_version, + browser: row.browser, + browserVersion: row.browser_version, + device: row.device, + brand: row.brand, + model: row.model, + }; +} + +export const logRouter = createTRPCRouter({ + list: protectedProcedure + .input( + z.object({ + projectId: z.string(), + cursor: z.string().nullish(), + severity: z.array(zSeverityText).optional(), + search: z.string().optional(), + loggerName: z.string().optional(), + startDate: z.date().optional(), + endDate: z.date().optional(), + take: z.number().default(50), + }), + ) + .query(async ({ input }) => { + const { projectId, cursor, severity, search, loggerName, startDate, endDate, take } = input; + + const conditions: string[] = [ + `project_id = ${sqlstring.escape(projectId)}`, + ]; + + if (cursor) { + conditions.push(`timestamp < ${sqlstring.escape(cursor)}`); + } + + if (severity && severity.length > 0) { + const escaped = severity.map((s) => sqlstring.escape(s)).join(', '); + conditions.push(`severity_text IN (${escaped})`); + } + + if (search) { + conditions.push(`body ILIKE ${sqlstring.escape(`%${search}%`)}`); + } + + if (loggerName) { + conditions.push(`logger_name = ${sqlstring.escape(loggerName)}`); + } + + if (startDate) { + conditions.push(`timestamp >= ${sqlstring.escape(startDate.toISOString())}`); + } + + if (endDate) { + conditions.push(`timestamp <= ${sqlstring.escape(endDate.toISOString())}`); + } + + const where = conditions.join(' AND '); + + const rows = await chQuery( + `SELECT + id, project_id, device_id, profile_id, session_id, + timestamp, severity_number, severity_text, body, + trace_id, span_id, trace_flags, logger_name, + attributes, resource, + sdk_name, sdk_version, + country, city, region, os, os_version, + browser, browser_version, device, brand, model + FROM logs + WHERE ${where} + ORDER BY timestamp DESC + LIMIT ${take + 1}`, + ); + + const hasMore = rows.length > take; + const data = rows.slice(0, take).map(toServiceLog); + const lastItem = data[data.length - 1]; + + return { + data, + meta: { + next: hasMore && lastItem ? lastItem.timestamp.toISOString() : null, + }, + }; + }), + + severityCounts: protectedProcedure + .input( + z.object({ + projectId: z.string(), + startDate: z.date().optional(), + endDate: z.date().optional(), + }), + ) + .query(async ({ input }) => { + const { projectId, startDate, endDate } = input; + + const conditions: string[] = [ + `project_id = ${sqlstring.escape(projectId)}`, + ]; + + if (startDate) { + conditions.push(`timestamp >= ${sqlstring.escape(startDate.toISOString())}`); + } + + if (endDate) { + conditions.push(`timestamp <= ${sqlstring.escape(endDate.toISOString())}`); + } + + const where = conditions.join(' AND '); + + const rows = await chQuery<{ severity_text: string; count: number }>( + `SELECT severity_text, count() AS count + FROM logs + WHERE ${where} + GROUP BY severity_text + ORDER BY count DESC`, + ); + + return rows.reduce>((acc, row) => { + acc[row.severity_text] = row.count; + return acc; + }, {}); + }), +}); diff --git a/packages/validation/src/index.ts b/packages/validation/src/index.ts index e9b42174..92674ed6 100644 --- a/packages/validation/src/index.ts +++ b/packages/validation/src/index.ts @@ -625,3 +625,4 @@ export type ICreateImport = z.infer; export * from './event-blocklist'; export * from './track.validation'; export * from './types.insights'; +export * from './log.validation'; diff --git a/packages/validation/src/log.validation.ts b/packages/validation/src/log.validation.ts new file mode 100644 index 00000000..5f25732e --- /dev/null +++ b/packages/validation/src/log.validation.ts @@ -0,0 +1,60 @@ +import { z } from 'zod'; + +/** + * OTel severity number mapping (subset): + * TRACE=1, DEBUG=5, INFO=9, WARN=13, ERROR=17, FATAL=21 + */ +export const SEVERITY_TEXT_TO_NUMBER: Record = { + trace: 1, + debug: 5, + info: 9, + warn: 13, + warning: 13, + error: 17, + fatal: 21, + critical: 21, +}; + +export const zSeverityText = z.enum([ + 'trace', + 'debug', + 'info', + 'warn', + 'warning', + 'error', + 'fatal', + 'critical', +]); + +export type ISeverityText = z.infer; + +export const zLogPayload = z.object({ + /** Log message / body */ + body: z.string().min(1), + /** Severity level as text */ + severity: zSeverityText.default('info'), + /** Optional override for the numeric OTel severity (1-24) */ + severityNumber: z.number().int().min(1).max(24).optional(), + /** ISO 8601 timestamp; defaults to server receive time if omitted */ + timestamp: z.string().datetime({ offset: true }).optional(), + /** Logger name (e.g. "com.example.MyActivity") */ + loggerName: z.string().optional(), + /** W3C trace context */ + traceId: z.string().optional(), + spanId: z.string().optional(), + traceFlags: z.number().int().min(0).optional(), + /** Log-level key-value attributes */ + attributes: z.record(z.string(), z.string()).optional(), + /** Resource/device attributes (app version, runtime, etc.) */ + resource: z.record(z.string(), z.string()).optional(), + /** Profile/user ID to associate with this log */ + profileId: z.union([z.string().min(1), z.number()]).optional(), +}); + +export type ILogPayload = z.infer; + +export const zLogBatchPayload = z.object({ + logs: z.array(zLogPayload).min(1).max(500), +}); + +export type ILogBatchPayload = z.infer; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6aca8e62..e39cbe27 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -959,6 +959,9 @@ importers: '@openpanel/redis': specifier: workspace:* version: link:../../packages/redis + '@openpanel/validation': + specifier: workspace:* + version: link:../../packages/validation bullmq: specifier: ^5.63.0 version: 5.63.0