diff --git a/apps/start/src/components/events/event-list-item.tsx b/apps/start/src/components/events/event-list-item.tsx index aef74def..5908d70b 100644 --- a/apps/start/src/components/events/event-list-item.tsx +++ b/apps/start/src/components/events/event-list-item.tsx @@ -1,25 +1,20 @@ +import type { IServiceEvent, IServiceEventMinimal } from '@openpanel/db'; +import { Link } from '@tanstack/react-router'; +import { SerieIcon } from '../report-chart/common/serie-icon'; +import { EventIcon } from './event-icon'; import { Tooltiper } from '@/components/ui/tooltip'; import { useAppParams } from '@/hooks/use-app-params'; -import { useNumber } from '@/hooks/use-numer-formatter'; import { pushModal } from '@/modals'; import { cn } from '@/utils/cn'; import { getProfileName } from '@/utils/getters'; -import { Link } from '@tanstack/react-router'; - -import type { IServiceEvent, IServiceEventMinimal } from '@openpanel/db'; - -import { SerieIcon } from '../report-chart/common/serie-icon'; -import { EventIcon } from './event-icon'; type EventListItemProps = IServiceEventMinimal | IServiceEvent; export function EventListItem(props: EventListItemProps) { const { organizationId, projectId } = useAppParams(); - const { createdAt, name, path, duration, meta } = props; + const { createdAt, name, path, meta } = props; const profile = 'profile' in props ? props.profile : null; - const number = useNumber(); - const renderName = () => { if (name === 'screen_view') { if (path.includes('/')) { @@ -32,83 +27,65 @@ export function EventListItem(props: EventListItemProps) { return name.replace(/_/g, ' '); }; - const renderDuration = () => { - if (name === 'screen_view') { - return ( - - {number.shortWithUnit(duration / 1000, 'min')} - - ); - } - - return null; - }; - const isMinimal = 'minimal' in props; return ( - <> - - + )} + + +
+ {createdAt.toLocaleTimeString()} +
+
+ + ); } diff --git a/apps/start/src/components/events/table/columns.tsx b/apps/start/src/components/events/table/columns.tsx index 203d4f16..b14fde85 100644 --- a/apps/start/src/components/events/table/columns.tsx +++ b/apps/start/src/components/events/table/columns.tsx @@ -1,15 +1,14 @@ +import type { IServiceEvent } from '@openpanel/db'; +import type { ColumnDef } from '@tanstack/react-table'; +import { ColumnCreatedAt } from '@/components/column-created-at'; import { EventIcon } from '@/components/events/event-icon'; import { ProjectLink } from '@/components/links'; +import { ProfileAvatar } from '@/components/profiles/profile-avatar'; import { SerieIcon } from '@/components/report-chart/common/serie-icon'; +import { KeyValueGrid } from '@/components/ui/key-value-grid'; import { useNumber } from '@/hooks/use-numer-formatter'; import { pushModal } from '@/modals'; import { getProfileName } from '@/utils/getters'; -import type { ColumnDef } from '@tanstack/react-table'; - -import { ColumnCreatedAt } from '@/components/column-created-at'; -import { ProfileAvatar } from '@/components/profiles/profile-avatar'; -import { KeyValueGrid } from '@/components/ui/key-value-grid'; -import type { IServiceEvent } from '@openpanel/db'; export function useColumns() { const number = useNumber(); @@ -28,17 +27,24 @@ export function useColumns() { accessorKey: 'name', header: 'Name', cell({ row }) { - const { name, path, duration, properties, revenue } = row.original; + const { name, path, revenue } = row.original; + const fullTitle = + name === 'screen_view' + ? path + : name === 'revenue' && revenue + ? `${name} (${number.currency(revenue / 100)})` + : name.replace(/_/g, ' '); + const renderName = () => { if (name === 'screen_view') { if (path.includes('/')) { - return {path}; + return path; } return ( <> Screen: - {path} + {path} ); } @@ -50,38 +56,27 @@ export function useColumns() { return name.replace(/_/g, ' '); }; - const renderDuration = () => { - if (name === 'screen_view') { - return ( - - {number.shortWithUnit(duration / 1000, 'min')} - - ); - } - - return null; - }; - return ( -
+
- + - {renderDuration()}
); @@ -107,8 +101,8 @@ export function useColumns() { if (profile) { return ( {getProfileName(profile)} @@ -119,8 +113,8 @@ export function useColumns() { if (profileId && profileId !== deviceId) { return ( Unknown @@ -130,8 +124,8 @@ export function useColumns() { if (deviceId) { return ( Anonymous @@ -152,10 +146,10 @@ export function useColumns() { const { sessionId } = row.original; return ( - {sessionId.slice(0,6)} + {sessionId.slice(0, 6)} ); }, @@ -175,7 +169,7 @@ export function useColumns() { cell({ row }) { const { country, city } = row.original; return ( -
+
{city}
@@ -189,7 +183,7 @@ export function useColumns() { cell({ row }) { const { os } = row.original; return ( -
+
{os}
@@ -203,7 +197,7 @@ export function useColumns() { cell({ row }) { const { browser } = row.original; return ( -
+
{browser}
@@ -221,14 +215,14 @@ export function useColumns() { const { properties } = row.original; const filteredProperties = Object.fromEntries( Object.entries(properties || {}).filter( - ([key]) => !key.startsWith('__'), - ), + ([key]) => !key.startsWith('__') + ) ); const items = Object.entries(filteredProperties); const limit = 2; const data = items.slice(0, limit).map(([key, value]) => ({ name: key, - value: value, + value, })); if (items.length > limit) { data.push({ diff --git a/biome.json b/biome.json index 9e7b023e..9f718ec1 100644 --- a/biome.json +++ b/biome.json @@ -71,7 +71,8 @@ "noDangerouslySetInnerHtml": "off" }, "complexity": { - "noForEach": "off" + "noForEach": "off", + "noExcessiveCognitiveComplexity": "off" } } }, diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts index 50600c70..178f9454 100644 --- a/packages/db/src/buffers/event-buffer.test.ts +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -2,42 +2,8 @@ import { getRedisCache } from '@openpanel/redis'; import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; import { ch } from '../clickhouse/client'; -// Mock transformEvent to avoid circular dependency with buffers -> services -> buffers -vi.mock('../services/event.service', () => ({ - transformEvent: (event: any) => ({ - id: event.id ?? 'id', - name: event.name, - deviceId: event.device_id, - profileId: event.profile_id, - projectId: event.project_id, - sessionId: event.session_id, - properties: event.properties ?? {}, - createdAt: new Date(event.created_at ?? Date.now()), - country: event.country, - city: event.city, - region: event.region, - longitude: event.longitude, - latitude: event.latitude, - os: event.os, - osVersion: event.os_version, - browser: event.browser, - browserVersion: event.browser_version, - device: event.device, - brand: event.brand, - model: event.model, - duration: event.duration ?? 0, - path: event.path ?? '', - origin: event.origin ?? '', - referrer: event.referrer, - referrerName: event.referrer_name, - referrerType: event.referrer_type, - meta: event.meta, - importedAt: undefined, - sdkName: event.sdk_name, - sdkVersion: event.sdk_version, - profile: event.profile, - }), -})); +// Break circular dep: event-buffer -> event.service -> buffers/index -> EventBuffer +vi.mock('../services/event.service', () => ({})); import { EventBuffer } from './event-buffer'; @@ -68,19 +34,16 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any; - // Get initial count const initialCount = await eventBuffer.getBufferSize(); - // Add event and flush (events are micro-batched) eventBuffer.add(event); await eventBuffer.flush(); - // Buffer counter should increase by 1 const newCount = await eventBuffer.getBufferSize(); expect(newCount).toBe(initialCount + 1); }); - it('adds multiple screen_views - moves previous to buffer with duration', async () => { + it('adds screen_view directly to buffer queue', async () => { const t0 = Date.now(); const sessionId = 'session_1'; @@ -100,63 +63,23 @@ describe('EventBuffer', () => { created_at: new Date(t0 + 1000).toISOString(), } as any; - const view3 = { - project_id: 'p1', - profile_id: 'u1', - session_id: sessionId, - name: 'screen_view', - created_at: new Date(t0 + 3000).toISOString(), - } as any; - - // Add first screen_view const count1 = await eventBuffer.getBufferSize(); + eventBuffer.add(view1); await eventBuffer.flush(); - // Should be stored as "last" but NOT in queue yet + // screen_view goes directly to buffer const count2 = await eventBuffer.getBufferSize(); - expect(count2).toBe(count1); // No change in buffer + expect(count2).toBe(count1 + 1); - // Last screen_view should be retrievable - const last1 = await eventBuffer.getLastScreenView({ - projectId: 'p1', - sessionId: sessionId, - }); - expect(last1).not.toBeNull(); - expect(last1!.createdAt.toISOString()).toBe(view1.created_at); - - // Add second screen_view eventBuffer.add(view2); await eventBuffer.flush(); - // Now view1 should be in buffer const count3 = await eventBuffer.getBufferSize(); - expect(count3).toBe(count1 + 1); - - // view2 should now be the "last" - const last2 = await eventBuffer.getLastScreenView({ - projectId: 'p1', - sessionId: sessionId, - }); - expect(last2!.createdAt.toISOString()).toBe(view2.created_at); - - // Add third screen_view - eventBuffer.add(view3); - await eventBuffer.flush(); - - // Now view2 should also be in buffer - const count4 = await eventBuffer.getBufferSize(); - expect(count4).toBe(count1 + 2); - - // view3 should now be the "last" - const last3 = await eventBuffer.getLastScreenView({ - projectId: 'p1', - sessionId: sessionId, - }); - expect(last3!.createdAt.toISOString()).toBe(view3.created_at); + expect(count3).toBe(count1 + 2); }); - it('adds session_end - moves last screen_view and session_end to buffer', async () => { + it('adds session_end directly to buffer queue', async () => { const t0 = Date.now(); const sessionId = 'session_2'; @@ -176,134 +99,36 @@ describe('EventBuffer', () => { created_at: new Date(t0 + 5000).toISOString(), } as any; - // Add screen_view const count1 = await eventBuffer.getBufferSize(); + eventBuffer.add(view); - await eventBuffer.flush(); - - // Should be stored as "last", not in buffer yet - const count2 = await eventBuffer.getBufferSize(); - expect(count2).toBe(count1); - - // Add session_end eventBuffer.add(sessionEnd); await eventBuffer.flush(); - // Both should now be in buffer (+2) - const count3 = await eventBuffer.getBufferSize(); - expect(count3).toBe(count1 + 2); - - // Last screen_view should be cleared - const last = await eventBuffer.getLastScreenView({ - projectId: 'p2', - sessionId: sessionId, - }); - expect(last).toBeNull(); - }); - - it('session_end with no previous screen_view - only adds session_end to buffer', async () => { - const sessionId = 'session_3'; - - const sessionEnd = { - project_id: 'p3', - profile_id: 'u3', - session_id: sessionId, - name: 'session_end', - created_at: new Date().toISOString(), - } as any; - - const count1 = await eventBuffer.getBufferSize(); - eventBuffer.add(sessionEnd); - await eventBuffer.flush(); - - // Only session_end should be in buffer (+1) const count2 = await eventBuffer.getBufferSize(); - expect(count2).toBe(count1 + 1); - }); - - it('gets last screen_view by profileId', async () => { - const view = { - project_id: 'p4', - profile_id: 'u4', - session_id: 'session_4', - name: 'screen_view', - path: '/home', - created_at: new Date().toISOString(), - } as any; - - eventBuffer.add(view); - await eventBuffer.flush(); - - // Query by profileId - const result = await eventBuffer.getLastScreenView({ - projectId: 'p4', - profileId: 'u4', - }); - - expect(result).not.toBeNull(); - expect(result!.name).toBe('screen_view'); - expect(result!.path).toBe('/home'); - }); - - it('gets last screen_view by sessionId', async () => { - const sessionId = 'session_5'; - const view = { - project_id: 'p5', - profile_id: 'u5', - session_id: sessionId, - name: 'screen_view', - path: '/about', - created_at: new Date().toISOString(), - } as any; - - eventBuffer.add(view); - await eventBuffer.flush(); - - // Query by sessionId - const result = await eventBuffer.getLastScreenView({ - projectId: 'p5', - sessionId: sessionId, - }); - - expect(result).not.toBeNull(); - expect(result!.name).toBe('screen_view'); - expect(result!.path).toBe('/about'); - }); - - it('returns null for non-existent last screen_view', async () => { - const result = await eventBuffer.getLastScreenView({ - projectId: 'p_nonexistent', - profileId: 'u_nonexistent', - }); - - expect(result).toBeNull(); + expect(count2).toBe(count1 + 2); }); it('gets buffer count correctly', async () => { - // Initially 0 expect(await eventBuffer.getBufferSize()).toBe(0); - // Add regular event eventBuffer.add({ project_id: 'p6', name: 'event1', created_at: new Date().toISOString(), } as any); await eventBuffer.flush(); - expect(await eventBuffer.getBufferSize()).toBe(1); - // Add another regular event eventBuffer.add({ project_id: 'p6', name: 'event2', created_at: new Date().toISOString(), } as any); await eventBuffer.flush(); - expect(await eventBuffer.getBufferSize()).toBe(2); - // Add screen_view (not counted until flushed) + // screen_view also goes directly to buffer eventBuffer.add({ project_id: 'p6', profile_id: 'u6', @@ -312,21 +137,6 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any); await eventBuffer.flush(); - - // Still 2 (screen_view is pending) - expect(await eventBuffer.getBufferSize()).toBe(2); - - // Add another screen_view (first one gets flushed) - eventBuffer.add({ - project_id: 'p6', - profile_id: 'u6', - session_id: 'session_6', - name: 'screen_view', - created_at: new Date(Date.now() + 1000).toISOString(), - } as any); - await eventBuffer.flush(); - - // Now 3 (2 regular + 1 flushed screen_view) expect(await eventBuffer.getBufferSize()).toBe(3); }); @@ -355,14 +165,12 @@ describe('EventBuffer', () => { await eventBuffer.processBuffer(); - // Should insert both events expect(insertSpy).toHaveBeenCalled(); const callArgs = insertSpy.mock.calls[0]![0]; expect(callArgs.format).toBe('JSONEachRow'); expect(callArgs.table).toBe('events'); expect(Array.isArray(callArgs.values)).toBe(true); - // Buffer should be empty after processing expect(await eventBuffer.getBufferSize()).toBe(0); insertSpy.mockRestore(); @@ -373,7 +181,6 @@ describe('EventBuffer', () => { process.env.EVENT_BUFFER_CHUNK_SIZE = '2'; const eb = new EventBuffer(); - // Add 4 events for (let i = 0; i < 4; i++) { eb.add({ project_id: 'p8', @@ -389,14 +196,12 @@ describe('EventBuffer', () => { await eb.processBuffer(); - // With chunk size 2 and 4 events, should be called twice expect(insertSpy).toHaveBeenCalledTimes(2); const call1Values = insertSpy.mock.calls[0]![0].values as any[]; const call2Values = insertSpy.mock.calls[1]![0].values as any[]; expect(call1Values.length).toBe(2); expect(call2Values.length).toBe(2); - // Restore if (prev === undefined) delete process.env.EVENT_BUFFER_CHUNK_SIZE; else process.env.EVENT_BUFFER_CHUNK_SIZE = prev; @@ -418,126 +223,54 @@ describe('EventBuffer', () => { expect(count).toBeGreaterThanOrEqual(1); }); - it('handles multiple sessions independently', async () => { + it('handles multiple sessions independently — all events go to buffer', async () => { const t0 = Date.now(); + const count1 = await eventBuffer.getBufferSize(); - // Session 1 - const view1a = { + eventBuffer.add({ project_id: 'p10', profile_id: 'u10', session_id: 'session_10a', name: 'screen_view', created_at: new Date(t0).toISOString(), - } as any; - - const view1b = { - project_id: 'p10', - profile_id: 'u10', - session_id: 'session_10a', - name: 'screen_view', - created_at: new Date(t0 + 1000).toISOString(), - } as any; - - // Session 2 - const view2a = { + } as any); + eventBuffer.add({ project_id: 'p10', profile_id: 'u11', session_id: 'session_10b', name: 'screen_view', created_at: new Date(t0).toISOString(), - } as any; - - const view2b = { + } as any); + eventBuffer.add({ + project_id: 'p10', + profile_id: 'u10', + session_id: 'session_10a', + name: 'screen_view', + created_at: new Date(t0 + 1000).toISOString(), + } as any); + eventBuffer.add({ project_id: 'p10', profile_id: 'u11', session_id: 'session_10b', name: 'screen_view', created_at: new Date(t0 + 2000).toISOString(), - } as any; - - eventBuffer.add(view1a); - eventBuffer.add(view2a); - eventBuffer.add(view1b); // Flushes view1a - eventBuffer.add(view2b); // Flushes view2a + } as any); await eventBuffer.flush(); - // Should have 2 events in buffer (one from each session) - expect(await eventBuffer.getBufferSize()).toBe(2); - - // Each session should have its own "last" screen_view - const last1 = await eventBuffer.getLastScreenView({ - projectId: 'p10', - sessionId: 'session_10a', - }); - expect(last1!.createdAt.toISOString()).toBe(view1b.created_at); - - const last2 = await eventBuffer.getLastScreenView({ - projectId: 'p10', - sessionId: 'session_10b', - }); - expect(last2!.createdAt.toISOString()).toBe(view2b.created_at); + // All 4 events are in buffer directly + expect(await eventBuffer.getBufferSize()).toBe(count1 + 4); }); - it('screen_view without session_id goes directly to buffer', async () => { - const view = { + it('bulk adds events to buffer', async () => { + const events = Array.from({ length: 5 }, (_, i) => ({ project_id: 'p11', - profile_id: 'u11', - name: 'screen_view', - created_at: new Date().toISOString(), - } as any; + name: `event${i}`, + created_at: new Date(Date.now() + i).toISOString(), + })) as any[]; - const count1 = await eventBuffer.getBufferSize(); - eventBuffer.add(view); + eventBuffer.bulkAdd(events); await eventBuffer.flush(); - // Should go directly to buffer (no session_id) - const count2 = await eventBuffer.getBufferSize(); - expect(count2).toBe(count1 + 1); - }); - - it('updates last screen_view when new one arrives from same profile but different session', async () => { - const t0 = Date.now(); - - const view1 = { - project_id: 'p12', - profile_id: 'u12', - session_id: 'session_12a', - name: 'screen_view', - path: '/page1', - created_at: new Date(t0).toISOString(), - } as any; - - const view2 = { - project_id: 'p12', - profile_id: 'u12', - session_id: 'session_12b', // Different session! - name: 'screen_view', - path: '/page2', - created_at: new Date(t0 + 1000).toISOString(), - } as any; - - eventBuffer.add(view1); - eventBuffer.add(view2); - await eventBuffer.flush(); - - // Both sessions should have their own "last" - const lastSession1 = await eventBuffer.getLastScreenView({ - projectId: 'p12', - sessionId: 'session_12a', - }); - expect(lastSession1!.path).toBe('/page1'); - - const lastSession2 = await eventBuffer.getLastScreenView({ - projectId: 'p12', - sessionId: 'session_12b', - }); - expect(lastSession2!.path).toBe('/page2'); - - // Profile should have the latest one - const lastProfile = await eventBuffer.getLastScreenView({ - projectId: 'p12', - profileId: 'u12', - }); - expect(lastProfile!.path).toBe('/page2'); + expect(await eventBuffer.getBufferSize()).toBe(5); }); }); diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 306b69d1..6b5dc8ca 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -5,32 +5,9 @@ import { publishEvent, } from '@openpanel/redis'; import { ch } from '../clickhouse/client'; -import { - type IClickhouseEvent, - type IServiceEvent, - transformEvent, -} from '../services/event.service'; +import { type IClickhouseEvent } from '../services/event.service'; import { BaseBuffer } from './base-buffer'; -/** - * Event Buffer - * - * 1. All events go into a single list buffer (event_buffer:queue) - * 2. screen_view events are handled specially: - * - Store current screen_view as "last" for the session - * - When a new screen_view arrives, flush the previous one with calculated duration - * 3. session_end events: - * - Retrieve the last screen_view (don't modify it) - * - Push both screen_view and session_end to buffer - * 4. Flush: Process all events from the list buffer - */ -interface PendingEvent { - event: IClickhouseEvent; - eventJson: string; - eventWithTimestamp?: string; - type: 'regular' | 'screen_view' | 'session_end'; -} - export class EventBuffer extends BaseBuffer { private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10) @@ -46,7 +23,7 @@ export class EventBuffer extends BaseBuffer { ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_SIZE, 10) : 100; - private pendingEvents: PendingEvent[] = []; + private pendingEvents: IClickhouseEvent[] = []; private flushTimer: ReturnType | null = null; private isFlushing = false; /** Tracks consecutive flush failures for observability; reset on success. */ @@ -59,100 +36,6 @@ export class EventBuffer extends BaseBuffer { private queueKey = 'event_buffer:queue'; protected bufferCounterKey = 'event_buffer:total_count'; - private scriptShas: { - addScreenView?: string; - addSessionEnd?: string; - } = {}; - - private getLastScreenViewKeyBySession(sessionId: string) { - return `event_buffer:last_screen_view:session:${sessionId}`; - } - - private getLastScreenViewKeyByProfile(projectId: string, profileId: string) { - return `event_buffer:last_screen_view:profile:${projectId}:${profileId}`; - } - - /** - * Lua script for screen_view addition. - * Uses GETDEL for atomic get-and-delete to prevent race conditions. - * - * KEYS[1] = last screen_view key (by session) - * KEYS[2] = last screen_view key (by profile, may be empty) - * KEYS[3] = queue key - * KEYS[4] = buffer counter key - * ARGV[1] = new event with timestamp as JSON: {"event": {...}, "ts": 123456} - * ARGV[2] = TTL for last screen_view (1 hour) - */ - private readonly addScreenViewScript = ` -local sessionKey = KEYS[1] -local profileKey = KEYS[2] -local queueKey = KEYS[3] -local counterKey = KEYS[4] -local newEventData = ARGV[1] -local ttl = tonumber(ARGV[2]) - -local previousEventData = redis.call("GETDEL", sessionKey) - -redis.call("SET", sessionKey, newEventData, "EX", ttl) - -if profileKey and profileKey ~= "" then - redis.call("SET", profileKey, newEventData, "EX", ttl) -end - -if previousEventData then - local prev = cjson.decode(previousEventData) - local curr = cjson.decode(newEventData) - - if prev.ts and curr.ts then - prev.event.duration = math.max(0, curr.ts - prev.ts) - end - - redis.call("RPUSH", queueKey, cjson.encode(prev.event)) - redis.call("INCR", counterKey) - return 1 -end - -return 0 -`; - - /** - * Lua script for session_end. - * Uses GETDEL to atomically retrieve and delete the last screen_view. - * - * KEYS[1] = last screen_view key (by session) - * KEYS[2] = last screen_view key (by profile, may be empty) - * KEYS[3] = queue key - * KEYS[4] = buffer counter key - * ARGV[1] = session_end event JSON - */ - private readonly addSessionEndScript = ` -local sessionKey = KEYS[1] -local profileKey = KEYS[2] -local queueKey = KEYS[3] -local counterKey = KEYS[4] -local sessionEndJson = ARGV[1] - -local previousEventData = redis.call("GETDEL", sessionKey) -local added = 0 - -if previousEventData then - local prev = cjson.decode(previousEventData) - redis.call("RPUSH", queueKey, cjson.encode(prev.event)) - redis.call("INCR", counterKey) - added = added + 1 -end - -redis.call("RPUSH", queueKey, sessionEndJson) -redis.call("INCR", counterKey) -added = added + 1 - -if profileKey and profileKey ~= "" then - redis.call("DEL", profileKey) -end - -return added -`; - constructor() { super({ name: 'event', @@ -160,27 +43,6 @@ return added await this.processBuffer(); }, }); - this.loadScripts(); - } - - private async loadScripts() { - try { - const redis = getRedisCache(); - const [screenViewSha, sessionEndSha] = await Promise.all([ - redis.script('LOAD', this.addScreenViewScript), - redis.script('LOAD', this.addSessionEndScript), - ]); - - this.scriptShas.addScreenView = screenViewSha as string; - this.scriptShas.addSessionEnd = sessionEndSha as string; - - this.logger.info('Loaded Lua scripts into Redis', { - addScreenView: this.scriptShas.addScreenView, - addSessionEnd: this.scriptShas.addSessionEnd, - }); - } catch (error) { - this.logger.error('Failed to load Lua scripts', { error }); - } } bulkAdd(events: IClickhouseEvent[]) { @@ -190,30 +52,7 @@ return added } add(event: IClickhouseEvent) { - const eventJson = JSON.stringify(event); - - let type: PendingEvent['type'] = 'regular'; - let eventWithTimestamp: string | undefined; - - if (event.session_id && event.name === 'screen_view') { - type = 'screen_view'; - const timestamp = new Date(event.created_at || Date.now()).getTime(); - eventWithTimestamp = JSON.stringify({ - event: event, - ts: timestamp, - }); - } else if (event.session_id && event.name === 'session_end') { - type = 'session_end'; - } - - const pendingEvent: PendingEvent = { - event, - eventJson, - eventWithTimestamp, - type, - }; - - this.pendingEvents.push(pendingEvent); + this.pendingEvents.push(event); if (this.pendingEvents.length >= this.microBatchMaxSize) { this.flushLocalBuffer(); @@ -228,57 +67,6 @@ return added } } - private addToMulti(multi: ReturnType, pending: PendingEvent) { - const { event, eventJson, eventWithTimestamp, type } = pending; - - if (type === 'screen_view' && event.session_id) { - const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); - const profileKey = event.profile_id - ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id) - : ''; - - this.evalScript( - multi, - 'addScreenView', - this.addScreenViewScript, - 4, - sessionKey, - profileKey, - this.queueKey, - this.bufferCounterKey, - eventWithTimestamp!, - '3600', - ); - } else if (type === 'session_end' && event.session_id) { - const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); - const profileKey = event.profile_id - ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id) - : ''; - - this.evalScript( - multi, - 'addSessionEnd', - this.addSessionEndScript, - 4, - sessionKey, - profileKey, - this.queueKey, - this.bufferCounterKey, - eventJson, - ); - } else { - multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey); - } - - if (event.profile_id) { - this.incrementActiveVisitorCount( - multi, - event.project_id, - event.profile_id, - ); - } - } - public async flush() { if (this.flushTimer) { clearTimeout(this.flushTimer); @@ -301,9 +89,17 @@ return added const redis = getRedisCache(); const multi = redis.multi(); - for (const pending of eventsToFlush) { - this.addToMulti(multi, pending); + for (const event of eventsToFlush) { + multi.rpush(this.queueKey, JSON.stringify(event)); + if (event.profile_id) { + this.incrementActiveVisitorCount( + multi, + event.project_id, + event.profile_id, + ); + } } + multi.incrby(this.bufferCounterKey, eventsToFlush.length); await multi.exec(); @@ -314,11 +110,14 @@ return added this.pendingEvents = eventsToFlush.concat(this.pendingEvents); this.flushRetryCount += 1; - this.logger.warn('Failed to flush local buffer to Redis; events re-queued', { - error, - eventCount: eventsToFlush.length, - flushRetryCount: this.flushRetryCount, - }); + this.logger.warn( + 'Failed to flush local buffer to Redis; events re-queued', + { + error, + eventCount: eventsToFlush.length, + flushRetryCount: this.flushRetryCount, + }, + ); } finally { this.isFlushing = false; // Events may have accumulated while we were flushing; schedule another flush if needed @@ -331,24 +130,6 @@ return added } } - private evalScript( - multi: ReturnType, - scriptName: keyof typeof this.scriptShas, - scriptContent: string, - numKeys: number, - ...args: (string | number)[] - ) { - const sha = this.scriptShas[scriptName]; - - if (sha) { - multi.evalsha(sha, numKeys, ...args); - } else { - multi.eval(scriptContent, numKeys, ...args); - this.logger.warn(`Script ${scriptName} not loaded, using EVAL fallback`); - this.loadScripts(); - } - } - async processBuffer() { const redis = getRedisCache(); @@ -398,7 +179,10 @@ return added const countByProject = new Map(); for (const event of eventsToClickhouse) { - countByProject.set(event.project_id, (countByProject.get(event.project_id) ?? 0) + 1); + countByProject.set( + event.project_id, + (countByProject.get(event.project_id) ?? 0) + 1, + ); } for (const [projectId, count] of countByProject) { publishEvent('events', 'batch', { projectId, count }); @@ -419,42 +203,6 @@ return added } } - public async getLastScreenView( - params: - | { - sessionId: string; - } - | { - projectId: string; - profileId: string; - }, - ): Promise { - const redis = getRedisCache(); - - let lastScreenViewKey: string; - if ('sessionId' in params) { - lastScreenViewKey = this.getLastScreenViewKeyBySession(params.sessionId); - } else { - lastScreenViewKey = this.getLastScreenViewKeyByProfile( - params.projectId, - params.profileId, - ); - } - - const eventDataStr = await redis.get(lastScreenViewKey); - - if (eventDataStr) { - const eventData = getSafeJson<{ event: IClickhouseEvent; ts: number }>( - eventDataStr, - ); - if (eventData?.event) { - return transformEvent(eventData.event); - } - } - - return null; - } - public async getBufferSize() { return this.getBufferSizeWithCounter(async () => { const redis = getRedisCache(); diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index 94df1477..7a7b4a25 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -168,7 +168,6 @@ export function transformEvent(event: IClickhouseEvent): IServiceEvent { device: event.device, brand: event.brand, model: event.model, - duration: event.duration, path: event.path, origin: event.origin, referrer: event.referrer, @@ -216,7 +215,7 @@ export interface IServiceEvent { device?: string | undefined; brand?: string | undefined; model?: string | undefined; - duration: number; + duration?: number; path: string; origin: string; referrer: string | undefined; @@ -247,7 +246,7 @@ export interface IServiceEventMinimal { browser?: string | undefined; device?: string | undefined; brand?: string | undefined; - duration: number; + duration?: number; path: string; origin: string; referrer: string | undefined; @@ -379,7 +378,7 @@ export async function createEvent(payload: IServiceCreateEventPayload) { device: payload.device ?? '', brand: payload.brand ?? '', model: payload.model ?? '', - duration: payload.duration, + duration: payload.duration ?? 0, referrer: payload.referrer ?? '', referrer_name: payload.referrerName ?? '', referrer_type: payload.referrerType ?? '', @@ -477,7 +476,7 @@ export async function getEventList(options: GetEventListOptions) { sb.where.cursor = `created_at < ${sqlstring.escape(formatClickhouseDate(cursor))}`; } - if (!cursor && !(startDate && endDate)) { + if (!(cursor || (startDate && endDate))) { sb.where.cursorWindow = `created_at >= toDateTime64(${sqlstring.escape(formatClickhouseDate(new Date()))}, 3) - INTERVAL ${safeDateIntervalInDays} DAY`; } @@ -562,9 +561,6 @@ export async function getEventList(options: GetEventListOptions) { if (select.model) { sb.select.model = 'model'; } - if (select.duration) { - sb.select.duration = 'duration'; - } if (select.path) { sb.select.path = 'path'; } @@ -771,7 +767,6 @@ class EventService { where, select, limit, - orderBy, filters, }: { projectId: string; @@ -811,7 +806,6 @@ class EventService { select.event.deviceId && 'e.device_id as device_id', select.event.name && 'e.name as name', select.event.path && 'e.path as path', - select.event.duration && 'e.duration as duration', select.event.country && 'e.country as country', select.event.city && 'e.city as city', select.event.os && 'e.os as os', @@ -896,7 +890,6 @@ class EventService { select.event.deviceId && 'e.device_id as device_id', select.event.name && 'e.name as name', select.event.path && 'e.path as path', - select.event.duration && 'e.duration as duration', select.event.country && 'e.country as country', select.event.city && 'e.city as city', select.event.os && 'e.os as os', @@ -1032,7 +1025,6 @@ class EventService { id: true, name: true, createdAt: true, - duration: true, country: true, city: true, os: true, diff --git a/packages/db/src/services/overview.service.ts b/packages/db/src/services/overview.service.ts index 17b8d900..d62b83a3 100644 --- a/packages/db/src/services/overview.service.ts +++ b/packages/db/src/services/overview.service.ts @@ -416,6 +416,30 @@ export class OverviewService { const where = this.getRawWhereClause('sessions', filters); const fillConfig = this.getFillConfig(interval, startDate, endDate); + // CTE: per-event screen_view durations via window function + const rawScreenViewDurationsQuery = clix(this.client, timezone) + .select([ + `${clix.toStartOf('created_at', interval as any, timezone)} AS date`, + `dateDiff('millisecond', created_at, lead(created_at, 1, created_at) OVER (PARTITION BY session_id ORDER BY created_at)) AS duration`, + ]) + .from(TABLE_NAMES.events) + .where('project_id', '=', projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + clix.datetime(startDate, 'toDateTime'), + clix.datetime(endDate, 'toDateTime'), + ]) + .rawWhere(this.getRawWhereClause('events', filters)); + + // CTE: avg duration per date bucket + const avgDurationByDateQuery = clix(this.client, timezone) + .select([ + 'date', + 'round(avgIf(duration, duration > 0), 2) / 1000 AS avg_session_duration', + ]) + .from('raw_screen_view_durations') + .groupBy(['date']); + // Session aggregation with bounce rates const sessionAggQuery = clix(this.client, timezone) .select([ @@ -473,6 +497,8 @@ export class OverviewService { .where('date', '!=', rollupDate) ) .with('overall_unique_visitors', overallUniqueVisitorsQuery) + .with('raw_screen_view_durations', rawScreenViewDurationsQuery) + .with('avg_duration_by_date', avgDurationByDateQuery) .select<{ date: string; bounce_rate: number; @@ -489,8 +515,7 @@ export class OverviewService { 'dss.bounce_rate as bounce_rate', 'uniq(e.profile_id) AS unique_visitors', 'uniq(e.session_id) AS total_sessions', - 'round(avgIf(duration, duration > 0), 2) / 1000 AS _avg_session_duration', - 'if(isNaN(_avg_session_duration), 0, _avg_session_duration) AS avg_session_duration', + 'coalesce(dur.avg_session_duration, 0) AS avg_session_duration', 'count(*) AS total_screen_views', 'round((count(*) * 1.) / uniq(e.session_id), 2) AS views_per_session', '(SELECT unique_visitors FROM overall_unique_visitors) AS overall_unique_visitors', @@ -502,6 +527,10 @@ export class OverviewService { 'daily_session_stats AS dss', `${clix.toStartOf('e.created_at', interval as any)} = dss.date` ) + .leftJoin( + 'avg_duration_by_date AS dur', + `${clix.toStartOf('e.created_at', interval as any)} = dur.date` + ) .where('e.project_id', '=', projectId) .where('e.name', '=', 'screen_view') .where('e.created_at', 'BETWEEN', [ @@ -509,7 +538,7 @@ export class OverviewService { clix.datetime(endDate, 'toDateTime'), ]) .rawWhere(this.getRawWhereClause('events', filters)) - .groupBy(['date', 'dss.bounce_rate']) + .groupBy(['date', 'dss.bounce_rate', 'dur.avg_session_duration']) .orderBy('date', 'ASC') .fill(fillConfig.from, fillConfig.to, fillConfig.step) .transform({ diff --git a/packages/db/src/services/pages.service.ts b/packages/db/src/services/pages.service.ts index b014c416..e3bf5431 100644 --- a/packages/db/src/services/pages.service.ts +++ b/packages/db/src/services/pages.service.ts @@ -52,6 +52,24 @@ export class PagesService { .where('created_at', '>=', clix.exp('now() - INTERVAL 30 DAY')) .groupBy(['origin', 'path']); + // CTE: compute screen_view durations via window function (leadInFrame gives next event's timestamp) + const screenViewDurationsCte = clix(this.client, timezone) + .select([ + 'project_id', + 'session_id', + 'path', + 'origin', + `dateDiff('millisecond', created_at, lead(created_at, 1, created_at) OVER (PARTITION BY session_id ORDER BY created_at)) AS duration`, + ]) + .from(TABLE_NAMES.events, false) + .where('project_id', '=', projectId) + .where('name', '=', 'screen_view') + .where('path', '!=', '') + .where('created_at', 'BETWEEN', [ + clix.datetime(startDate, 'toDateTime'), + clix.datetime(endDate, 'toDateTime'), + ]); + // Pre-filtered sessions subquery for better performance const sessionsSubquery = clix(this.client, timezone) .select(['id', 'project_id', 'is_bounce']) @@ -66,6 +84,7 @@ export class PagesService { // Main query: aggregate events and calculate bounce rate from pre-filtered sessions const query = clix(this.client, timezone) .with('page_titles', titlesCte) + .with('screen_view_durations', screenViewDurationsCte) .select([ 'e.origin as origin', 'e.path as path', @@ -74,25 +93,18 @@ export class PagesService { 'count() as pageviews', 'round(avg(e.duration) / 1000 / 60, 2) as avg_duration', `round( - (uniqIf(e.session_id, s.is_bounce = 1) * 100.0) / - nullIf(uniq(e.session_id), 0), + (uniqIf(e.session_id, s.is_bounce = 1) * 100.0) / + nullIf(uniq(e.session_id), 0), 2 ) as bounce_rate`, ]) - .from(`${TABLE_NAMES.events} e`, false) + .from('screen_view_durations e', false) .leftJoin( sessionsSubquery, 'e.session_id = s.id AND e.project_id = s.project_id', 's' ) .leftJoin('page_titles pt', 'concat(e.origin, e.path) = pt.page_key') - .where('e.project_id', '=', projectId) - .where('e.name', '=', 'screen_view') - .where('e.path', '!=', '') - .where('e.created_at', 'BETWEEN', [ - clix.datetime(startDate, 'toDateTime'), - clix.datetime(endDate, 'toDateTime'), - ]) .when(!!search, (q) => { const term = `%${search}%`; q.whereGroup() diff --git a/packages/trpc/src/routers/chart.ts b/packages/trpc/src/routers/chart.ts index 59367ba4..cd6b07e1 100644 --- a/packages/trpc/src/routers/chart.ts +++ b/packages/trpc/src/routers/chart.ts @@ -1,11 +1,7 @@ -import { flatten, map, pipe, prop, range, sort, uniq } from 'ramda'; -import sqlstring from 'sqlstring'; -import { z } from 'zod'; - +import { round } from '@openpanel/common'; import { - type IClickhouseProfile, - type IServiceProfile, - TABLE_NAMES, + AggregateChartEngine, + ChartEngine, ch, chQuery, clix, @@ -21,8 +17,11 @@ import { getReportById, getSelectPropertyKey, getSettingsForProject, + type IClickhouseProfile, + type IServiceProfile, onlyReportEvents, sankeyService, + TABLE_NAMES, validateShareAccess, } from '@openpanel/db'; import { @@ -33,15 +32,15 @@ import { zReportInput, zTimeInterval, } from '@openpanel/validation'; - -import { round } from '@openpanel/common'; -import { AggregateChartEngine, ChartEngine } from '@openpanel/db'; import { differenceInDays, differenceInMonths, differenceInWeeks, formatISO, } from 'date-fns'; +import { flatten, map, pipe, prop, range, sort, uniq } from 'ramda'; +import sqlstring from 'sqlstring'; +import { z } from 'zod'; import { getProjectAccess } from '../access'; import { TRPCAccessError } from '../errors'; import { @@ -83,7 +82,7 @@ const chartProcedure = publicProcedure.use( session: ctx.session?.userId ? { userId: ctx.session.userId } : undefined, - }, + } ); if (!shareValidation.isValid) { throw TRPCAccessError('You do not have access to this share'); @@ -119,7 +118,7 @@ const chartProcedure = publicProcedure.use( report: null, }, }); - }, + } ); export const chartRouter = createTRPCRouter({ @@ -128,7 +127,7 @@ export const chartRouter = createTRPCRouter({ .input( z.object({ projectId: z.string(), - }), + }) ) .query(async ({ input: { projectId } }) => { const { timezone } = await getSettingsForProject(projectId); @@ -151,7 +150,7 @@ export const chartRouter = createTRPCRouter({ TO toStartOfDay(now()) STEP INTERVAL 1 day SETTINGS session_timezone = '${timezone}' - `, + ` ); const metricsPromise = clix(ch, timezone) @@ -185,7 +184,7 @@ export const chartRouter = createTRPCRouter({ ? Math.round( ((metrics.months_3 - metrics.months_3_prev) / metrics.months_3_prev) * - 100, + 100 ) : null; @@ -209,12 +208,12 @@ export const chartRouter = createTRPCRouter({ .input( z.object({ projectId: z.string(), - }), + }) ) .query(async ({ input: { projectId } }) => { const [events, meta] = await Promise.all([ chQuery<{ name: string; count: number }>( - `SELECT name, count(name) as count FROM ${TABLE_NAMES.event_names_mv} WHERE project_id = ${sqlstring.escape(projectId)} GROUP BY name ORDER BY count DESC, name ASC`, + `SELECT name, count(name) as count FROM ${TABLE_NAMES.event_names_mv} WHERE project_id = ${sqlstring.escape(projectId)} GROUP BY name ORDER BY count DESC, name ASC` ), getEventMetasCached(projectId), ]); @@ -238,7 +237,7 @@ export const chartRouter = createTRPCRouter({ z.object({ event: z.string().optional(), projectId: z.string(), - }), + }) ) .query(async ({ input: { projectId, event } }) => { const profiles = await clix(ch, 'UTC') @@ -252,8 +251,8 @@ export const chartRouter = createTRPCRouter({ const profileProperties = [ ...new Set( profiles.flatMap((p) => - Object.keys(p.properties).map((k) => `profile.properties.${k}`), - ), + Object.keys(p.properties).map((k) => `profile.properties.${k}`) + ) ), ]; @@ -283,7 +282,6 @@ export const chartRouter = createTRPCRouter({ }); const fixedProperties = [ - 'duration', 'revenue', 'has_profile', 'path', @@ -316,7 +314,7 @@ export const chartRouter = createTRPCRouter({ return pipe( sort((a, b) => a.length - b.length), - uniq, + uniq )(properties); }), @@ -326,9 +324,9 @@ export const chartRouter = createTRPCRouter({ event: z.string(), property: z.string(), projectId: z.string(), - }), + }) ) - .query(async ({ input: { event, property, projectId, ...input } }) => { + .query(async ({ input: { event, property, projectId } }) => { if (property === 'has_profile') { return { values: ['true', 'false'], @@ -378,7 +376,7 @@ export const chartRouter = createTRPCRouter({ .from(TABLE_NAMES.profiles) .where('project_id', '=', projectId), 'profile.id = profile_id', - 'profile', + 'profile' ); } @@ -389,8 +387,8 @@ export const chartRouter = createTRPCRouter({ (data: typeof events) => map(prop('values'), data), flatten, uniq, - sort((a, b) => a.length - b.length), - )(events), + sort((a, b) => a.length - b.length) + )(events) ); } @@ -406,8 +404,8 @@ export const chartRouter = createTRPCRouter({ z.object({ shareId: z.string().optional(), id: z.string().optional(), - }), - ), + }) + ) ) .query(async ({ input, ctx }) => { const chartInput = ctx.report @@ -448,8 +446,8 @@ export const chartRouter = createTRPCRouter({ z.object({ shareId: z.string().optional(), id: z.string().optional(), - }), - ), + }) + ) ) .query(async ({ input, ctx }) => { const chartInput = ctx.report @@ -536,12 +534,10 @@ export const chartRouter = createTRPCRouter({ z.object({ shareId: z.string().optional(), id: z.string().optional(), - }), - ), + }) + ) ) - .query(async ({ input, ctx }) => { - console.log('input', input); - + .query(({ input, ctx }) => { const chartInput = ctx.report ? { ...ctx.report, @@ -562,10 +558,10 @@ export const chartRouter = createTRPCRouter({ z.object({ shareId: z.string().optional(), id: z.string().optional(), - }), - ), + }) + ) ) - .query(async ({ input, ctx }) => { + .query(({ input, ctx }) => { const chartInput = ctx.report ? { ...ctx.report, @@ -593,7 +589,7 @@ export const chartRouter = createTRPCRouter({ range: zRange, shareId: z.string().optional(), id: z.string().optional(), - }), + }) ) .query(async ({ input, ctx }) => { const projectId = ctx.report?.projectId ?? input.projectId; @@ -647,7 +643,7 @@ export const chartRouter = createTRPCRouter({ startDate, endDate, }, - timezone, + timezone ); const diffInterval = { minute: () => differenceInDays(dates.endDate, dates.startDate), @@ -677,14 +673,14 @@ export const chartRouter = createTRPCRouter({ const usersSelect = range(0, diffInterval + 1) .map( (index) => - `groupUniqArrayIf(profile_id, x_after_cohort ${countCriteria} ${index}) AS interval_${index}_users`, + `groupUniqArrayIf(profile_id, x_after_cohort ${countCriteria} ${index}) AS interval_${index}_users` ) .join(',\n'); const countsSelect = range(0, diffInterval + 1) .map( (index) => - `length(interval_${index}_users) AS interval_${index}_user_count`, + `length(interval_${index}_users) AS interval_${index}_user_count` ) .join(',\n'); @@ -769,12 +765,10 @@ export const chartRouter = createTRPCRouter({ interval: zTimeInterval.default('day'), series: zChartSeries, breakdowns: z.record(z.string(), z.string()).optional(), - }), + }) ) .query(async ({ input }) => { - const { timezone } = await getSettingsForProject(input.projectId); const { projectId, date, series } = input; - const limit = 100; const serie = series[0]; if (!serie) { @@ -813,7 +807,7 @@ export const chartRouter = createTRPCRouter({ if (profileFields.length > 0) { // Extract top-level field names and select only what's needed const fieldsToSelect = uniq( - profileFields.map((f) => f.split('.')[0]), + profileFields.map((f) => f.split('.')[0]) ).join(', '); sb.joins.profiles = `LEFT ANY JOIN (SELECT id, ${fieldsToSelect} FROM ${TABLE_NAMES.profiles} FINAL WHERE project_id = ${sqlstring.escape(projectId)}) as profile on profile.id = profile_id`; } @@ -836,7 +830,7 @@ export const chartRouter = createTRPCRouter({ // Fetch profile details in batches to avoid exceeding ClickHouse max_query_size const ids = profileIds.map((p) => p.profile_id).filter(Boolean); const BATCH_SIZE = 200; - const profiles = []; + const profiles: IServiceProfile[] = []; for (let i = 0; i < ids.length; i += BATCH_SIZE) { const batch = ids.slice(i, i + BATCH_SIZE); const batchProfiles = await getProfilesCached(batch, projectId); @@ -859,13 +853,13 @@ export const chartRouter = createTRPCRouter({ .optional() .default(false) .describe( - 'If true, show users who dropped off at this step. If false, show users who completed at least this step.', + 'If true, show users who dropped off at this step. If false, show users who completed at least this step.' ), funnelWindow: z.number().optional(), funnelGroup: z.string().optional(), breakdowns: z.array(z.object({ name: z.string() })).optional(), range: zRange, - }), + }) ) .query(async ({ input }) => { const { timezone } = await getSettingsForProject(input.projectId); @@ -911,15 +905,15 @@ export const chartRouter = createTRPCRouter({ // Check for profile filters and add profile join if needed const profileFilters = funnelService.getProfileFilters( - eventSeries as IChartEvent[], + eventSeries as IChartEvent[] ); if (profileFilters.length > 0) { const fieldsToSelect = uniq( - profileFilters.map((f) => f.split('.')[0]), + profileFilters.map((f) => f.split('.')[0]) ).join(', '); funnelCte.leftJoin( `(SELECT id, ${fieldsToSelect} FROM ${TABLE_NAMES.profiles} FINAL WHERE project_id = ${sqlstring.escape(projectId)}) as profile`, - 'profile.id = events.profile_id', + 'profile.id = events.profile_id' ); } @@ -934,7 +928,7 @@ export const chartRouter = createTRPCRouter({ // `max(level) AS level` alias (ILLEGAL_AGGREGATION error). query.with( 'funnel', - 'SELECT profile_id, max(level) AS level FROM (SELECT * FROM session_funnel WHERE level != 0) GROUP BY profile_id', + 'SELECT profile_id, max(level) AS level FROM (SELECT * FROM session_funnel WHERE level != 0) GROUP BY profile_id' ); } else { // For session grouping: filter out level = 0 inside the CTE @@ -969,7 +963,7 @@ export const chartRouter = createTRPCRouter({ // when there are many profile IDs to pass in the IN(...) clause const ids = profileIdsResult.map((p) => p.profile_id).filter(Boolean); const BATCH_SIZE = 500; - const profiles = []; + const profiles: IServiceProfile[] = []; for (let i = 0; i < ids.length; i += BATCH_SIZE) { const batch = ids.slice(i, i + BATCH_SIZE); const batchProfiles = await getProfilesCached(batch, projectId); @@ -986,7 +980,7 @@ function processCohortData( total_first_event_count: number; [key: string]: any; }>, - diffInterval: number, + diffInterval: number ) { if (data.length === 0) { return []; @@ -995,13 +989,13 @@ function processCohortData( const processed = data.map((row) => { const sum = row.total_first_event_count; const values = range(0, diffInterval + 1).map( - (index) => (row[`interval_${index}_user_count`] || 0) as number, + (index) => (row[`interval_${index}_user_count`] || 0) as number ); return { cohort_interval: row.cohort_interval, sum, - values: values, + values, percentages: values.map((value) => (sum > 0 ? round(value / sum, 2) : 0)), }; }); @@ -1041,10 +1035,10 @@ function processCohortData( cohort_interval: 'Weighted Average', sum: round(averageData.totalSum / processed.length, 0), percentages: averageData.percentages.map(({ sum, weightedSum }) => - sum > 0 ? round(weightedSum / sum, 2) : 0, + sum > 0 ? round(weightedSum / sum, 2) : 0 ), values: averageData.values.map(({ sum, weightedSum }) => - sum > 0 ? round(weightedSum / sum, 0) : 0, + sum > 0 ? round(weightedSum / sum, 0) : 0 ), };