diff --git a/.prettierignore b/.prettierignore index b17149b3..b7cc9369 100644 --- a/.prettierignore +++ b/.prettierignore @@ -1 +1,2 @@ -*.mdx \ No newline at end of file +*.mdx +*.sql \ No newline at end of file diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index aed924bc..24e9ef11 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -7,6 +7,8 @@ import { eventsQueue } from '@openpanel/queue'; import { getRedisCache } from '@openpanel/redis'; import type { PostEventPayload } from '@openpanel/sdk'; +import { getStringHeaders } from './track.controller'; + export async function postEvent( request: FastifyRequest<{ Body: PostEventPayload; @@ -49,9 +51,7 @@ export async function postEvent( type: 'incomingEvent', payload: { projectId: request.projectId, - headers: { - ua, - }, + headers: getStringHeaders(request.headers), event: { ...request.body, // Dont rely on the client for the timestamp diff --git a/apps/api/src/controllers/import.controller.ts b/apps/api/src/controllers/import.controller.ts index 47f2f900..5584ec6e 100644 --- a/apps/api/src/controllers/import.controller.ts +++ b/apps/api/src/controllers/import.controller.ts @@ -26,9 +26,6 @@ export async function importEvents( table: TABLE_NAMES.events, values, format: 'JSONEachRow', - clickhouse_settings: { - date_time_input_format: 'best_effort', - }, }); console.log(res.summary?.written_rows, 'events imported'); diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index 1123a3e3..eabedf2b 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -7,7 +7,8 @@ import { assocPath, pathOr } from 'ramda'; import { generateDeviceId } from '@openpanel/common'; import { ch, - chQuery, + createProfileAlias, + formatClickhouseDate, getProfileById, getProfileId, getSalts, @@ -24,6 +25,16 @@ import type { TrackHandlerPayload, } from '@openpanel/sdk'; +export function getStringHeaders(headers: FastifyRequest['headers']) { + return Object.entries(headers).reduce( + (acc, [key, value]) => ({ + ...acc, + [key]: value ? String(value) : undefined, + }), + {} + ); +} + export async function handler( request: FastifyRequest<{ Body: TrackHandlerPayload; @@ -45,12 +56,6 @@ export async function handler( request.body.payload.profileId = profileId; } - console.log( - '> Request', - request.body.type, - JSON.stringify(request.body.payload, null, 2) - ); - if (!projectId) { reply.status(400).send('missing origin'); return; @@ -59,25 +64,29 @@ export async function handler( switch (request.body.type) { case 'track': { const [salts, geo] = await Promise.all([getSalts(), parseIp(ip)]); - const currentDeviceId = generateDeviceId({ - salt: salts.current, - origin: projectId, - ip, - ua, - }); - const previousDeviceId = generateDeviceId({ - salt: salts.previous, - origin: projectId, - ip, - ua, - }); + const currentDeviceId = ua + ? generateDeviceId({ + salt: salts.current, + origin: projectId, + ip, + ua, + }) + : ''; + const previousDeviceId = ua + ? generateDeviceId({ + salt: salts.previous, + origin: projectId, + ip, + ua, + }) + : ''; await track({ payload: request.body.payload, currentDeviceId, previousDeviceId, projectId, geo, - ua, + headers: getStringHeaders(request.headers), }); break; } @@ -126,14 +135,14 @@ async function track({ previousDeviceId, projectId, geo, - ua, + headers, }: { payload: TrackPayload; currentDeviceId: string; previousDeviceId: string; projectId: string; geo: GeoLocation; - ua: string; + headers: Record; }) { // this will ensure that we don't have multiple events creating sessions const locked = await getRedisCache().set( @@ -148,9 +157,7 @@ async function track({ type: 'incomingEvent', payload: { projectId, - headers: { - ua, - }, + headers, event: { ...payload, // Dont rely on the client for the timestamp @@ -173,7 +180,7 @@ async function identify({ payload: IdentifyPayload; projectId: string; geo: GeoLocation; - ua: string; + ua?: string; }) { const uaInfo = parseUserAgent(ua); await upsertProfile({ @@ -196,15 +203,10 @@ async function alias({ payload: AliasPayload; projectId: string; }) { - await ch.insert({ - table: TABLE_NAMES.alias, - values: [ - { - projectId, - profile_id: payload.profileId, - alias: payload.alias, - }, - ], + await createProfileAlias({ + alias: payload.alias, + profileId: payload.profileId, + projectId, }); } diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index bf2194cf..9dfb4fa3 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -11,14 +11,14 @@ import { toISOString, } from '@openpanel/common'; import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db'; -import { chQuery, createEvent, TABLE_NAMES } from '@openpanel/db'; +import { createEvent } from '@openpanel/db'; import { getLastScreenViewFromProfileId } from '@openpanel/db/src/services/event.service'; import { eventsQueue, findJobByPrefix, sessionsQueue } from '@openpanel/queue'; import type { EventsQueuePayloadCreateSessionEnd, EventsQueuePayloadIncomingEvent, } from '@openpanel/queue'; -import { cacheable, getRedisQueue } from '@openpanel/redis'; +import { getRedisQueue } from '@openpanel/redis'; const GLOBAL_PROPERTIES = ['__path', '__referrer']; const SESSION_TIMEOUT = 1000 * 60 * 30; @@ -55,7 +55,10 @@ export async function incomingEvent(job: Job) { ? null : parseReferrer(getProperty('__referrer')); const utmReferrer = getReferrerWithQuery(query); - const uaInfo = parseUserAgent(headers.ua); + const userAgent = headers['user-agent']; + const sdkName = headers['openpanel-sdk-name']; + const sdkVersion = headers['openpanel-sdk-version']; + const uaInfo = parseUserAgent(userAgent); if (uaInfo.isServer) { const event = await getLastScreenViewFromProfileId({ @@ -71,7 +74,7 @@ export async function incomingEvent(job: Job) { projectId, properties: { ...omit(GLOBAL_PROPERTIES, properties), - user_agent: headers.ua, + user_agent: userAgent, }, createdAt, country: event?.country || geo.country || '', @@ -95,6 +98,8 @@ export async function incomingEvent(job: Job) { profile: undefined, meta: undefined, importedAt: null, + sdkName, + sdkVersion, }; return createEvent(payload); @@ -168,6 +173,8 @@ export async function incomingEvent(job: Job) { referrer: referrer?.url, referrerName: referrer?.name || utmReferrer?.name || '', referrerType: referrer?.type || utmReferrer?.type || '', + sdkName, + sdkVersion, }; if (!sessionEnd) { diff --git a/packages/db/clickhouse_init.sql b/packages/db/clickhouse_init.sql index 69d9ddb4..d4210fa6 100644 --- a/packages/db/clickhouse_init.sql +++ b/packages/db/clickhouse_init.sql @@ -1,8 +1,10 @@ CREATE DATABASE IF NOT EXISTS openpanel; -CREATE TABLE IF NOT EXISTS openpanel.events_v2 ( +CREATE TABLE openpanel.events_v2 ( `id` UUID DEFAULT generateUUIDv4(), `name` String, + `sdk_name` String, + `sdk_version` String, `device_id` String, `profile_id` String, `project_id` String, @@ -29,9 +31,9 @@ CREATE TABLE IF NOT EXISTS openpanel.events_v2 ( `model` String, `imported_at` Nullable(DateTime), INDEX idx_name name TYPE bloom_filter GRANULARITY 1, - INDEX idx_properties_bounce properties ['__bounce'] TYPE - set - (3) GRANULARITY 1 + INDEX idx_properties_bounce properties ['__bounce'] TYPE set (3) GRANULARITY 1, + INDEX idx_origin origin TYPE bloom_filter(0.05) GRANULARITY 1, + INDEX idx_path path TYPE bloom_filter(0.01) GRANULARITY 1 ) ENGINE = MergeTree PARTITION BY toYYYYMM(created_at) ORDER BY (project_id, toDate(created_at), profile_id, name) SETTINGS index_granularity = 8192; diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index 75439133..763235fd 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -78,9 +78,6 @@ export class ProfileBuffer extends RedisBuffer { is_external: item.event.is_external, }; }), - clickhouse_settings: { - date_time_input_format: 'best_effort', - }, format: 'JSONEachRow', }); return queue.map((item) => item.index); diff --git a/packages/db/src/clickhouse-client.ts b/packages/db/src/clickhouse-client.ts index 1184d7eb..b2fdcde5 100644 --- a/packages/db/src/clickhouse-client.ts +++ b/packages/db/src/clickhouse-client.ts @@ -21,6 +21,9 @@ export const originalCh = createClient({ compression: { request: true, }, + clickhouse_settings: { + date_time_input_format: 'best_effort', + }, }); export const ch = new Proxy(originalCh, { diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index f7477a73..5f05c117 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -56,6 +56,8 @@ export interface IClickhouseEvent { brand: string; model: string; imported_at: string | null; + sdk_name: string; + sdk_version: string; // They do not exist here. Just make ts happy for now profile?: IServiceProfile; @@ -93,6 +95,8 @@ export function transformEvent(event: IClickhouseEvent): IServiceEvent { profile: event.profile, meta: event.meta, importedAt: event.imported_at ? new Date(event.imported_at) : null, + sdkName: event.sdk_name, + sdkVersion: event.sdk_version, }; } @@ -134,6 +138,8 @@ export interface IServiceEvent { importedAt: Date | null; profile: IServiceProfile | undefined; meta: EventMeta | undefined; + sdkName: string | undefined; + sdkVersion: string | undefined; } export interface IServiceEventMinimal { @@ -295,6 +301,8 @@ export async function createEvent(payload: IServiceCreateEventPayload) { referrer_name: payload.referrerName ?? '', referrer_type: payload.referrerType ?? '', imported_at: null, + sdk_name: payload.sdkName ?? '', + sdk_version: payload.sdkVersion ?? '', }; await eventBuffer.insert(event); diff --git a/packages/db/src/services/profile.service.ts b/packages/db/src/services/profile.service.ts index 4f2b43cb..6a12e05d 100644 --- a/packages/db/src/services/profile.service.ts +++ b/packages/db/src/services/profile.service.ts @@ -6,6 +6,7 @@ import type { IChartEventFilter } from '@openpanel/validation'; import { profileBuffer } from '../buffers'; import { + ch, chQuery, formatClickhouseDate, TABLE_NAMES, @@ -170,6 +171,29 @@ export function transformProfile({ }; } +export async function createProfileAlias({ + projectId, + alias, + profileId, +}: { + projectId: string; + alias: string; + profileId: string; +}) { + await ch.insert({ + table: TABLE_NAMES.alias, + format: 'JSONEachRow', + values: [ + { + projectId, + profile_id: profileId, + alias, + created_at: new Date(), + }, + ], + }); +} + export async function upsertProfile({ id, firstName, diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 911531aa..7b1d278a 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -18,9 +18,7 @@ export interface EventsQueuePayloadIncomingEvent { longitude: number | undefined; latitude: number | undefined; }; - headers: { - ua: string | undefined; - }; + headers: Record; currentDeviceId: string; previousDeviceId: string; priority: boolean; diff --git a/packages/sdks/sdk/src/index.ts b/packages/sdks/sdk/src/index.ts index 12d318ee..5b37c88e 100644 --- a/packages/sdks/sdk/src/index.ts +++ b/packages/sdks/sdk/src/index.ts @@ -85,7 +85,7 @@ export class OpenPanel { defaultHeaders['openpanel-client-secret'] = options.clientSecret; } - defaultHeaders['openpanel-sdk'] = options.sdk || 'node'; + defaultHeaders['openpanel-sdk-name'] = options.sdk || 'node'; defaultHeaders['openpanel-sdk-version'] = options.sdkVersion || process.env.SDK_VERSION!;