feat: add OpenTelemetry device log capture pipeline

- ClickHouse `logs` table (migration 13) with OTel columns, bloom filter indices
- Zod validation schema for log payloads (severity, body, attributes, trace context)
- Redis-backed LogBuffer with micro-batching into ClickHouse
- POST /logs API endpoint with client auth, geo + UA enrichment
- BullMQ logs queue + worker job
- cron flushLogs every 10s wired into existing cron system
- SDK captureLog(severity, body, properties) with client-side batching

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-30 12:04:04 +02:00
parent a1ce71ffb6
commit 0672857974
14 changed files with 652 additions and 2 deletions

View File

@@ -0,0 +1,68 @@
import { parseUserAgent } from '@openpanel/common/server';
import { getSalts } from '@openpanel/db';
import { getGeoLocation } from '@openpanel/geo';
import { type LogsQueuePayload, logsQueue } from '@openpanel/queue';
import { type ILogBatchPayload, zLogBatchPayload } from '@openpanel/validation';
import type { FastifyReply, FastifyRequest } from 'fastify';
import { getDeviceId } from '@/utils/ids';
import { getStringHeaders } from './track.controller';
/**
 * POST /logs — accepts a batch of OTel-style log records from an SDK client.
 *
 * Flow: resolve project from the authenticated client (attached by
 * clientHook) → validate the body with zod → enrich with geo location,
 * device/session ids and parsed user-agent → fan out one queue job per log
 * record for async ClickHouse ingestion.
 *
 * Responds 400 when the project is missing or validation fails, otherwise
 * 200 with the number of accepted records.
 */
export async function handler(
  request: FastifyRequest<{ Body: ILogBatchPayload }>,
  reply: FastifyReply,
) {
  // clientHook populates request.client; without a project we cannot attribute logs.
  const projectId = request.client?.projectId;
  if (!projectId) {
    return reply.status(400).send({ status: 400, error: 'Missing projectId' });
  }
  const validationResult = zLogBatchPayload.safeParse(request.body);
  if (!validationResult.success) {
    return reply.status(400).send({
      status: 400,
      error: 'Bad Request',
      message: 'Validation failed',
      errors: validationResult.error.errors,
    });
  }
  const { logs } = validationResult.data;
  // Empty batch: nothing to ingest — skip the geo lookup, salt fetch,
  // device-id hashing and queue round-trip entirely.
  if (logs.length === 0) {
    return reply.status(200).send({ ok: true, count: 0 });
  }
  const ip = request.clientIp;
  const ua = request.headers['user-agent'] ?? 'unknown/1.0';
  const headers = getStringHeaders(request.headers);
  // Server-side receive time, used as the fallback timestamp for records
  // that did not carry their own.
  const receivedAt = new Date().toISOString();
  // Geo lookup and salt fetch are independent — run them in parallel.
  const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]);
  const { deviceId, sessionId } = await getDeviceId({ projectId, ip, ua, salts });
  const uaInfo = parseUserAgent(ua, undefined);
  // Shared enrichment (geo, UA, device/session) is computed once and copied
  // into every job; each log record becomes its own queue job.
  const jobs: LogsQueuePayload[] = logs.map((log) => ({
    type: 'incomingLog' as const,
    payload: {
      projectId,
      log: {
        ...log,
        timestamp: log.timestamp ?? receivedAt,
      },
      uaInfo,
      geo: {
        country: geo.country,
        city: geo.city,
        region: geo.region,
      },
      headers,
      deviceId,
      sessionId,
    },
  }));
  // addBulk enqueues the whole batch in a single Redis round-trip.
  await logsQueue.addBulk(
    jobs.map((job) => ({
      name: 'incomingLog',
      data: job,
    })),
  );
  return reply.status(200).send({ ok: true, count: logs.length });
}

View File

@@ -44,6 +44,7 @@ import manageRouter from './routes/manage.router';
import miscRouter from './routes/misc.router';
import oauthRouter from './routes/oauth-callback.router';
import profileRouter from './routes/profile.router';
import logsRouter from './routes/logs.router';
import trackRouter from './routes/track.router';
import webhookRouter from './routes/webhook.router';
import { HttpError } from './utils/errors';
@@ -209,6 +210,7 @@ const startServer = async () => {
instance.register(importRouter, { prefix: '/import' });
instance.register(insightsRouter, { prefix: '/insights' });
instance.register(trackRouter, { prefix: '/track' });
instance.register(logsRouter, { prefix: '/logs' });
instance.register(manageRouter, { prefix: '/manage' });
// Keep existing endpoints for backward compatibility
instance.get('/healthcheck', healthcheck);

View File

@@ -0,0 +1,17 @@
import type { FastifyPluginAsync, FastifyPluginCallback } from 'fastify';
import { handler } from '@/controllers/logs.controller';
import { clientHook } from '@/hooks/client.hook';
import { duplicateHook } from '@/hooks/duplicate.hook';
const logsRouter: FastifyPluginCallback = async (fastify) => {
fastify.addHook('preValidation', duplicateHook);
fastify.addHook('preHandler', clientHook);
fastify.route({
method: 'POST',
url: '/',
handler,
});
};
export default logsRouter;

View File

@@ -73,6 +73,11 @@ export async function bootCron() {
type: 'flushGroups',
pattern: 1000 * 10,
},
{
name: 'flush',
type: 'flushLogs',
pattern: 1000 * 10,
},
{
name: 'insightsDaily',
type: 'insightsDaily',

View File

@@ -8,6 +8,7 @@ import {
gscQueue,
importQueue,
insightsQueue,
logsQueue,
miscQueue,
notificationQueue,
queueLogger,
@@ -22,6 +23,7 @@ import { incomingEvent } from './jobs/events.incoming-event';
import { gscJob } from './jobs/gsc';
import { importJob } from './jobs/import';
import { insightsProjectJob } from './jobs/insights';
import { incomingLog } from './jobs/logs.incoming-log';
import { miscJob } from './jobs/misc';
import { notificationJob } from './jobs/notification';
import { sessionsJob } from './jobs/sessions';
@@ -59,6 +61,7 @@ function getEnabledQueues(): QueueName[] {
'import',
'insights',
'gsc',
'logs',
];
}
@@ -221,6 +224,20 @@ export function bootWorkers() {
logger.info('Started worker for gsc', { concurrency });
}
// Start logs worker
if (enabledQueues.includes('logs')) {
const concurrency = getConcurrencyFor('logs', 10);
const logsWorker = new Worker(
logsQueue.name,
async (job) => {
await incomingLog(job.data.payload);
},
{ ...workerOptions, concurrency },
);
workers.push(logsWorker);
logger.info('Started worker for logs', { concurrency });
}
if (workers.length === 0) {
logger.warn(
'No workers started. Check ENABLED_QUEUES environment variable.'

View File

@@ -1,6 +1,6 @@
import type { Job } from 'bullmq';
import { eventBuffer, groupBuffer, profileBackfillBuffer, profileBuffer, replayBuffer, sessionBuffer } from '@openpanel/db';
import { eventBuffer, groupBuffer, logBuffer, profileBackfillBuffer, profileBuffer, replayBuffer, sessionBuffer } from '@openpanel/db';
import type { CronQueuePayload } from '@openpanel/queue';
import { jobdeleteProjects } from './cron.delete-projects';
@@ -33,6 +33,9 @@ export async function cronJob(job: Job<CronQueuePayload>) {
case 'flushGroups': {
return await groupBuffer.tryFlush();
}
case 'flushLogs': {
return await logBuffer.tryFlush();
}
case 'ping': {
return await ping();
}

View File

@@ -0,0 +1,63 @@
import type { IClickhouseLog } from '@openpanel/db';
import { logBuffer } from '@openpanel/db';
import type { LogsQueuePayload } from '@openpanel/queue';
import { SEVERITY_TEXT_TO_NUMBER } from '@openpanel/validation';
import { logger as baseLogger } from '@/utils/logger';
/**
 * Worker job: transforms one queued log payload into a ClickHouse row and
 * hands it to the Redis-backed log buffer for micro-batched insertion
 * (flushed by the `flushLogs` cron).
 *
 * @param payload - Enriched payload built by the /logs controller (raw log
 *   record plus geo, parsed UA, device/session ids and request headers).
 * @throws Rethrows any processing/buffer error so BullMQ marks the job
 *   failed and retries it.
 */
export async function incomingLog(
  payload: LogsQueuePayload['payload'],
): Promise<void> {
  const logger = baseLogger.child({ projectId: payload.projectId });
  try {
    const { log, uaInfo, geo, deviceId, sessionId, projectId, headers } = payload;
    const sdkName = headers['openpanel-sdk-name'] ?? '';
    const sdkVersion = headers['openpanel-sdk-version'] ?? '';
    // OTel severity resolution: explicit number wins, then the mapped
    // severity text, then 9 (OTel SeverityNumber for INFO) as the fallback.
    const severityNumber =
      log.severityNumber ??
      SEVERITY_TEXT_TO_NUMBER[log.severity] ??
      9; // INFO fallback
    const row: IClickhouseLog = {
      project_id: projectId,
      device_id: deviceId,
      profile_id: log.profileId ? String(log.profileId) : '',
      session_id: sessionId,
      timestamp: log.timestamp,
      observed_at: new Date().toISOString(),
      severity_number: severityNumber,
      severity_text: log.severity,
      body: log.body,
      trace_id: log.traceId ?? '',
      span_id: log.spanId ?? '',
      trace_flags: log.traceFlags ?? 0,
      logger_name: log.loggerName ?? '',
      attributes: log.attributes ?? {},
      resource: log.resource ?? {},
      sdk_name: sdkName,
      sdk_version: sdkVersion,
      country: geo.country ?? '',
      city: geo.city ?? '',
      region: geo.region ?? '',
      os: uaInfo.os ?? '',
      os_version: uaInfo.osVersion ?? '',
      // Browser/device identity is meaningless for server-side SDKs; blank
      // those columns when the UA was detected as a server.
      browser: uaInfo.isServer ? '' : (uaInfo.browser ?? ''),
      browser_version: uaInfo.isServer ? '' : (uaInfo.browserVersion ?? ''),
      device: uaInfo.device ?? '',
      brand: uaInfo.isServer ? '' : (uaInfo.brand ?? ''),
      model: uaInfo.isServer ? '' : (uaInfo.model ?? ''),
    };
    // Await the buffer write so a rejection is caught below and fails the
    // job for retry — an unawaited promise would escape this try/catch and
    // silently drop the log. (Harmless no-op if add() is synchronous.)
    await logBuffer.add(row);
    logger.info('Log queued', {
      severity: log.severity,
      loggerName: log.loggerName,
    });
  } catch (error) {
    logger.error('Failed to process incoming log', { error });
    throw error;
  }
}