feat:add otel logging
This commit is contained in:
269
packages/db/src/buffers/log-buffer.ts
Normal file
269
packages/db/src/buffers/log-buffer.ts
Normal file
@@ -0,0 +1,269 @@
|
||||
import { getSafeJson } from '@openpanel/json';
|
||||
import { getRedisCache } from '@openpanel/redis';
|
||||
import { ch } from '../clickhouse/client';
|
||||
import { BaseBuffer } from './base-buffer';
|
||||
|
||||
// Row shape for the ClickHouse `logs` table, mirroring the column list built
// by LogBuffer.insertChunk. String fields are inserted as escaped SQL string
// literals; `attributes`/`resource` are rendered as ClickHouse Map literals.
export interface IClickhouseLog {
  /** Optional — insertChunk generates the id server-side via generateUUIDv4(). */
  id?: string;
  project_id: string;
  device_id: string;
  profile_id: string;
  session_id: string;
  /** Event time from the producer — presumably a ClickHouse-parseable datetime string; TODO confirm format with callers. */
  timestamp: string;
  /** Time the log was observed by the collector. */
  observed_at: string;
  /** OTel-style numeric severity. */
  severity_number: number;
  /** OTel-style severity label accompanying severity_number. */
  severity_text: string;
  /** Log message body. */
  body: string;
  trace_id: string;
  span_id: string;
  trace_flags: number;
  logger_name: string;
  /** Arbitrary key/value attributes; serialized as a Map literal by mapToSql. */
  attributes: Record<string, string>;
  /** Resource attributes (service metadata); serialized as a Map literal by mapToSql. */
  resource: Record<string, string>;
  sdk_name: string;
  sdk_version: string;
  country: string;
  city: string;
  region: string;
  os: string;
  os_version: string;
  browser: string;
  browser_version: string;
  device: string;
  brand: string;
  model: string;
}
|
||||
|
||||
export class LogBuffer extends BaseBuffer {
|
||||
private batchSize = process.env.LOG_BUFFER_BATCH_SIZE
|
||||
? Number.parseInt(process.env.LOG_BUFFER_BATCH_SIZE, 10)
|
||||
: 4000;
|
||||
private chunkSize = process.env.LOG_BUFFER_CHUNK_SIZE
|
||||
? Number.parseInt(process.env.LOG_BUFFER_CHUNK_SIZE, 10)
|
||||
: 1000;
|
||||
private microBatchIntervalMs = process.env.LOG_BUFFER_MICRO_BATCH_MS
|
||||
? Number.parseInt(process.env.LOG_BUFFER_MICRO_BATCH_MS, 10)
|
||||
: 10;
|
||||
private microBatchMaxSize = process.env.LOG_BUFFER_MICRO_BATCH_SIZE
|
||||
? Number.parseInt(process.env.LOG_BUFFER_MICRO_BATCH_SIZE, 10)
|
||||
: 100;
|
||||
|
||||
private pendingLogs: IClickhouseLog[] = [];
|
||||
private flushTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private isFlushing = false;
|
||||
private flushRetryCount = 0;
|
||||
|
||||
private queueKey = 'log_buffer:queue';
|
||||
protected bufferCounterKey = 'log_buffer:total_count';
|
||||
|
||||
constructor() {
|
||||
super({
|
||||
name: 'log',
|
||||
onFlush: async () => {
|
||||
await this.processBuffer();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
add(log: IClickhouseLog) {
|
||||
this.pendingLogs.push(log);
|
||||
|
||||
if (this.pendingLogs.length >= this.microBatchMaxSize) {
|
||||
this.flushLocalBuffer();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this.flushTimer) {
|
||||
this.flushTimer = setTimeout(() => {
|
||||
this.flushTimer = null;
|
||||
this.flushLocalBuffer();
|
||||
}, this.microBatchIntervalMs);
|
||||
}
|
||||
}
|
||||
|
||||
public async flush() {
|
||||
if (this.flushTimer) {
|
||||
clearTimeout(this.flushTimer);
|
||||
this.flushTimer = null;
|
||||
}
|
||||
await this.flushLocalBuffer();
|
||||
}
|
||||
|
||||
private async flushLocalBuffer() {
|
||||
if (this.isFlushing || this.pendingLogs.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.isFlushing = true;
|
||||
const logsToFlush = this.pendingLogs;
|
||||
this.pendingLogs = [];
|
||||
|
||||
try {
|
||||
// Push to Redis queue for processing
|
||||
const pipeline = getRedisCache().pipeline();
|
||||
for (const log of logsToFlush) {
|
||||
pipeline.lpush(this.queueKey, JSON.stringify(log));
|
||||
}
|
||||
await pipeline.exec();
|
||||
|
||||
// Increment counter
|
||||
await getRedisCache().incrby(this.bufferCounterKey, logsToFlush.length);
|
||||
|
||||
this.flushRetryCount = 0;
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to push logs to Redis queue', { error });
|
||||
// Re-queue locally on failure
|
||||
this.pendingLogs = logsToFlush.concat(this.pendingLogs);
|
||||
this.flushRetryCount++;
|
||||
|
||||
// If max retries exceeded, log and drop
|
||||
if (this.flushRetryCount >= 3) {
|
||||
this.logger.error('Max retries exceeded, dropping logs', {
|
||||
droppedCount: this.pendingLogs.length,
|
||||
});
|
||||
this.pendingLogs = [];
|
||||
this.flushRetryCount = 0;
|
||||
}
|
||||
} finally {
|
||||
this.isFlushing = false;
|
||||
}
|
||||
}
|
||||
|
||||
private async processBuffer() {
|
||||
const startTime = Date.now();
|
||||
const redis = getRedisCache();
|
||||
|
||||
try {
|
||||
// Get batch of logs from Redis
|
||||
const batch: string[] = [];
|
||||
const pipeline = redis.pipeline();
|
||||
|
||||
for (let i = 0; i < this.batchSize; i++) {
|
||||
pipeline.rpop(this.queueKey);
|
||||
}
|
||||
|
||||
const results = await pipeline.exec();
|
||||
if (!results) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const result of results) {
|
||||
if (result[1]) {
|
||||
batch.push(result[1] as string);
|
||||
}
|
||||
}
|
||||
|
||||
if (batch.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Processing ${batch.length} logs`);
|
||||
|
||||
// Parse logs
|
||||
const logs: IClickhouseLog[] = [];
|
||||
for (const item of batch) {
|
||||
try {
|
||||
const parsed = getSafeJson<IClickhouseLog>(item);
|
||||
if (parsed) {
|
||||
logs.push(parsed);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to parse log', { error, item });
|
||||
}
|
||||
}
|
||||
|
||||
if (logs.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Insert into ClickHouse in chunks
|
||||
const chunks = this.chunks(logs, this.chunkSize);
|
||||
for (const chunk of chunks) {
|
||||
await this.insertChunk(chunk);
|
||||
}
|
||||
|
||||
// Decrement counter
|
||||
await redis.decrby(this.bufferCounterKey, batch.length);
|
||||
|
||||
this.logger.info('Logs processed successfully', {
|
||||
count: logs.length,
|
||||
elapsed: Date.now() - startTime,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to process logs', { error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private async insertChunk(logs: IClickhouseLog[]) {
|
||||
const query = `
|
||||
INSERT INTO logs (
|
||||
id, project_id, device_id, profile_id, session_id,
|
||||
timestamp, observed_at, severity_number, severity_text, body,
|
||||
trace_id, span_id, trace_flags, logger_name, attributes, resource,
|
||||
sdk_name, sdk_version, country, city, region,
|
||||
os, os_version, browser, browser_version, device, brand, model
|
||||
)
|
||||
VALUES
|
||||
`;
|
||||
|
||||
const values = logs
|
||||
.map((log) => {
|
||||
return `(
|
||||
generateUUIDv4(),
|
||||
${escape(log.project_id)},
|
||||
${escape(log.device_id)},
|
||||
${escape(log.profile_id)},
|
||||
${escape(log.session_id)},
|
||||
${escape(log.timestamp)},
|
||||
${escape(log.observed_at)},
|
||||
${log.severity_number},
|
||||
${escape(log.severity_text)},
|
||||
${escape(log.body)},
|
||||
${escape(log.trace_id)},
|
||||
${escape(log.span_id)},
|
||||
${log.trace_flags},
|
||||
${escape(log.logger_name)},
|
||||
${mapToSql(log.attributes)},
|
||||
${mapToSql(log.resource)},
|
||||
${escape(log.sdk_name)},
|
||||
${escape(log.sdk_version)},
|
||||
${escape(log.country)},
|
||||
${escape(log.city)},
|
||||
${escape(log.region)},
|
||||
${escape(log.os)},
|
||||
${escape(log.os_version)},
|
||||
${escape(log.browser)},
|
||||
${escape(log.browser_version)},
|
||||
${escape(log.device)},
|
||||
${escape(log.brand)},
|
||||
${escape(log.model)}
|
||||
)`;
|
||||
})
|
||||
.join(',');
|
||||
|
||||
await ch.query({
|
||||
query: `${query} ${values}`,
|
||||
clickhouse_settings: {
|
||||
wait_end_of_query: 1,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function escape(value: string): string {
|
||||
if (value === null || value === undefined) {
|
||||
return "''";
|
||||
}
|
||||
return `'${value.replace(/'/g, "\\'").replace(/\\/g, '\\\\')}'`;
|
||||
}
|
||||
|
||||
function mapToSql(map: Record<string, string>): string {
|
||||
if (!map || Object.keys(map).length === 0) {
|
||||
return '{}';
|
||||
}
|
||||
const entries = Object.entries(map)
|
||||
.map(([k, v]) => `${escape(k)}: ${escape(v)}`)
|
||||
.join(', ');
|
||||
return `{${entries}}`;
|
||||
}
|
||||
Reference in New Issue
Block a user