This commit is contained in:
Carl-Gerhard Lindesvärd
2025-11-10 10:09:59 +01:00
parent 37246f57f0
commit bb0e413b06
9 changed files with 76 additions and 107 deletions

View File

@@ -1,3 +1,4 @@
import { cacheable } from '@openpanel/redis';
import bots from './bots'; import bots from './bots';
// Pre-compile regex patterns at module load time // Pre-compile regex patterns at module load time
@@ -14,59 +15,31 @@ const compiledBots = bots.map((bot) => {
const regexBots = compiledBots.filter((bot) => 'compiledRegex' in bot); const regexBots = compiledBots.filter((bot) => 'compiledRegex' in bot);
const includesBots = compiledBots.filter((bot) => 'includes' in bot); const includesBots = compiledBots.filter((bot) => 'includes' in bot);
// Common legitimate browser patterns - if UA matches these, it's very likely a real browser export const isBot = cacheable(
// This provides ultra-fast early exit for ~95% of real traffic 'is-bot',
const legitimateBrowserPatterns = [ (ua: string) => {
'Mozilla/5.0', // Nearly all modern browsers // Check simple string patterns first (fast)
'Chrome/', // Chrome/Chromium browsers for (const bot of includesBots) {
'Safari/', // Safari and Chrome-based browsers if (ua.includes(bot.includes)) {
'Firefox/', // Firefox return {
'Edg/', // Edge name: bot.name,
]; type: 'category' in bot ? bot.category : 'Unknown',
};
const mobilePatterns = ['iPhone', 'Android', 'iPad']; }
const desktopOSPatterns = ['Windows NT', 'Macintosh', 'X11; Linux'];
export function isBot(ua: string) {
// Ultra-fast early exit: check if this looks like a legitimate browser
// Real browsers typically have Mozilla/5.0 + browser name + OS
if (ua.includes('Mozilla/5.0')) {
// Check for browser signature
const hasBrowser = legitimateBrowserPatterns.some((pattern) =>
ua.includes(pattern),
);
// Check for OS signature (mobile or desktop)
const hasOS =
mobilePatterns.some((pattern) => ua.includes(pattern)) ||
desktopOSPatterns.some((pattern) => ua.includes(pattern));
// If it has Mozilla/5.0, a known browser, and an OS, it's very likely legitimate
if (hasBrowser && hasOS) {
return null;
} }
}
// Check simple string patterns first (fast) // Check regex patterns (slower)
for (const bot of includesBots) { for (const bot of regexBots) {
if (ua.includes(bot.includes)) { if (bot.compiledRegex.test(ua)) {
return { return {
name: bot.name, name: bot.name,
type: 'category' in bot ? bot.category : 'Unknown', type: 'category' in bot ? bot.category : 'Unknown',
}; };
}
} }
}
// Check regex patterns (slower) return null;
for (const bot of regexBots) { },
if (bot.compiledRegex.test(ua)) { 60 * 60, // 1 hour
return { 'lru',
name: bot.name, );
type: 'category' in bot ? bot.category : 'Unknown',
};
}
}
return null;
}

View File

@@ -1,14 +1,11 @@
import type { FastifyReply, FastifyRequest } from 'fastify'; import type { FastifyReply, FastifyRequest } from 'fastify';
import { assocPath, pathOr, pick } from 'ramda'; import { assocPath, pathOr, pick } from 'ramda';
import { logger } from '@/utils/logger';
import { generateId } from '@openpanel/common'; import { generateId } from '@openpanel/common';
import { generateDeviceId, parseUserAgent } from '@openpanel/common/server'; import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
import { getProfileById, getSalts, upsertProfile } from '@openpanel/db'; import { getProfileById, getSalts, upsertProfile } from '@openpanel/db';
import { type GeoLocation, getGeoLocation } from '@openpanel/geo'; import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
import type { ILogger } from '@openpanel/logger';
import { getEventsGroupQueueShard } from '@openpanel/queue'; import { getEventsGroupQueueShard } from '@openpanel/queue';
import { getRedisCache } from '@openpanel/redis';
import type { import type {
DecrementPayload, DecrementPayload,
IdentifyPayload, IdentifyPayload,
@@ -241,25 +238,6 @@ async function track({
const jobId = [payload.name, timestamp, projectId, currentDeviceId, groupId] const jobId = [payload.name, timestamp, projectId, currentDeviceId, groupId]
.filter(Boolean) .filter(Boolean)
.join('-'); .join('-');
await getRedisCache().incr('track:counter');
log('track handler', {
jobId: jobId,
groupId: groupId,
timestamp: timestamp,
data: {
projectId,
headers,
event: {
...payload,
timestamp,
isTimestampFromThePast,
},
uaInfo,
geo,
currentDeviceId,
previousDeviceId,
},
});
await getEventsGroupQueueShard(groupId).add({ await getEventsGroupQueueShard(groupId).add({
orderMs: timestamp, orderMs: timestamp,
data: { data: {

View File

@@ -6,9 +6,9 @@ import { duplicateHook } from '@/hooks/duplicate.hook';
import { isBotHook } from '@/hooks/is-bot.hook'; import { isBotHook } from '@/hooks/is-bot.hook';
const trackRouter: FastifyPluginCallback = async (fastify) => { const trackRouter: FastifyPluginCallback = async (fastify) => {
fastify.addHook('preHandler', isBotHook);
fastify.addHook('preValidation', duplicateHook); fastify.addHook('preValidation', duplicateHook);
fastify.addHook('preHandler', clientHook); fastify.addHook('preHandler', clientHook);
fastify.addHook('preHandler', isBotHook);
fastify.route({ fastify.route({
method: 'POST', method: 'POST',

View File

@@ -216,10 +216,10 @@ export async function bootWorkers() {
(worker as Worker).on('failed', (job) => { (worker as Worker).on('failed', (job) => {
if (job) { if (job) {
if (job.processedOn && job.finishedOn) { if (job.processedOn && job.finishedOn) {
const duration = job.finishedOn - job.processedOn; const elapsed = job.finishedOn - job.processedOn;
eventsGroupJobDuration.observe( eventsGroupJobDuration.observe(
{ queue_shard: worker.name, status: 'failed' }, { name: worker.name, status: 'failed' },
duration, elapsed,
); );
} }
logger.error('job failed', { logger.error('job failed', {
@@ -235,10 +235,15 @@ export async function bootWorkers() {
(worker as Worker).on('completed', (job) => { (worker as Worker).on('completed', (job) => {
if (job) { if (job) {
if (job.processedOn && job.finishedOn) { if (job.processedOn && job.finishedOn) {
const duration = job.finishedOn - job.processedOn; const elapsed = job.finishedOn - job.processedOn;
logger.info('job completed', {
jobId: job.id,
worker: worker.name,
elapsed,
});
eventsGroupJobDuration.observe( eventsGroupJobDuration.observe(
{ queue_shard: worker.name, status: 'success' }, { name: worker.name, status: 'success' },
duration, elapsed,
); );
} }
} }

View File

@@ -16,9 +16,9 @@ const queues = [sessionsQueue, cronQueue, ...eventsGroupQueues];
// Histogram to track job processing time for eventsGroupQueues // Histogram to track job processing time for eventsGroupQueues
export const eventsGroupJobDuration = new client.Histogram({ export const eventsGroupJobDuration = new client.Histogram({
name: 'events_group_job_duration_ms', name: 'job_duration_ms',
help: 'Duration of job processing in eventsGroupQueues (in ms)', help: 'Duration of job processing (in ms)',
labelNames: ['queue_shard', 'status'], labelNames: ['name', 'status'],
buckets: [10, 25, 50, 100, 250, 500, 750, 1000, 2000, 5000, 10000, 30000], // 10ms to 30s buckets: [10, 25, 50, 100, 250, 500, 750, 1000, 2000, 5000, 10000, 30000], // 10ms to 30s
}); });

View File

@@ -34,4 +34,8 @@ export async function getClientById(
}); });
} }
export const getClientByIdCached = cacheable(getClientById, 60 * 60 * 24, true); export const getClientByIdCached = cacheable(
getClientById,
60 * 60 * 24,
'both',
);

View File

@@ -340,7 +340,7 @@ export async function createEvent(payload: IServiceCreateEventPayload) {
sdk_version: payload.sdkVersion ?? '', sdk_version: payload.sdkVersion ?? '',
}; };
await Promise.all([sessionBuffer.add(event), eventBuffer.add(event)]); const promises = [sessionBuffer.add(event), eventBuffer.add(event)];
if (payload.profileId) { if (payload.profileId) {
const profile: IServiceUpsertProfile = { const profile: IServiceUpsertProfile = {
@@ -371,10 +371,12 @@ export async function createEvent(payload: IServiceCreateEventPayload) {
profile.isExternal || profile.isExternal ||
(profile.isExternal === false && payload.name === 'session_start') (profile.isExternal === false && payload.name === 'session_start')
) { ) {
await upsertProfile(profile, true); promises.push(upsertProfile(profile, true));
} }
} }
await Promise.all(promises);
return { return {
document: event, document: event,
}; };

View File

@@ -1,6 +1,6 @@
import { generateSalt } from '@openpanel/common/server'; import { generateSalt } from '@openpanel/common/server';
import { cacheable, getRedisCache } from '@openpanel/redis'; import { cacheable } from '@openpanel/redis';
import { db } from '../prisma-client'; import { db } from '../prisma-client';
export async function getCurrentSalt() { export async function getCurrentSalt() {
@@ -43,7 +43,7 @@ export const getSalts = cacheable(
return salts; return salts;
}, },
60 * 10, 60 * 10,
true, 'both',
); );
export async function createInitialSalts() { export async function createInitialSalts() {

View File

@@ -128,11 +128,13 @@ function hasResult(result: unknown): boolean {
return true; return true;
} }
type CacheMode = 'lru' | 'redis' | 'both';
// Overload 1: cacheable(fn, expireInSec, lruCache?) // Overload 1: cacheable(fn, expireInSec, lruCache?)
export function cacheable<T extends (...args: any) => any>( export function cacheable<T extends (...args: any) => any>(
fn: T, fn: T,
expireInSec: number, expireInSec: number,
lruCache?: boolean, cacheMode?: CacheMode,
): T & { ): T & {
getKey: (...args: Parameters<T>) => string; getKey: (...args: Parameters<T>) => string;
clear: (...args: Parameters<T>) => Promise<number>; clear: (...args: Parameters<T>) => Promise<number>;
@@ -146,7 +148,7 @@ export function cacheable<T extends (...args: any) => any>(
name: string, name: string,
fn: T, fn: T,
expireInSec: number, expireInSec: number,
lruCache?: boolean, cacheMode?: CacheMode,
): T & { ): T & {
getKey: (...args: Parameters<T>) => string; getKey: (...args: Parameters<T>) => string;
clear: (...args: Parameters<T>) => Promise<number>; clear: (...args: Parameters<T>) => Promise<number>;
@@ -159,8 +161,8 @@ export function cacheable<T extends (...args: any) => any>(
export function cacheable<T extends (...args: any) => any>( export function cacheable<T extends (...args: any) => any>(
fnOrName: T | string, fnOrName: T | string,
fnOrExpireInSec: number | T, fnOrExpireInSec: number | T,
_expireInSecOrLruCache?: number | boolean, _expireInSecOrCacheMode?: number | CacheMode,
_lruCache?: boolean, _cacheMode?: CacheMode,
) { ) {
const name = typeof fnOrName === 'string' ? fnOrName : fnOrName.name; const name = typeof fnOrName === 'string' ? fnOrName : fnOrName.name;
const fn = const fn =
@@ -171,23 +173,23 @@ export function cacheable<T extends (...args: any) => any>(
: null; : null;
let expireInSec: number | null = null; let expireInSec: number | null = null;
let useLruCache = false; let cacheMode = 'redis';
// Parse parameters based on function signature // Parse parameters based on function signature
if (typeof fnOrName === 'function') { if (typeof fnOrName === 'function') {
// Overload 1: cacheable(fn, expireInSec, lruCache?) // Overload 1: cacheable(fn, expireInSec, lruCache?)
expireInSec = typeof fnOrExpireInSec === 'number' ? fnOrExpireInSec : null; expireInSec = typeof fnOrExpireInSec === 'number' ? fnOrExpireInSec : null;
useLruCache = cacheMode =
typeof _expireInSecOrLruCache === 'boolean' typeof _expireInSecOrCacheMode === 'boolean'
? _expireInSecOrLruCache ? _expireInSecOrCacheMode
: false; : 'redis';
} else { } else {
// Overload 2: cacheable(name, fn, expireInSec, lruCache?) // Overload 2: cacheable(name, fn, expireInSec, lruCache?)
expireInSec = expireInSec =
typeof _expireInSecOrLruCache === 'number' typeof _expireInSecOrCacheMode === 'number'
? _expireInSecOrLruCache ? _expireInSecOrCacheMode
: null; : null;
useLruCache = typeof _lruCache === 'boolean' ? _lruCache : false; cacheMode = typeof _cacheMode === 'string' ? _cacheMode : 'redis';
} }
if (typeof fn !== 'function') { if (typeof fn !== 'function') {
@@ -203,12 +205,13 @@ export function cacheable<T extends (...args: any) => any>(
`${cachePrefix}:${stringify(args)}`; `${cachePrefix}:${stringify(args)}`;
// Create function-specific LRU cache if enabled // Create function-specific LRU cache if enabled
const functionLruCache = useLruCache const functionLruCache =
? new LRUCache<string, any>({ cacheMode === 'lru' || cacheMode === 'both'
max: 1000, ? new LRUCache<string, any>({
ttl: expireInSec * 1000, // Convert seconds to milliseconds for LRU max: 1000,
}) ttl: expireInSec * 1000, // Convert seconds to milliseconds for LRU
: null; })
: null;
const cachedFn = async ( const cachedFn = async (
...args: Parameters<T> ...args: Parameters<T>
@@ -221,6 +224,10 @@ export function cacheable<T extends (...args: any) => any>(
if (lruHit !== undefined && hasResult(lruHit)) { if (lruHit !== undefined && hasResult(lruHit)) {
return lruHit; return lruHit;
} }
if (cacheMode === 'lru') {
return null as any;
}
} }
// L2 Cache: Check Redis cache (shared across instances) // L2 Cache: Check Redis cache (shared across instances)