diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts index 178f9454..be44688f 100644 --- a/packages/db/src/buffers/event-buffer.test.ts +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -10,7 +10,11 @@ import { EventBuffer } from './event-buffer'; const redis = getRedisCache(); beforeEach(async () => { - await redis.flushdb(); + const keys = [ + ...await redis.keys('event*'), + ...await redis.keys('live:*'), + ]; + if (keys.length > 0) await redis.del(...keys); }); afterAll(async () => { @@ -273,4 +277,24 @@ describe('EventBuffer', () => { expect(await eventBuffer.getBufferSize()).toBe(5); }); + + it('retains events in queue when ClickHouse insert fails', async () => { + eventBuffer.add({ + project_id: 'p12', + name: 'event1', + created_at: new Date().toISOString(), + } as any); + await eventBuffer.flush(); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockRejectedValueOnce(new Error('ClickHouse unavailable')); + + await eventBuffer.processBuffer(); + + // Events must still be in the queue — not lost + expect(await eventBuffer.getBufferSize()).toBe(1); + + insertSpy.mockRestore(); + }); }); diff --git a/packages/db/src/buffers/profile-buffer.test.ts b/packages/db/src/buffers/profile-buffer.test.ts index bc7d39d2..2dc757f9 100644 --- a/packages/db/src/buffers/profile-buffer.test.ts +++ b/packages/db/src/buffers/profile-buffer.test.ts @@ -1,6 +1,5 @@ import { getRedisCache } from '@openpanel/redis'; import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; -import { getSafeJson } from '@openpanel/json'; import type { IClickhouseProfile } from '../services/profile.service'; // Mock chQuery to avoid hitting real ClickHouse @@ -36,7 +35,11 @@ function makeProfile(overrides: Partial): IClickhouseProfile } beforeEach(async () => { - await redis.flushdb(); + const keys = [ + ...await redis.keys('profile*'), + ...await redis.keys('lock:profile'), + ]; + if (keys.length > 0) await redis.del(...keys); vi.mocked(chQuery).mockResolvedValue([]); }); @@ -63,64 +66,12 @@ describe('ProfileBuffer', () => { expect(sizeAfter).toBe(sizeBefore + 1); }); - it('merges subsequent updates via cache (sequential calls)', async () => { + it('concurrent adds: both raw profiles are queued', async () => { const identifyProfile = makeProfile({ first_name: 'John', email: 'john@example.com', groups: [], }); - - const groupProfile = makeProfile({ - first_name: '', - email: '', - groups: ['group-abc'], - }); - - // Sequential: identify first, then group - await profileBuffer.add(identifyProfile); - await profileBuffer.add(groupProfile); - - // Second add should read the cached identify profile and merge groups in - const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1'); - expect(cached?.first_name).toBe('John'); - expect(cached?.email).toBe('john@example.com'); - expect(cached?.groups).toContain('group-abc'); - }); - - it('race condition: concurrent identify + group calls preserve all data', async () => { - const identifyProfile = makeProfile({ - first_name: 'John', - email: 'john@example.com', - groups: [], - }); - - const groupProfile = makeProfile({ - first_name: '', - email: '', - groups: ['group-abc'], - }); - - // Both calls run concurrently — the per-profile lock serializes them so the - // second one reads the first's result from cache and merges correctly. - await Promise.all([ - profileBuffer.add(identifyProfile), - profileBuffer.add(groupProfile), - ]); - - const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1'); - - expect(cached?.first_name).toBe('John'); - expect(cached?.email).toBe('john@example.com'); - expect(cached?.groups).toContain('group-abc'); - }); - - it('race condition: concurrent writes produce one merged buffer entry', async () => { - const identifyProfile = makeProfile({ - first_name: 'John', - email: 'john@example.com', - groups: [], - }); - const groupProfile = makeProfile({ first_name: '', email: '', @@ -128,24 +79,126 @@ describe('ProfileBuffer', () => { }); const sizeBefore = await profileBuffer.getBufferSize(); + await Promise.all([ + profileBuffer.add(identifyProfile), + profileBuffer.add(groupProfile), + ]); + const sizeAfter = await profileBuffer.getBufferSize(); + + // Both raw profiles are queued; merge happens at flush time + expect(sizeAfter).toBe(sizeBefore + 2); + }); + + it('merges sequential updates for the same profile at flush time', async () => { + const identifyProfile = makeProfile({ + first_name: 'John', + email: 'john@example.com', + groups: [], + }); + const groupProfile = makeProfile({ + first_name: '', + email: '', + groups: ['group-abc'], + }); + + await profileBuffer.add(identifyProfile); + await profileBuffer.add(groupProfile); + await profileBuffer.processBuffer(); + + const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1'); + expect(cached?.first_name).toBe('John'); + expect(cached?.email).toBe('john@example.com'); + expect(cached?.groups).toContain('group-abc'); + }); + + it('merges concurrent updates for the same profile at flush time', async () => { + const identifyProfile = makeProfile({ + first_name: 'John', + email: 'john@example.com', + groups: [], + }); + const groupProfile = makeProfile({ + first_name: '', + email: '', + groups: ['group-abc'], + }); await Promise.all([ profileBuffer.add(identifyProfile), profileBuffer.add(groupProfile), ]); + await profileBuffer.processBuffer(); - const sizeAfter = await profileBuffer.getBufferSize(); + const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1'); + expect(cached?.first_name).toBe('John'); + expect(cached?.email).toBe('john@example.com'); + expect(cached?.groups).toContain('group-abc'); + }); - // The second add merges into the first — only 2 buffer entries total - // (one from identify, one merged update with group) - expect(sizeAfter).toBe(sizeBefore + 2); + it('uses existing ClickHouse data for cache misses when merging', async () => { + const existingInClickhouse = makeProfile({ + first_name: 'Jane', + email: 'jane@example.com', + groups: ['existing-group'], + }); + vi.mocked(chQuery).mockResolvedValue([existingInClickhouse]); - // The last entry in the buffer should have both name and group - const rawEntries = await redis.lrange('profile-buffer', 0, -1); - const entries = rawEntries.map((e) => getSafeJson(e)); - const lastEntry = entries[entries.length - 1]; + const incomingProfile = makeProfile({ + first_name: '', + email: '', + groups: ['new-group'], + }); - expect(lastEntry?.first_name).toBe('John'); - expect(lastEntry?.groups).toContain('group-abc'); + await profileBuffer.add(incomingProfile); + await profileBuffer.processBuffer(); + + const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1'); + expect(cached?.first_name).toBe('Jane'); + expect(cached?.email).toBe('jane@example.com'); + expect(cached?.groups).toContain('existing-group'); + expect(cached?.groups).toContain('new-group'); + }); + + it('buffer is empty after flush', async () => { + await profileBuffer.add(makeProfile({ first_name: 'John' })); + expect(await profileBuffer.getBufferSize()).toBe(1); + + await profileBuffer.processBuffer(); + + expect(await profileBuffer.getBufferSize()).toBe(0); + }); + + it('retains profiles in queue when ClickHouse insert fails', async () => { + await profileBuffer.add(makeProfile({ first_name: 'John' })); + + const { ch } = await import('../clickhouse/client'); + const insertSpy = vi + .spyOn(ch, 'insert') + .mockRejectedValueOnce(new Error('ClickHouse unavailable')); + + await profileBuffer.processBuffer(); + + // Profiles must still be in the queue — not lost + expect(await profileBuffer.getBufferSize()).toBe(1); + + insertSpy.mockRestore(); + }); + + it('proceeds with insert when ClickHouse fetch fails (treats profiles as new)', async () => { + vi.mocked(chQuery).mockRejectedValueOnce(new Error('ClickHouse unavailable')); + + const { ch } = await import('../clickhouse/client'); + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + + await profileBuffer.add(makeProfile({ first_name: 'John' })); + await profileBuffer.processBuffer(); + + // Insert must still have been called — no data loss even when fetch fails + expect(insertSpy).toHaveBeenCalled(); + expect(await profileBuffer.getBufferSize()).toBe(0); + + insertSpy.mockRestore(); }); }); diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index 6ff7c00b..0a2d9de2 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -1,9 +1,6 @@ import { deepMergeObjects } from '@openpanel/common'; -import { generateSecureId } from '@openpanel/common/server'; import { getSafeJson } from '@openpanel/json'; -import type { ILogger } from '@openpanel/logger'; import { getRedisCache, type Redis } from '@openpanel/redis'; -import shallowEqual from 'fast-deep-equal'; import { omit, uniq } from 'ramda'; import sqlstring from 'sqlstring'; import { ch, chQuery, TABLE_NAMES } from '../clickhouse/client'; @@ -11,29 +8,24 @@ import type { IClickhouseProfile } from '../services/profile.service'; import { BaseBuffer } from './base-buffer'; export class ProfileBuffer extends BaseBuffer { - private batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE + private readonly batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE ? Number.parseInt(process.env.PROFILE_BUFFER_BATCH_SIZE, 10) : 200; - private chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE + private readonly chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE ? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10) : 1000; - private ttlInSeconds = process.env.PROFILE_BUFFER_TTL_IN_SECONDS + private readonly ttlInSeconds = process.env.PROFILE_BUFFER_TTL_IN_SECONDS ? Number.parseInt(process.env.PROFILE_BUFFER_TTL_IN_SECONDS, 10) : 60 * 60; + /** Max profiles per ClickHouse IN-clause fetch to keep query size bounded */ + private readonly fetchChunkSize = process.env.PROFILE_BUFFER_FETCH_CHUNK_SIZE + ? Number.parseInt(process.env.PROFILE_BUFFER_FETCH_CHUNK_SIZE, 10) + : 50; private readonly redisKey = 'profile-buffer'; private readonly redisProfilePrefix = 'profile-cache:'; - private redis: Redis; - private releaseLockSha: string | null = null; - - private readonly releaseLockScript = ` - if redis.call("get", KEYS[1]) == ARGV[1] then - return redis.call("del", KEYS[1]) - else - return 0 - end - `; + private readonly redis: Redis; constructor() { super({ @@ -43,9 +35,6 @@ export class ProfileBuffer extends BaseBuffer { }, }); this.redis = getRedisCache(); - this.redis.script('LOAD', this.releaseLockScript).then((sha) => { - this.releaseLockSha = sha as string; - }); } private getProfileCacheKey({ @@ -58,243 +47,226 @@ export class ProfileBuffer extends BaseBuffer { return `${this.redisProfilePrefix}${projectId}:${profileId}`; } - private async withProfileLock( + public async fetchFromCache( profileId: string, - projectId: string, - fn: () => Promise - ): Promise { - const lockKey = `profile-lock:${projectId}:${profileId}`; - const lockId = generateSecureId('lock'); - const maxRetries = 20; - const retryDelayMs = 50; - - for (let i = 0; i < maxRetries; i++) { - const acquired = await this.redis.set(lockKey, lockId, 'EX', 5, 'NX'); - if (acquired === 'OK') { - try { - return await fn(); - } finally { - if (this.releaseLockSha) { - await this.redis.evalsha(this.releaseLockSha, 1, lockKey, lockId); - } else { - await this.redis.eval(this.releaseLockScript, 1, lockKey, lockId); - } - } - } - await new Promise((resolve) => setTimeout(resolve, retryDelayMs)); + projectId: string + ): Promise { + const cacheKey = this.getProfileCacheKey({ profileId, projectId }); + const cached = await this.redis.get(cacheKey); + if (!cached) { + return null; } - - this.logger.error( - 'Failed to acquire profile lock, proceeding without lock', - { - profileId, - projectId, - } - ); - return fn(); + return getSafeJson(cached); } - async alreadyExists(profile: IClickhouseProfile) { - const cacheKey = this.getProfileCacheKey({ - profileId: profile.id, - projectId: profile.project_id, - }); - return (await this.redis.exists(cacheKey)) === 1; - } - - async add(profile: IClickhouseProfile, isFromEvent = false) { - const logger = this.logger.child({ - projectId: profile.project_id, - profileId: profile.id, - }); - + async add(profile: IClickhouseProfile, _isFromEvent = false) { try { - logger.debug('Adding profile'); + const result = await this.redis + .multi() + .rpush(this.redisKey, JSON.stringify(profile)) + .incr(this.bufferCounterKey) + .llen(this.redisKey) + .exec(); - if (isFromEvent && (await this.alreadyExists(profile))) { - logger.debug('Profile already created, skipping'); + if (!result) { + this.logger.error('Failed to add profile to Redis', { profile }); return; } - await this.withProfileLock(profile.id, profile.project_id, async () => { - const existingProfile = await this.fetchProfile(profile, logger); - - // Delete any properties that are not server related if we have a non-server profile - if ( - existingProfile?.properties.device !== 'server' && - profile.properties.device === 'server' - ) { - profile.properties = omit( - [ - 'city', - 'country', - 'region', - 'longitude', - 'latitude', - 'os', - 'osVersion', - 'browser', - 'device', - 'isServer', - 'os_version', - 'browser_version', - ], - profile.properties - ); - } - - const mergedProfile: IClickhouseProfile = existingProfile - ? { - ...deepMergeObjects( - existingProfile, - omit(['created_at', 'groups'], profile) - ), - groups: uniq([ - ...(existingProfile.groups ?? []), - ...(profile.groups ?? []), - ]), - } - : profile; - - if ( - profile && - existingProfile && - shallowEqual( - omit(['created_at'], existingProfile), - omit(['created_at'], mergedProfile) - ) - ) { - this.logger.debug('Profile not changed, skipping'); - return; - } - - this.logger.debug('Merged profile will be inserted', { - mergedProfile, - existingProfile, - profile, - }); - - const cacheKey = this.getProfileCacheKey({ - profileId: profile.id, - projectId: profile.project_id, - }); - - const result = await this.redis - .multi() - .set(cacheKey, JSON.stringify(mergedProfile), 'EX', this.ttlInSeconds) - .rpush(this.redisKey, JSON.stringify(mergedProfile)) - .incr(this.bufferCounterKey) - .llen(this.redisKey) - .exec(); - - if (!result) { - this.logger.error('Failed to add profile to Redis', { - profile, - cacheKey, - }); - return; - } - const bufferLength = (result?.[3]?.[1] as number) ?? 0; - - this.logger.debug('Current buffer length', { - bufferLength, - batchSize: this.batchSize, - }); - if (bufferLength >= this.batchSize) { - await this.tryFlush(); - } - }); + const bufferLength = (result?.[2]?.[1] as number) ?? 0; + if (bufferLength >= this.batchSize) { + await this.tryFlush(); + } } catch (error) { this.logger.error('Failed to add profile', { error, profile }); } } - private async fetchProfile( - profile: IClickhouseProfile, - logger: ILogger - ): Promise { - const existingProfile = await this.fetchFromCache( - profile.id, - profile.project_id - ); - if (existingProfile) { - logger.debug('Profile found in Redis'); - return existingProfile; + private mergeProfiles( + existing: IClickhouseProfile | null, + incoming: IClickhouseProfile + ): IClickhouseProfile { + if (!existing) { + return incoming; } - return this.fetchFromClickhouse(profile, logger); - } - - public async fetchFromCache( - profileId: string, - projectId: string - ): Promise { - const cacheKey = this.getProfileCacheKey({ - profileId, - projectId, - }); - const existingProfile = await this.redis.get(cacheKey); - if (!existingProfile) { - return null; + let profile = incoming; + if ( + existing.properties.device !== 'server' && + incoming.properties.device === 'server' + ) { + profile = { + ...incoming, + properties: omit( + [ + 'city', + 'country', + 'region', + 'longitude', + 'latitude', + 'os', + 'osVersion', + 'browser', + 'device', + 'isServer', + 'os_version', + 'browser_version', + ], + incoming.properties + ), + }; } - return getSafeJson(existingProfile); + + return { + ...deepMergeObjects(existing, omit(['created_at', 'groups'], profile)), + groups: uniq([...(existing.groups ?? []), ...(incoming.groups ?? [])]), + }; } - private async fetchFromClickhouse( - profile: IClickhouseProfile, - logger: ILogger - ): Promise { - logger.debug('Fetching profile from Clickhouse'); - const result = await chQuery( - `SELECT - id, - project_id, - last_value(nullIf(first_name, '')) as first_name, - last_value(nullIf(last_name, '')) as last_name, - last_value(nullIf(email, '')) as email, - last_value(nullIf(avatar, '')) as avatar, - last_value(is_external) as is_external, - last_value(properties) as properties, - last_value(created_at) as created_at - FROM ${TABLE_NAMES.profiles} - WHERE - id = ${sqlstring.escape(String(profile.id))} AND - project_id = ${sqlstring.escape(profile.project_id)} - ${ - profile.is_external === false - ? ' AND profiles.created_at > now() - INTERVAL 2 DAY' - : '' + private async batchFetchFromClickhouse( + profiles: IClickhouseProfile[] + ): Promise> { + const result = new Map(); + + // Non-external (anonymous/device) profiles get a 2-day recency filter to + // avoid pulling stale anonymous sessions from far back. + const external = profiles.filter((p) => p.is_external !== false); + const nonExternal = profiles.filter((p) => p.is_external === false); + + const fetchGroup = async ( + group: IClickhouseProfile[], + withDateFilter: boolean + ) => { + for (const chunk of this.chunks(group, this.fetchChunkSize)) { + const tuples = chunk + .map( + (p) => + `(${sqlstring.escape(String(p.id))}, ${sqlstring.escape(p.project_id)})` + ) + .join(', '); + try { + const rows = await chQuery( + `SELECT + id, + project_id, + last_value(nullIf(first_name, '')) as first_name, + last_value(nullIf(last_name, '')) as last_name, + last_value(nullIf(email, '')) as email, + last_value(nullIf(avatar, '')) as avatar, + last_value(is_external) as is_external, + last_value(properties) as properties, + last_value(created_at) as created_at + FROM ${TABLE_NAMES.profiles} + WHERE (id, project_id) IN (${tuples}) + ${withDateFilter ? 'AND created_at > now() - INTERVAL 2 DAY' : ''} + GROUP BY id, project_id + ORDER BY created_at DESC` + ); + for (const row of rows) { + result.set(`${row.project_id}:${row.id}`, row); + } + } catch (error) { + this.logger.warn( + 'Failed to batch fetch profiles from Clickhouse, proceeding without existing data', + { error, chunkSize: chunk.length } + ); } - GROUP BY id, project_id - ORDER BY created_at DESC - LIMIT 1` - ); - logger.debug('Clickhouse fetch result', { - found: !!result[0], - }); - return result[0] || null; + } + }; + + await Promise.all([ + fetchGroup(external, false), + fetchGroup(nonExternal, true), + ]); + + return result; } async processBuffer() { try { this.logger.debug('Starting profile buffer processing'); - const profiles = await this.redis.lrange( + const rawProfiles = await this.redis.lrange( this.redisKey, 0, this.batchSize - 1 ); - if (profiles.length === 0) { + if (rawProfiles.length === 0) { this.logger.debug('No profiles to process'); return; } - this.logger.debug(`Processing ${profiles.length} profiles in buffer`); - const parsedProfiles = profiles.map((p) => - getSafeJson(p) - ); + const parsedProfiles = rawProfiles + .map((p) => getSafeJson(p)) + .filter(Boolean) as IClickhouseProfile[]; - for (const chunk of this.chunks(parsedProfiles, this.chunkSize)) { + // Merge within batch: collapse multiple updates for the same profile + const mergedInBatch = new Map(); + for (const profile of parsedProfiles) { + const key = `${profile.project_id}:${profile.id}`; + mergedInBatch.set( + key, + this.mergeProfiles(mergedInBatch.get(key) ?? null, profile) + ); + } + + const uniqueProfiles = Array.from(mergedInBatch.values()); + + // Check Redis cache for all unique profiles in a single MGET + const cacheKeys = uniqueProfiles.map((p) => + this.getProfileCacheKey({ profileId: p.id, projectId: p.project_id }) + ); + const cacheResults = await this.redis.mget(...cacheKeys); + + const existingByKey = new Map(); + const cacheMisses: IClickhouseProfile[] = []; + for (let i = 0; i < uniqueProfiles.length; i++) { + const uniqueProfile = uniqueProfiles[i]; + if (uniqueProfile) { + const key = `${uniqueProfile.project_id}:${uniqueProfile.id}`; + const cached = cacheResults[i] + ? getSafeJson(cacheResults[i]!) + : null; + if (cached) { + existingByKey.set(key, cached); + } else { + cacheMisses.push(uniqueProfile); + } + } + } + + // Fetch cache misses from ClickHouse in bounded chunks + if (cacheMisses.length > 0) { + const clickhouseResults = + await this.batchFetchFromClickhouse(cacheMisses); + for (const [key, profile] of clickhouseResults) { + existingByKey.set(key, profile); + } + } + + // Final merge: in-batch profile + existing (from cache or ClickHouse) + const toInsert: IClickhouseProfile[] = []; + const multi = this.redis.multi(); + + for (const profile of uniqueProfiles) { + const key = `${profile.project_id}:${profile.id}`; + const merged = this.mergeProfiles( + existingByKey.get(key) ?? null, + profile + ); + toInsert.push(merged); + multi.set( + this.getProfileCacheKey({ + projectId: profile.project_id, + profileId: profile.id, + }), + JSON.stringify(merged), + 'EX', + this.ttlInSeconds + ); + } + + for (const chunk of this.chunks(toInsert, this.chunkSize)) { await ch.insert({ table: TABLE_NAMES.profiles, values: chunk, @@ -302,22 +274,21 @@ export class ProfileBuffer extends BaseBuffer { }); } - // Only remove profiles after successful insert and update counter - await this.redis - .multi() - .ltrim(this.redisKey, profiles.length, -1) - .decrby(this.bufferCounterKey, profiles.length) - .exec(); + multi + .ltrim(this.redisKey, rawProfiles.length, -1) + .decrby(this.bufferCounterKey, rawProfiles.length); + await multi.exec(); this.logger.debug('Successfully completed profile processing', { - totalProfiles: profiles.length, + totalProfiles: rawProfiles.length, + uniqueProfiles: uniqueProfiles.length, }); } catch (error) { this.logger.error('Failed to process buffer', { error }); } } - async getBufferSize() { + getBufferSize() { return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey)); } } diff --git a/packages/db/src/buffers/session-buffer.test.ts b/packages/db/src/buffers/session-buffer.test.ts new file mode 100644 index 00000000..f140ff00 --- /dev/null +++ b/packages/db/src/buffers/session-buffer.test.ts @@ -0,0 +1,122 @@ +import { getRedisCache } from '@openpanel/redis'; +import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { ch } from '../clickhouse/client'; + +vi.mock('../clickhouse/client', () => ({ + ch: { + insert: vi.fn().mockResolvedValue(undefined), + }, + TABLE_NAMES: { + sessions: 'sessions', + }, +})); + +import { SessionBuffer } from './session-buffer'; +import type { IClickhouseEvent } from '../services/event.service'; + +const redis = getRedisCache(); + +function makeEvent(overrides: Partial): IClickhouseEvent { + return { + id: 'event-1', + project_id: 'project-1', + profile_id: 'profile-1', + device_id: 'device-1', + session_id: 'session-1', + name: 'screen_view', + path: '/home', + origin: '', + referrer: '', + referrer_name: '', + referrer_type: '', + duration: 0, + properties: {}, + created_at: new Date().toISOString(), + groups: [], + ...overrides, + } as IClickhouseEvent; +} + +beforeEach(async () => { + const keys = [ + ...await redis.keys('session*'), + ...await redis.keys('lock:session'), + ]; + if (keys.length > 0) await redis.del(...keys); + vi.mocked(ch.insert).mockResolvedValue(undefined as any); +}); + +afterAll(async () => { + try { + await redis.quit(); + } catch {} +}); + +describe('SessionBuffer', () => { + let sessionBuffer: SessionBuffer; + + beforeEach(() => { + sessionBuffer = new SessionBuffer(); + }); + + it('adds a new session to the buffer', async () => { + const sizeBefore = await sessionBuffer.getBufferSize(); + await sessionBuffer.add(makeEvent({})); + const sizeAfter = await sessionBuffer.getBufferSize(); + + expect(sizeAfter).toBe(sizeBefore + 1); + }); + + it('skips session_start and session_end events', async () => { + const sizeBefore = await sessionBuffer.getBufferSize(); + await sessionBuffer.add(makeEvent({ name: 'session_start' })); + await sessionBuffer.add(makeEvent({ name: 'session_end' })); + const sizeAfter = await sessionBuffer.getBufferSize(); + + expect(sizeAfter).toBe(sizeBefore); + }); + + it('updates existing session on subsequent events', async () => { + const t0 = Date.now(); + await sessionBuffer.add(makeEvent({ created_at: new Date(t0).toISOString() })); + + // Second event updates the same session — emits old (sign=-1) + new (sign=1) + const sizeBefore = await sessionBuffer.getBufferSize(); + await sessionBuffer.add(makeEvent({ created_at: new Date(t0 + 5000).toISOString() })); + const sizeAfter = await sessionBuffer.getBufferSize(); + + expect(sizeAfter).toBe(sizeBefore + 2); + }); + + it('processes buffer and inserts sessions into ClickHouse', async () => { + await sessionBuffer.add(makeEvent({})); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + + await sessionBuffer.processBuffer(); + + expect(insertSpy).toHaveBeenCalledWith( + expect.objectContaining({ table: 'sessions', format: 'JSONEachRow' }) + ); + expect(await sessionBuffer.getBufferSize()).toBe(0); + + insertSpy.mockRestore(); + }); + + it('retains sessions in queue when ClickHouse insert fails', async () => { + await sessionBuffer.add(makeEvent({})); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockRejectedValueOnce(new Error('ClickHouse unavailable')); + + await sessionBuffer.processBuffer(); + + // Sessions must still be in the queue — not lost + expect(await sessionBuffer.getBufferSize()).toBe(1); + + insertSpy.mockRestore(); + }); +}); diff --git a/packages/redis/cachable.test.ts b/packages/redis/cachable.test.ts index d628f439..2d212a2e 100644 --- a/packages/redis/cachable.test.ts +++ b/packages/redis/cachable.test.ts @@ -8,7 +8,10 @@ describe('cachable', () => { beforeEach(async () => { redis = getRedisCache(); // Clear any existing cache data for clean tests - const keys = await redis.keys('cachable:*'); + const keys = [ + ...await redis.keys('cachable:*'), + ...await redis.keys('test-key*'), + ]; if (keys.length > 0) { await redis.del(...keys); } @@ -16,7 +19,10 @@ describe('cachable', () => { afterEach(async () => { // Clean up after each test - const keys = await redis.keys('cachable:*'); + const keys = [ + ...await redis.keys('cachable:*'), + ...await redis.keys('test-key*'), + ]; if (keys.length > 0) { await redis.del(...keys); }