diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts new file mode 100644 index 00000000..634fd054 --- /dev/null +++ b/apps/api/src/controllers/track.controller.ts @@ -0,0 +1,262 @@ +import type { GeoLocation } from '@/utils/parseIp'; +import { getClientIp, parseIp } from '@/utils/parseIp'; +import { parseUserAgent } from '@/utils/parseUserAgent'; +import type { FastifyReply, FastifyRequest } from 'fastify'; +import { assocPath, pathOr } from 'ramda'; + +import { generateDeviceId } from '@openpanel/common'; +import { + ch, + getProfileById, + getSalts, + TABLE_NAMES, + upsertProfile, +} from '@openpanel/db'; +import { eventsQueue } from '@openpanel/queue'; +import { getRedisCache } from '@openpanel/redis'; +import type { + AliasPayload, + DecrementPayload, + IdentifyPayload, + IncrementPayload, + TrackHandlerPayload, +} from '@openpanel/sdk'; + +export async function handler( + request: FastifyRequest<{ + Body: TrackHandlerPayload; + }>, + reply: FastifyReply +) { + const ip = getClientIp(request)!; + const ua = request.headers['user-agent']!; + const projectId = request.client?.projectId; + + if (!projectId) { + reply.status(400).send('missing origin'); + return; + } + + switch (request.body.type) { + case 'track': { + const [salts, geo] = await Promise.all([getSalts(), parseIp(ip)]); + const currentDeviceId = generateDeviceId({ + salt: salts.current, + origin: projectId, + ip, + ua, + }); + const previousDeviceId = generateDeviceId({ + salt: salts.previous, + origin: projectId, + ip, + ua, + }); + await track({ + payload: request.body.payload, + currentDeviceId, + previousDeviceId, + projectId, + geo, + ua, + }); + break; + } + case 'identify': { + const geo = await parseIp(ip); + await identify({ + payload: request.body.payload, + projectId, + geo, + ua, + }); + break; + } + case 'alias': { + await alias({ + payload: request.body.payload, + projectId, + }); + break; + } + case 'increment': { + await increment({ + payload: request.body.payload, + projectId, + }); + break; + } + case 'decrement': { + await decrement({ + payload: request.body.payload, + projectId, + }); + break; + } + } +} + +type TrackPayload = { + name: string; + properties?: Record; +}; + +async function track({ + payload, + currentDeviceId, + previousDeviceId, + projectId, + geo, + ua, +}: { + payload: TrackPayload; + currentDeviceId: string; + previousDeviceId: string; + projectId: string; + geo: GeoLocation; + ua: string; +}) { + // this will ensure that we don't have multiple events creating sessions + const locked = await getRedisCache().set( + `request:priority:${currentDeviceId}-${previousDeviceId}`, + 'locked', + 'EX', + 10, + 'NX' + ); + + eventsQueue.add('event', { + type: 'incomingEvent', + payload: { + projectId, + headers: { + ua, + }, + event: { + ...payload, + // Dont rely on the client for the timestamp + timestamp: new Date().toISOString(), + }, + geo, + currentDeviceId, + previousDeviceId, + priority: locked === 'OK', + }, + }); +} + +async function identify({ + payload, + projectId, + geo, + ua, +}: { + payload: IdentifyPayload; + projectId: string; + geo: GeoLocation; + ua: string; +}) { + const uaInfo = parseUserAgent(ua); + await upsertProfile({ + id: payload.profileId, + isExternal: true, + projectId, + properties: { + ...(payload.properties ?? {}), + ...(geo ?? {}), + ...uaInfo, + }, + ...payload, + }); +} + +async function alias({ + payload, + projectId, +}: { + payload: AliasPayload; + projectId: string; +}) { + await ch.insert({ + table: TABLE_NAMES.alias, + values: [ + { + projectId, + profile_id: payload.profileId, + alias: payload.alias, + }, + ], + }); +} + +async function increment({ + payload, + projectId, +}: { + payload: IncrementPayload; + projectId: string; +}) { + const { profileId, property, value } = payload; + const profile = await getProfileById(profileId, projectId); + if (!profile) { + throw new Error('Not found'); + } + + const parsed = parseInt( + pathOr('0', property.split('.'), profile.properties), + 10 + ); + + if (isNaN(parsed)) { + throw new Error('Not number'); + } + + profile.properties = assocPath( + property.split('.'), + parsed + (value || 1), + profile.properties + ); + + await upsertProfile({ + id: profile.id, + projectId, + properties: profile.properties, + isExternal: true, + }); +} + +async function decrement({ + payload, + projectId, +}: { + payload: DecrementPayload; + projectId: string; +}) { + const { profileId, property, value } = payload; + const profile = await getProfileById(profileId, projectId); + if (!profile) { + throw new Error('Not found'); + } + + const parsed = parseInt( + pathOr('0', property.split('.'), profile.properties), + 10 + ); + + if (isNaN(parsed)) { + throw new Error('Not number'); + } + + profile.properties = assocPath( + property.split('.'), + parsed - (value || 1), + profile.properties + ); + + await upsertProfile({ + id: profile.id, + projectId, + properties: profile.properties, + isExternal: true, + }); +} diff --git a/apps/api/src/routes/track.router.ts b/apps/api/src/routes/track.router.ts new file mode 100644 index 00000000..8829829f --- /dev/null +++ b/apps/api/src/routes/track.router.ts @@ -0,0 +1,69 @@ +import { isBot } from '@/bots'; +import type { TrackHandlerPayload } from '@/controllers/track.controller'; +import { handler } from '@/controllers/track.controller'; +import { SdkAuthError, validateSdkRequest } from '@/utils/auth'; +import { logger } from '@/utils/logger'; +import type { FastifyPluginCallback, FastifyRequest } from 'fastify'; + +import { createBotEvent } from '@openpanel/db'; + +const eventRouter: FastifyPluginCallback = (fastify, opts, done) => { + fastify.addHook( + 'preHandler', + async ( + req: FastifyRequest<{ + Body: TrackHandlerPayload; + }>, + reply + ) => { + try { + const client = await validateSdkRequest(req.headers).catch((error) => { + if (!(error instanceof SdkAuthError)) { + logger.error(error, 'Failed to validate sdk request'); + } + return null; + }); + + if (!client?.projectId) { + return reply.status(401).send(); + } + + req.projectId = client.projectId; + req.client = client; + + const bot = req.headers['user-agent'] + ? isBot(req.headers['user-agent']) + : null; + + if (bot) { + if (req.body.type === 'track') { + const path = (req.body.payload.properties?.__path || + req.body.payload.properties?.path) as string | undefined; + await createBotEvent({ + ...bot, + projectId: client.projectId, + path: path ?? '', + createdAt: new Date(), + }); + } + + reply.status(202).send('OK'); + } + } catch (e) { + logger.error(e, 'Failed to create bot event'); + reply.status(401).send(); + return; + } + } + ); + + fastify.route({ + method: 'POST', + url: '/', + handler: handler, + }); + + done(); +}; + +export default eventRouter; diff --git a/apps/api/src/utils/parseIp.ts b/apps/api/src/utils/parseIp.ts index dc7437e6..c75cc226 100644 --- a/apps/api/src/utils/parseIp.ts +++ b/apps/api/src/utils/parseIp.ts @@ -11,7 +11,7 @@ interface RemoteIpLookupResponse { latitude: number | undefined; } -interface GeoLocation { +export interface GeoLocation { country: string | undefined; city: string | undefined; region: string | undefined; diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 1ee4a99d..100db59f 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -11,7 +11,7 @@ import { toISOString, } from '@openpanel/common'; import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db'; -import { createEvent } from '@openpanel/db'; +import { chQuery, createEvent, TABLE_NAMES } from '@openpanel/db'; import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service'; import { eventsQueue, findJobByPrefix, sessionsQueue } from '@openpanel/queue'; import type { @@ -20,14 +20,6 @@ import type { } from '@openpanel/queue'; import { getRedisQueue } from '@openpanel/redis'; -function noDateInFuture(eventDate: Date): Date { - if (eventDate > new Date()) { - return new Date(); - } else { - return eventDate; - } -} - const GLOBAL_PROPERTIES = ['__path', '__referrer']; const SESSION_TIMEOUT = 1000 * 60 * 30; const SESSION_END_TIMEOUT = SESSION_TIMEOUT + 1000; @@ -54,8 +46,12 @@ export async function incomingEvent(job: Job) { ); }; - const profileId = body.profileId ? String(body.profileId) : ''; - const createdAt = noDateInFuture(new Date(body.timestamp)); + // this will get the profileId from the alias table if it exists + const profileId = await getProfileId({ + projectId, + profileId: body.profileId, + }); + const createdAt = new Date(body.timestamp); const url = getProperty('__path'); const { path, hash, query, origin } = parsePath(url); const referrer = isSameDomain(getProperty('__referrer'), url) @@ -263,3 +259,29 @@ async function getSessionEnd({ // Create session return null; } + +async function getProfileId({ + profileId, + projectId, +}: { + profileId: string | undefined; + projectId: string; +}) { + if (!profileId) { + return ''; + } + + const res = await chQuery<{ + alias: string; + profile_id: string; + project_id: string; + }>( + `SELECT * FROM ${TABLE_NAMES.alias} WHERE project_id = '${projectId}' AND (alias = '${profileId}' OR profile_id = '${profileId}')` + ); + + if (res[0]) { + return res[0].profile_id; + } + + return profileId; +} diff --git a/packages/db/src/clickhouse-client.ts b/packages/db/src/clickhouse-client.ts index 210bea52..828c2da9 100644 --- a/packages/db/src/clickhouse-client.ts +++ b/packages/db/src/clickhouse-client.ts @@ -4,6 +4,7 @@ import { createClient } from '@clickhouse/client'; export const TABLE_NAMES = { events: 'events_v2', profiles: 'profiles', + alias: 'profile_aliases', }; export const originalCh = createClient({ diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 2cb4cbf5..911531aa 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -2,13 +2,15 @@ import { Queue } from 'bullmq'; import type { IServiceEvent } from '@openpanel/db'; import { getRedisQueue } from '@openpanel/redis'; -import type { PostEventPayload } from '@openpanel/sdk'; +import type { TrackPayload } from '@openpanel/sdk'; export interface EventsQueuePayloadIncomingEvent { type: 'incomingEvent'; payload: { projectId: string; - event: PostEventPayload; + event: TrackPayload & { + timestamp: string; + }; geo: { country: string | undefined; city: string | undefined; diff --git a/packages/sdks/express/index.ts b/packages/sdks/express/index.ts index bd55c22a..fa0d1e4c 100644 --- a/packages/sdks/express/index.ts +++ b/packages/sdks/express/index.ts @@ -1,8 +1,8 @@ import type { NextFunction, Request, Response } from 'express'; import { getClientIp } from 'request-ip'; -import type { OpenpanelSdkOptions } from '@openpanel/sdk'; -import { OpenpanelSdk } from '@openpanel/sdk'; +import type { OpenPanelOptions } from '@openpanel/sdk'; +import { OpenPanel } from '@openpanel/sdk'; export * from '@openpanel/sdk'; @@ -10,33 +10,35 @@ declare global { // eslint-disable-next-line @typescript-eslint/no-namespace namespace Express { export interface Request { - op: OpenpanelSdk; + op: OpenPanel; } } } -export type OpenpanelOptions = OpenpanelSdkOptions & { +export type OpenpanelOptions = OpenPanelOptions & { trackRequest?: (url: string) => boolean; getProfileId?: (req: Request) => string; }; export default function createMiddleware(options: OpenpanelOptions) { return function middleware(req: Request, res: Response, next: NextFunction) { - const sdk = new OpenpanelSdk(options); + const sdk = new OpenPanel(options); const ip = getClientIp(req); if (ip) { - sdk.api.headers['x-client-ip'] = ip; + sdk.api.addHeader('x-client-ip', ip); } if (options.getProfileId) { const profileId = options.getProfileId(req); if (profileId) { - sdk.setProfileId(profileId); + sdk.identify({ + profileId, + }); } } if (options.trackRequest?.(req.url)) { - sdk.event('request', { + sdk.track('request', { url: req.url, method: req.method, query: req.query, diff --git a/packages/sdks/nextjs/index.tsx b/packages/sdks/nextjs/index.tsx index 921c1444..7c83df30 100644 --- a/packages/sdks/nextjs/index.tsx +++ b/packages/sdks/nextjs/index.tsx @@ -2,10 +2,12 @@ import React from 'react'; import Script from 'next/script'; import type { - OpenpanelEventOptions, - OpenpanelOptions, - PostEventPayload, - UpdateProfilePayload, + DecrementPayload, + IdentifyPayload, + IncrementPayload, + OpenPanelMethodNames, + OpenPanelOptions, + TrackProperties, } from '@openpanel/web'; export * from '@openpanel/web'; @@ -13,48 +15,33 @@ export { createNextRouteHandler } from './createNextRouteHandler'; const CDN_URL = 'https://openpanel.dev/op.js'; -declare global { - interface Window { - op: { - q?: [string, ...any[]]; - (method: OpenpanelMethods, ...args: any[]): void; - }; - } -} - -type OpenpanelMethods = - | 'ctor' - | 'event' - | 'setProfile' - | 'setProfileId' - | 'increment' - | 'decrement' - | 'clear'; - -declare global { - interface window { - op: { - q?: [string, ...any[]]; - (method: OpenpanelMethods, ...args: any[]): void; - }; - } -} - -type OpenpanelProviderProps = OpenpanelOptions & { +type OpenPanelComponentProps = OpenPanelOptions & { profileId?: string; cdnUrl?: string; }; -export function OpenpanelProvider({ +export function OpenPanelComponent({ profileId, cdnUrl, ...options -}: OpenpanelProviderProps) { - const methods: { name: OpenpanelMethods; value: unknown }[] = [ - { name: 'ctor', value: options }, +}: OpenPanelComponentProps) { + const methods: { name: OpenPanelMethodNames; value: unknown }[] = [ + { + name: 'init', + value: { + ...options, + sdk: 'nextjs', + sdkVersion: process.env.NEXTJS_VERSION!, + }, + }, ]; if (profileId) { - methods.push({ name: 'setProfileId', value: profileId }); + methods.push({ + name: 'identify', + value: { + profileId, + }, + }); } return ( <> @@ -73,25 +60,9 @@ export function OpenpanelProvider({ ); } -interface SetProfileIdProps { - value?: string; -} +type IdentifyComponentProps = IdentifyPayload; -export function SetProfileId({ value }: SetProfileIdProps) { - return ( - <> -