import { getSafeJson } from '@openpanel/json';
import { getRedisCache } from '@openpanel/redis';
import { ch } from '../clickhouse/client';
import { BaseBuffer } from './base-buffer';

/**
 * A single log record as it is queued in Redis and later written to the
 * ClickHouse `logs` table by {@link LogBuffer}.
 *
 * `id` is optional and is not used on insert: the INSERT statement generates
 * a fresh UUID for every row.
 */
export interface IClickhouseLog {
  id?: string;
  project_id: string;
  device_id: string;
  profile_id: string;
  session_id: string;
  timestamp: string;
  observed_at: string;
  severity_number: number;
  severity_text: string;
  body: string;
  trace_id: string;
  span_id: string;
  trace_flags: number;
  logger_name: string;
  attributes: Record<string, string>;
  resource: Record<string, string>;
  sdk_name: string;
  sdk_version: string;
  country: string;
  city: string;
  region: string;
  os: string;
  os_version: string;
  browser: string;
  browser_version: string;
  device: string;
  brand: string;
  model: string;
}

/** Number of consecutive failed Redis pushes after which pending logs are dropped. */
const MAX_FLUSH_RETRIES = 3;

/**
 * Buffers logs in two stages:
 *
 * 1. `add()` collects logs in memory and pushes them to a Redis list in
 *    micro-batches (by size or after a short timer).
 * 2. `processBuffer()` (wired as the `onFlush` callback of the base buffer)
 *    pops a batch from that list and inserts it into ClickHouse in chunks.
 *
 * All sizes/intervals can be overridden through `LOG_BUFFER_*` environment
 * variables; malformed values fall back to the defaults.
 */
export class LogBuffer extends BaseBuffer {
  /** Maximum number of entries popped from Redis per `processBuffer` run. */
  private batchSize = readIntEnv('LOG_BUFFER_BATCH_SIZE', 4000);
  /** Maximum number of rows per ClickHouse INSERT statement. */
  private chunkSize = readIntEnv('LOG_BUFFER_CHUNK_SIZE', 1000);
  /** Delay before a partially filled in-memory batch is pushed to Redis. */
  private microBatchIntervalMs = readIntEnv('LOG_BUFFER_MICRO_BATCH_MS', 10, 0);
  /** In-memory batch size that triggers an immediate push to Redis. */
  private microBatchMaxSize = readIntEnv('LOG_BUFFER_MICRO_BATCH_SIZE', 100);

  private pendingLogs: IClickhouseLog[] = [];
  private flushTimer: ReturnType<typeof setTimeout> | null = null;
  private isFlushing = false;
  private flushRetryCount = 0;

  private queueKey = 'log_buffer:queue';
  protected bufferCounterKey = 'log_buffer:total_count';

  constructor() {
    super({
      name: 'log',
      onFlush: async () => {
        await this.processBuffer();
      },
    });
  }

  /**
   * Queues a log in memory. Pushes to Redis immediately once the micro-batch
   * is full, otherwise arms a timer so the log is pushed shortly after.
   */
  add(log: IClickhouseLog) {
    this.pendingLogs.push(log);

    if (this.pendingLogs.length >= this.microBatchMaxSize) {
      // Fire-and-forget: flushLocalBuffer handles its own errors.
      void this.flushLocalBuffer();
      return;
    }

    this.scheduleFlush();
  }

  /** Cancels any pending timer and pushes the in-memory logs to Redis now. */
  public async flush() {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }
    await this.flushLocalBuffer();
  }

  /** Arms the micro-batch timer unless one is already pending. */
  private scheduleFlush() {
    if (this.flushTimer) {
      return;
    }
    this.flushTimer = setTimeout(() => {
      this.flushTimer = null;
      void this.flushLocalBuffer();
    }, this.microBatchIntervalMs);
  }

  /**
   * Moves the in-memory logs to the Redis queue and bumps the counter.
   *
   * On failure the logs are put back in front of anything added meanwhile;
   * after `MAX_FLUSH_RETRIES` consecutive failures everything pending is
   * dropped. Never throws.
   */
  private async flushLocalBuffer() {
    if (this.isFlushing || this.pendingLogs.length === 0) {
      return;
    }

    this.isFlushing = true;
    const logsToFlush = this.pendingLogs;
    this.pendingLogs = [];

    try {
      // Push to Redis queue for processing
      const pipeline = getRedisCache().pipeline();
      for (const log of logsToFlush) {
        pipeline.lpush(this.queueKey, JSON.stringify(log));
      }
      await pipeline.exec();

      // Increment counter
      await getRedisCache().incrby(this.bufferCounterKey, logsToFlush.length);
      this.flushRetryCount = 0;
    } catch (error) {
      this.logger.error('Failed to push logs to Redis queue', { error });

      // Re-queue locally on failure
      this.pendingLogs = logsToFlush.concat(this.pendingLogs);
      this.flushRetryCount++;

      // If max retries exceeded, log and drop
      if (this.flushRetryCount >= MAX_FLUSH_RETRIES) {
        this.logger.error('Max retries exceeded, dropping logs', {
          droppedCount: this.pendingLogs.length,
        });
        this.pendingLogs = [];
        this.flushRetryCount = 0;
      }
    } finally {
      this.isFlushing = false;
      // Logs that were re-queued after a failure, or that arrived while this
      // flush was in flight (their own flush attempt was skipped because
      // isFlushing was set), would otherwise wait for the next add() call.
      if (this.pendingLogs.length > 0) {
        this.scheduleFlush();
      }
    }
  }

  /**
   * Pops up to `batchSize` entries from the Redis queue, parses them and
   * inserts them into ClickHouse chunk by chunk.
   *
   * The counter is decremented by the number of entries removed from the
   * queue regardless of outcome, so it keeps matching the queue length.
   *
   * NOTE(review): entries popped from Redis are not re-queued when the
   * ClickHouse insert fails, so that batch is lost — confirm this is the
   * intended trade-off.
   *
   * @throws Re-throws any error raised while reading from Redis or inserting.
   */
  private async processBuffer() {
    const startTime = Date.now();
    const redis = getRedisCache();
    let poppedCount = 0;

    try {
      // Get batch of logs from Redis
      const pipeline = redis.pipeline();
      for (let i = 0; i < this.batchSize; i++) {
        pipeline.rpop(this.queueKey);
      }

      const results = await pipeline.exec();
      if (!results) {
        return;
      }

      const batch: string[] = [];
      for (const result of results) {
        // Each result is an [error, value] pair; empty pops yield a falsy value.
        if (result[1]) {
          batch.push(result[1] as string);
        }
      }
      poppedCount = batch.length;

      if (batch.length === 0) {
        return;
      }

      this.logger.info(`Processing ${batch.length} logs`);

      // Parse logs
      const logs: IClickhouseLog[] = [];
      for (const item of batch) {
        try {
          const parsed = getSafeJson<IClickhouseLog>(item);
          if (parsed) {
            logs.push(parsed);
          }
        } catch (error) {
          this.logger.error('Failed to parse log', { error, item });
        }
      }

      if (logs.length === 0) {
        return;
      }

      // Insert into ClickHouse in chunks
      const chunks = this.chunks(logs, this.chunkSize);
      for (const chunk of chunks) {
        await this.insertChunk(chunk);
      }

      this.logger.info('Logs processed successfully', {
        count: logs.length,
        elapsed: Date.now() - startTime,
      });
    } catch (error) {
      this.logger.error('Failed to process logs', { error });
      throw error;
    } finally {
      // The popped entries are gone from the queue whether or not they were
      // parsed/inserted, so the counter must always reflect that.
      if (poppedCount > 0) {
        try {
          await redis.decrby(this.bufferCounterKey, poppedCount);
        } catch (error) {
          // Logged instead of thrown so it cannot mask an insert error.
          this.logger.error('Failed to decrement log buffer counter', {
            error,
            count: poppedCount,
          });
        }
      }
    }
  }

  /**
   * Inserts one chunk of logs into the `logs` table with a single
   * `INSERT ... VALUES` statement. String fields are escaped, numeric fields
   * are coerced to finite numbers, and map fields are rendered as map
   * literals. Does nothing for an empty chunk.
   *
   * NOTE(review): if `ch` exposes a row-based insert API (e.g. JSONEachRow),
   * using it would remove the hand-built SQL entirely — confirm against the
   * client wrapper.
   */
  private async insertChunk(logs: IClickhouseLog[]) {
    if (logs.length === 0) {
      // "INSERT ... VALUES" with no tuples is not a valid statement.
      return;
    }

    const query = `
      INSERT INTO logs (
        id, project_id, device_id, profile_id, session_id,
        timestamp, observed_at, severity_number, severity_text, body,
        trace_id, span_id, trace_flags, logger_name,
        attributes, resource, sdk_name, sdk_version,
        country, city, region, os, os_version,
        browser, browser_version, device, brand, model
      ) VALUES
    `;

    const values = logs
      .map((log) => {
        const columns = [
          'generateUUIDv4()',
          escape(log.project_id),
          escape(log.device_id),
          escape(log.profile_id),
          escape(log.session_id),
          escape(log.timestamp),
          escape(log.observed_at),
          toSqlNumber(log.severity_number),
          escape(log.severity_text),
          escape(log.body),
          escape(log.trace_id),
          escape(log.span_id),
          toSqlNumber(log.trace_flags),
          escape(log.logger_name),
          mapToSql(log.attributes),
          mapToSql(log.resource),
          escape(log.sdk_name),
          escape(log.sdk_version),
          escape(log.country),
          escape(log.city),
          escape(log.region),
          escape(log.os),
          escape(log.os_version),
          escape(log.browser),
          escape(log.browser_version),
          escape(log.device),
          escape(log.brand),
          escape(log.model),
        ];
        return `(${columns.join(', ')})`;
      })
      .join(',');

    await ch.query({
      query: `${query} ${values}`,
      clickhouse_settings: {
        wait_end_of_query: 1,
      },
    });
  }
}

/**
 * Reads an integer from `process.env[name]`.
 *
 * @param name Environment variable name.
 * @param fallback Value used when the variable is unset, not an integer, or
 *   below `min`.
 * @param min Smallest accepted value (defaults to 1).
 */
function readIntEnv(name: string, fallback: number, min = 1): number {
  const raw = process.env[name];
  if (!raw) {
    return fallback;
  }
  const parsed = Number.parseInt(raw, 10);
  return Number.isFinite(parsed) && parsed >= min ? parsed : fallback;
}

/**
 * Renders a value as a numeric SQL literal. Anything that does not convert to
 * a finite number becomes `0`, so data decoded from JSON can never inject
 * arbitrary text into the statement.
 */
function toSqlNumber(value: unknown): string {
  const numeric = typeof value === 'number' ? value : Number(value);
  return Number.isFinite(numeric) ? String(numeric) : '0';
}

/**
 * Renders a value as a single-quoted SQL string literal.
 *
 * `null`/`undefined` become the empty string literal. Non-string values are
 * converted to text first (objects via JSON) instead of throwing.
 */
function escape(value: unknown): string {
  if (value === null || value === undefined) {
    return "''";
  }

  let text: string;
  if (typeof value === 'string') {
    text = value;
  } else if (typeof value === 'object') {
    text = JSON.stringify(value);
  } else {
    text = String(value);
  }

  // Backslashes must be escaped BEFORE quotes: doing it the other way round
  // turns "'" into "\\'" (escaped backslash + bare quote), which closes the
  // literal early.
  return `'${text.replace(/\\/g, '\\\\').replace(/'/g, "\\'")}'`;
}

/**
 * Renders a string map as a `{'key': 'value', ...}` literal with escaped keys
 * and values. A missing or empty map renders as `{}`.
 */
function mapToSql(map: Record<string, string>): string {
  if (!map || Object.keys(map).length === 0) {
    return '{}';
  }
  const entries = Object.entries(map)
    .map(([k, v]) => `${escape(k)}: ${escape(v)}`)
    .join(', ');
  return `{${entries}}`;
}