This commit is contained in:
Carl-Gerhard Lindesvärd
2026-03-23 21:37:12 +01:00
parent 20665789e1
commit 8dd3966f31
5 changed files with 473 additions and 297 deletions

View File

@@ -10,7 +10,11 @@ import { EventBuffer } from './event-buffer';
const redis = getRedisCache(); const redis = getRedisCache();
beforeEach(async () => { beforeEach(async () => {
await redis.flushdb(); const keys = [
...await redis.keys('event*'),
...await redis.keys('live:*'),
];
if (keys.length > 0) await redis.del(...keys);
}); });
afterAll(async () => { afterAll(async () => {
@@ -273,4 +277,24 @@ describe('EventBuffer', () => {
expect(await eventBuffer.getBufferSize()).toBe(5); expect(await eventBuffer.getBufferSize()).toBe(5);
}); });
it('retains events in queue when ClickHouse insert fails', async () => {
eventBuffer.add({
project_id: 'p12',
name: 'event1',
created_at: new Date().toISOString(),
} as any);
await eventBuffer.flush();
const insertSpy = vi
.spyOn(ch, 'insert')
.mockRejectedValueOnce(new Error('ClickHouse unavailable'));
await eventBuffer.processBuffer();
// Events must still be in the queue — not lost
expect(await eventBuffer.getBufferSize()).toBe(1);
insertSpy.mockRestore();
});
}); });

View File

@@ -1,6 +1,5 @@
import { getRedisCache } from '@openpanel/redis'; import { getRedisCache } from '@openpanel/redis';
import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest';
import { getSafeJson } from '@openpanel/json';
import type { IClickhouseProfile } from '../services/profile.service'; import type { IClickhouseProfile } from '../services/profile.service';
// Mock chQuery to avoid hitting real ClickHouse // Mock chQuery to avoid hitting real ClickHouse
@@ -36,7 +35,11 @@ function makeProfile(overrides: Partial<IClickhouseProfile>): IClickhouseProfile
} }
beforeEach(async () => { beforeEach(async () => {
await redis.flushdb(); const keys = [
...await redis.keys('profile*'),
...await redis.keys('lock:profile'),
];
if (keys.length > 0) await redis.del(...keys);
vi.mocked(chQuery).mockResolvedValue([]); vi.mocked(chQuery).mockResolvedValue([]);
}); });
@@ -63,64 +66,12 @@ describe('ProfileBuffer', () => {
expect(sizeAfter).toBe(sizeBefore + 1); expect(sizeAfter).toBe(sizeBefore + 1);
}); });
it('merges subsequent updates via cache (sequential calls)', async () => { it('concurrent adds: both raw profiles are queued', async () => {
const identifyProfile = makeProfile({ const identifyProfile = makeProfile({
first_name: 'John', first_name: 'John',
email: 'john@example.com', email: 'john@example.com',
groups: [], groups: [],
}); });
const groupProfile = makeProfile({
first_name: '',
email: '',
groups: ['group-abc'],
});
// Sequential: identify first, then group
await profileBuffer.add(identifyProfile);
await profileBuffer.add(groupProfile);
// Second add should read the cached identify profile and merge groups in
const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1');
expect(cached?.first_name).toBe('John');
expect(cached?.email).toBe('john@example.com');
expect(cached?.groups).toContain('group-abc');
});
it('race condition: concurrent identify + group calls preserve all data', async () => {
const identifyProfile = makeProfile({
first_name: 'John',
email: 'john@example.com',
groups: [],
});
const groupProfile = makeProfile({
first_name: '',
email: '',
groups: ['group-abc'],
});
// Both calls run concurrently — the per-profile lock serializes them so the
// second one reads the first's result from cache and merges correctly.
await Promise.all([
profileBuffer.add(identifyProfile),
profileBuffer.add(groupProfile),
]);
const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1');
expect(cached?.first_name).toBe('John');
expect(cached?.email).toBe('john@example.com');
expect(cached?.groups).toContain('group-abc');
});
it('race condition: concurrent writes produce one merged buffer entry', async () => {
const identifyProfile = makeProfile({
first_name: 'John',
email: 'john@example.com',
groups: [],
});
const groupProfile = makeProfile({ const groupProfile = makeProfile({
first_name: '', first_name: '',
email: '', email: '',
@@ -128,24 +79,126 @@ describe('ProfileBuffer', () => {
}); });
const sizeBefore = await profileBuffer.getBufferSize(); const sizeBefore = await profileBuffer.getBufferSize();
await Promise.all([
profileBuffer.add(identifyProfile),
profileBuffer.add(groupProfile),
]);
const sizeAfter = await profileBuffer.getBufferSize();
// Both raw profiles are queued; merge happens at flush time
expect(sizeAfter).toBe(sizeBefore + 2);
});
it('merges sequential updates for the same profile at flush time', async () => {
const identifyProfile = makeProfile({
first_name: 'John',
email: 'john@example.com',
groups: [],
});
const groupProfile = makeProfile({
first_name: '',
email: '',
groups: ['group-abc'],
});
await profileBuffer.add(identifyProfile);
await profileBuffer.add(groupProfile);
await profileBuffer.processBuffer();
const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1');
expect(cached?.first_name).toBe('John');
expect(cached?.email).toBe('john@example.com');
expect(cached?.groups).toContain('group-abc');
});
it('merges concurrent updates for the same profile at flush time', async () => {
const identifyProfile = makeProfile({
first_name: 'John',
email: 'john@example.com',
groups: [],
});
const groupProfile = makeProfile({
first_name: '',
email: '',
groups: ['group-abc'],
});
await Promise.all([ await Promise.all([
profileBuffer.add(identifyProfile), profileBuffer.add(identifyProfile),
profileBuffer.add(groupProfile), profileBuffer.add(groupProfile),
]); ]);
await profileBuffer.processBuffer();
const sizeAfter = await profileBuffer.getBufferSize(); const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1');
expect(cached?.first_name).toBe('John');
expect(cached?.email).toBe('john@example.com');
expect(cached?.groups).toContain('group-abc');
});
// The second add merges into the first — only 2 buffer entries total it('uses existing ClickHouse data for cache misses when merging', async () => {
// (one from identify, one merged update with group) const existingInClickhouse = makeProfile({
expect(sizeAfter).toBe(sizeBefore + 2); first_name: 'Jane',
email: 'jane@example.com',
groups: ['existing-group'],
});
vi.mocked(chQuery).mockResolvedValue([existingInClickhouse]);
// The last entry in the buffer should have both name and group const incomingProfile = makeProfile({
const rawEntries = await redis.lrange('profile-buffer', 0, -1); first_name: '',
const entries = rawEntries.map((e) => getSafeJson<IClickhouseProfile>(e)); email: '',
const lastEntry = entries[entries.length - 1]; groups: ['new-group'],
});
expect(lastEntry?.first_name).toBe('John'); await profileBuffer.add(incomingProfile);
expect(lastEntry?.groups).toContain('group-abc'); await profileBuffer.processBuffer();
const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1');
expect(cached?.first_name).toBe('Jane');
expect(cached?.email).toBe('jane@example.com');
expect(cached?.groups).toContain('existing-group');
expect(cached?.groups).toContain('new-group');
});
it('buffer is empty after flush', async () => {
await profileBuffer.add(makeProfile({ first_name: 'John' }));
expect(await profileBuffer.getBufferSize()).toBe(1);
await profileBuffer.processBuffer();
expect(await profileBuffer.getBufferSize()).toBe(0);
});
it('retains profiles in queue when ClickHouse insert fails', async () => {
await profileBuffer.add(makeProfile({ first_name: 'John' }));
const { ch } = await import('../clickhouse/client');
const insertSpy = vi
.spyOn(ch, 'insert')
.mockRejectedValueOnce(new Error('ClickHouse unavailable'));
await profileBuffer.processBuffer();
// Profiles must still be in the queue — not lost
expect(await profileBuffer.getBufferSize()).toBe(1);
insertSpy.mockRestore();
});
it('proceeds with insert when ClickHouse fetch fails (treats profiles as new)', async () => {
vi.mocked(chQuery).mockRejectedValueOnce(new Error('ClickHouse unavailable'));
const { ch } = await import('../clickhouse/client');
const insertSpy = vi
.spyOn(ch, 'insert')
.mockResolvedValueOnce(undefined as any);
await profileBuffer.add(makeProfile({ first_name: 'John' }));
await profileBuffer.processBuffer();
// Insert must still have been called — no data loss even when fetch fails
expect(insertSpy).toHaveBeenCalled();
expect(await profileBuffer.getBufferSize()).toBe(0);
insertSpy.mockRestore();
}); });
}); });

View File

@@ -1,9 +1,6 @@
import { deepMergeObjects } from '@openpanel/common'; import { deepMergeObjects } from '@openpanel/common';
import { generateSecureId } from '@openpanel/common/server';
import { getSafeJson } from '@openpanel/json'; import { getSafeJson } from '@openpanel/json';
import type { ILogger } from '@openpanel/logger';
import { getRedisCache, type Redis } from '@openpanel/redis'; import { getRedisCache, type Redis } from '@openpanel/redis';
import shallowEqual from 'fast-deep-equal';
import { omit, uniq } from 'ramda'; import { omit, uniq } from 'ramda';
import sqlstring from 'sqlstring'; import sqlstring from 'sqlstring';
import { ch, chQuery, TABLE_NAMES } from '../clickhouse/client'; import { ch, chQuery, TABLE_NAMES } from '../clickhouse/client';
@@ -11,29 +8,24 @@ import type { IClickhouseProfile } from '../services/profile.service';
import { BaseBuffer } from './base-buffer'; import { BaseBuffer } from './base-buffer';
export class ProfileBuffer extends BaseBuffer { export class ProfileBuffer extends BaseBuffer {
private batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE private readonly batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE
? Number.parseInt(process.env.PROFILE_BUFFER_BATCH_SIZE, 10) ? Number.parseInt(process.env.PROFILE_BUFFER_BATCH_SIZE, 10)
: 200; : 200;
private chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE private readonly chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10) ? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10)
: 1000; : 1000;
private ttlInSeconds = process.env.PROFILE_BUFFER_TTL_IN_SECONDS private readonly ttlInSeconds = process.env.PROFILE_BUFFER_TTL_IN_SECONDS
? Number.parseInt(process.env.PROFILE_BUFFER_TTL_IN_SECONDS, 10) ? Number.parseInt(process.env.PROFILE_BUFFER_TTL_IN_SECONDS, 10)
: 60 * 60; : 60 * 60;
/** Max profiles per ClickHouse IN-clause fetch to keep query size bounded */
private readonly fetchChunkSize = process.env.PROFILE_BUFFER_FETCH_CHUNK_SIZE
? Number.parseInt(process.env.PROFILE_BUFFER_FETCH_CHUNK_SIZE, 10)
: 50;
private readonly redisKey = 'profile-buffer'; private readonly redisKey = 'profile-buffer';
private readonly redisProfilePrefix = 'profile-cache:'; private readonly redisProfilePrefix = 'profile-cache:';
private redis: Redis; private readonly redis: Redis;
private releaseLockSha: string | null = null;
private readonly releaseLockScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
constructor() { constructor() {
super({ super({
@@ -43,9 +35,6 @@ export class ProfileBuffer extends BaseBuffer {
}, },
}); });
this.redis = getRedisCache(); this.redis = getRedisCache();
this.redis.script('LOAD', this.releaseLockScript).then((sha) => {
this.releaseLockSha = sha as string;
});
} }
private getProfileCacheKey({ private getProfileCacheKey({
@@ -58,243 +47,226 @@ export class ProfileBuffer extends BaseBuffer {
return `${this.redisProfilePrefix}${projectId}:${profileId}`; return `${this.redisProfilePrefix}${projectId}:${profileId}`;
} }
private async withProfileLock<T>( public async fetchFromCache(
profileId: string, profileId: string,
projectId: string, projectId: string
fn: () => Promise<T> ): Promise<IClickhouseProfile | null> {
): Promise<T> { const cacheKey = this.getProfileCacheKey({ profileId, projectId });
const lockKey = `profile-lock:${projectId}:${profileId}`; const cached = await this.redis.get(cacheKey);
const lockId = generateSecureId('lock'); if (!cached) {
const maxRetries = 20; return null;
const retryDelayMs = 50;
for (let i = 0; i < maxRetries; i++) {
const acquired = await this.redis.set(lockKey, lockId, 'EX', 5, 'NX');
if (acquired === 'OK') {
try {
return await fn();
} finally {
if (this.releaseLockSha) {
await this.redis.evalsha(this.releaseLockSha, 1, lockKey, lockId);
} else {
await this.redis.eval(this.releaseLockScript, 1, lockKey, lockId);
}
}
}
await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
} }
return getSafeJson<IClickhouseProfile>(cached);
this.logger.error(
'Failed to acquire profile lock, proceeding without lock',
{
profileId,
projectId,
}
);
return fn();
} }
async alreadyExists(profile: IClickhouseProfile) { async add(profile: IClickhouseProfile, _isFromEvent = false) {
const cacheKey = this.getProfileCacheKey({
profileId: profile.id,
projectId: profile.project_id,
});
return (await this.redis.exists(cacheKey)) === 1;
}
async add(profile: IClickhouseProfile, isFromEvent = false) {
const logger = this.logger.child({
projectId: profile.project_id,
profileId: profile.id,
});
try { try {
logger.debug('Adding profile'); const result = await this.redis
.multi()
.rpush(this.redisKey, JSON.stringify(profile))
.incr(this.bufferCounterKey)
.llen(this.redisKey)
.exec();
if (isFromEvent && (await this.alreadyExists(profile))) { if (!result) {
logger.debug('Profile already created, skipping'); this.logger.error('Failed to add profile to Redis', { profile });
return; return;
} }
await this.withProfileLock(profile.id, profile.project_id, async () => { const bufferLength = (result?.[2]?.[1] as number) ?? 0;
const existingProfile = await this.fetchProfile(profile, logger); if (bufferLength >= this.batchSize) {
await this.tryFlush();
// Delete any properties that are not server related if we have a non-server profile }
if (
existingProfile?.properties.device !== 'server' &&
profile.properties.device === 'server'
) {
profile.properties = omit(
[
'city',
'country',
'region',
'longitude',
'latitude',
'os',
'osVersion',
'browser',
'device',
'isServer',
'os_version',
'browser_version',
],
profile.properties
);
}
const mergedProfile: IClickhouseProfile = existingProfile
? {
...deepMergeObjects(
existingProfile,
omit(['created_at', 'groups'], profile)
),
groups: uniq([
...(existingProfile.groups ?? []),
...(profile.groups ?? []),
]),
}
: profile;
if (
profile &&
existingProfile &&
shallowEqual(
omit(['created_at'], existingProfile),
omit(['created_at'], mergedProfile)
)
) {
this.logger.debug('Profile not changed, skipping');
return;
}
this.logger.debug('Merged profile will be inserted', {
mergedProfile,
existingProfile,
profile,
});
const cacheKey = this.getProfileCacheKey({
profileId: profile.id,
projectId: profile.project_id,
});
const result = await this.redis
.multi()
.set(cacheKey, JSON.stringify(mergedProfile), 'EX', this.ttlInSeconds)
.rpush(this.redisKey, JSON.stringify(mergedProfile))
.incr(this.bufferCounterKey)
.llen(this.redisKey)
.exec();
if (!result) {
this.logger.error('Failed to add profile to Redis', {
profile,
cacheKey,
});
return;
}
const bufferLength = (result?.[3]?.[1] as number) ?? 0;
this.logger.debug('Current buffer length', {
bufferLength,
batchSize: this.batchSize,
});
if (bufferLength >= this.batchSize) {
await this.tryFlush();
}
});
} catch (error) { } catch (error) {
this.logger.error('Failed to add profile', { error, profile }); this.logger.error('Failed to add profile', { error, profile });
} }
} }
private async fetchProfile( private mergeProfiles(
profile: IClickhouseProfile, existing: IClickhouseProfile | null,
logger: ILogger incoming: IClickhouseProfile
): Promise<IClickhouseProfile | null> { ): IClickhouseProfile {
const existingProfile = await this.fetchFromCache( if (!existing) {
profile.id, return incoming;
profile.project_id
);
if (existingProfile) {
logger.debug('Profile found in Redis');
return existingProfile;
} }
return this.fetchFromClickhouse(profile, logger); let profile = incoming;
} if (
existing.properties.device !== 'server' &&
public async fetchFromCache( incoming.properties.device === 'server'
profileId: string, ) {
projectId: string profile = {
): Promise<IClickhouseProfile | null> { ...incoming,
const cacheKey = this.getProfileCacheKey({ properties: omit(
profileId, [
projectId, 'city',
}); 'country',
const existingProfile = await this.redis.get(cacheKey); 'region',
if (!existingProfile) { 'longitude',
return null; 'latitude',
'os',
'osVersion',
'browser',
'device',
'isServer',
'os_version',
'browser_version',
],
incoming.properties
),
};
} }
return getSafeJson<IClickhouseProfile>(existingProfile);
return {
...deepMergeObjects(existing, omit(['created_at', 'groups'], profile)),
groups: uniq([...(existing.groups ?? []), ...(incoming.groups ?? [])]),
};
} }
private async fetchFromClickhouse( private async batchFetchFromClickhouse(
profile: IClickhouseProfile, profiles: IClickhouseProfile[]
logger: ILogger ): Promise<Map<string, IClickhouseProfile>> {
): Promise<IClickhouseProfile | null> { const result = new Map<string, IClickhouseProfile>();
logger.debug('Fetching profile from Clickhouse');
const result = await chQuery<IClickhouseProfile>( // Non-external (anonymous/device) profiles get a 2-day recency filter to
`SELECT // avoid pulling stale anonymous sessions from far back.
id, const external = profiles.filter((p) => p.is_external !== false);
project_id, const nonExternal = profiles.filter((p) => p.is_external === false);
last_value(nullIf(first_name, '')) as first_name,
last_value(nullIf(last_name, '')) as last_name, const fetchGroup = async (
last_value(nullIf(email, '')) as email, group: IClickhouseProfile[],
last_value(nullIf(avatar, '')) as avatar, withDateFilter: boolean
last_value(is_external) as is_external, ) => {
last_value(properties) as properties, for (const chunk of this.chunks(group, this.fetchChunkSize)) {
last_value(created_at) as created_at const tuples = chunk
FROM ${TABLE_NAMES.profiles} .map(
WHERE (p) =>
id = ${sqlstring.escape(String(profile.id))} AND `(${sqlstring.escape(String(p.id))}, ${sqlstring.escape(p.project_id)})`
project_id = ${sqlstring.escape(profile.project_id)} )
${ .join(', ');
profile.is_external === false try {
? ' AND profiles.created_at > now() - INTERVAL 2 DAY' const rows = await chQuery<IClickhouseProfile>(
: '' `SELECT
id,
project_id,
last_value(nullIf(first_name, '')) as first_name,
last_value(nullIf(last_name, '')) as last_name,
last_value(nullIf(email, '')) as email,
last_value(nullIf(avatar, '')) as avatar,
last_value(is_external) as is_external,
last_value(properties) as properties,
last_value(created_at) as created_at
FROM ${TABLE_NAMES.profiles}
WHERE (id, project_id) IN (${tuples})
${withDateFilter ? 'AND created_at > now() - INTERVAL 2 DAY' : ''}
GROUP BY id, project_id
ORDER BY created_at DESC`
);
for (const row of rows) {
result.set(`${row.project_id}:${row.id}`, row);
}
} catch (error) {
this.logger.warn(
'Failed to batch fetch profiles from Clickhouse, proceeding without existing data',
{ error, chunkSize: chunk.length }
);
} }
GROUP BY id, project_id }
ORDER BY created_at DESC };
LIMIT 1`
); await Promise.all([
logger.debug('Clickhouse fetch result', { fetchGroup(external, false),
found: !!result[0], fetchGroup(nonExternal, true),
}); ]);
return result[0] || null;
return result;
} }
async processBuffer() { async processBuffer() {
try { try {
this.logger.debug('Starting profile buffer processing'); this.logger.debug('Starting profile buffer processing');
const profiles = await this.redis.lrange( const rawProfiles = await this.redis.lrange(
this.redisKey, this.redisKey,
0, 0,
this.batchSize - 1 this.batchSize - 1
); );
if (profiles.length === 0) { if (rawProfiles.length === 0) {
this.logger.debug('No profiles to process'); this.logger.debug('No profiles to process');
return; return;
} }
this.logger.debug(`Processing ${profiles.length} profiles in buffer`); const parsedProfiles = rawProfiles
const parsedProfiles = profiles.map((p) => .map((p) => getSafeJson<IClickhouseProfile>(p))
getSafeJson<IClickhouseProfile>(p) .filter(Boolean) as IClickhouseProfile[];
);
for (const chunk of this.chunks(parsedProfiles, this.chunkSize)) { // Merge within batch: collapse multiple updates for the same profile
const mergedInBatch = new Map<string, IClickhouseProfile>();
for (const profile of parsedProfiles) {
const key = `${profile.project_id}:${profile.id}`;
mergedInBatch.set(
key,
this.mergeProfiles(mergedInBatch.get(key) ?? null, profile)
);
}
const uniqueProfiles = Array.from(mergedInBatch.values());
// Check Redis cache for all unique profiles in a single MGET
const cacheKeys = uniqueProfiles.map((p) =>
this.getProfileCacheKey({ profileId: p.id, projectId: p.project_id })
);
const cacheResults = await this.redis.mget(...cacheKeys);
const existingByKey = new Map<string, IClickhouseProfile>();
const cacheMisses: IClickhouseProfile[] = [];
for (let i = 0; i < uniqueProfiles.length; i++) {
const uniqueProfile = uniqueProfiles[i];
if (uniqueProfile) {
const key = `${uniqueProfile.project_id}:${uniqueProfile.id}`;
const cached = cacheResults[i]
? getSafeJson<IClickhouseProfile>(cacheResults[i]!)
: null;
if (cached) {
existingByKey.set(key, cached);
} else {
cacheMisses.push(uniqueProfile);
}
}
}
// Fetch cache misses from ClickHouse in bounded chunks
if (cacheMisses.length > 0) {
const clickhouseResults =
await this.batchFetchFromClickhouse(cacheMisses);
for (const [key, profile] of clickhouseResults) {
existingByKey.set(key, profile);
}
}
// Final merge: in-batch profile + existing (from cache or ClickHouse)
const toInsert: IClickhouseProfile[] = [];
const multi = this.redis.multi();
for (const profile of uniqueProfiles) {
const key = `${profile.project_id}:${profile.id}`;
const merged = this.mergeProfiles(
existingByKey.get(key) ?? null,
profile
);
toInsert.push(merged);
multi.set(
this.getProfileCacheKey({
projectId: profile.project_id,
profileId: profile.id,
}),
JSON.stringify(merged),
'EX',
this.ttlInSeconds
);
}
for (const chunk of this.chunks(toInsert, this.chunkSize)) {
await ch.insert({ await ch.insert({
table: TABLE_NAMES.profiles, table: TABLE_NAMES.profiles,
values: chunk, values: chunk,
@@ -302,22 +274,21 @@ export class ProfileBuffer extends BaseBuffer {
}); });
} }
// Only remove profiles after successful insert and update counter multi
await this.redis .ltrim(this.redisKey, rawProfiles.length, -1)
.multi() .decrby(this.bufferCounterKey, rawProfiles.length);
.ltrim(this.redisKey, profiles.length, -1) await multi.exec();
.decrby(this.bufferCounterKey, profiles.length)
.exec();
this.logger.debug('Successfully completed profile processing', { this.logger.debug('Successfully completed profile processing', {
totalProfiles: profiles.length, totalProfiles: rawProfiles.length,
uniqueProfiles: uniqueProfiles.length,
}); });
} catch (error) { } catch (error) {
this.logger.error('Failed to process buffer', { error }); this.logger.error('Failed to process buffer', { error });
} }
} }
async getBufferSize() { getBufferSize() {
return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey)); return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey));
} }
} }

View File

@@ -0,0 +1,122 @@
import { getRedisCache } from '@openpanel/redis';
import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest';
import { ch } from '../clickhouse/client';
vi.mock('../clickhouse/client', () => ({
ch: {
insert: vi.fn().mockResolvedValue(undefined),
},
TABLE_NAMES: {
sessions: 'sessions',
},
}));
import { SessionBuffer } from './session-buffer';
import type { IClickhouseEvent } from '../services/event.service';
const redis = getRedisCache();
function makeEvent(overrides: Partial<IClickhouseEvent>): IClickhouseEvent {
return {
id: 'event-1',
project_id: 'project-1',
profile_id: 'profile-1',
device_id: 'device-1',
session_id: 'session-1',
name: 'screen_view',
path: '/home',
origin: '',
referrer: '',
referrer_name: '',
referrer_type: '',
duration: 0,
properties: {},
created_at: new Date().toISOString(),
groups: [],
...overrides,
} as IClickhouseEvent;
}
beforeEach(async () => {
const keys = [
...await redis.keys('session*'),
...await redis.keys('lock:session'),
];
if (keys.length > 0) await redis.del(...keys);
vi.mocked(ch.insert).mockResolvedValue(undefined as any);
});
afterAll(async () => {
try {
await redis.quit();
} catch {}
});
describe('SessionBuffer', () => {
let sessionBuffer: SessionBuffer;
beforeEach(() => {
sessionBuffer = new SessionBuffer();
});
it('adds a new session to the buffer', async () => {
const sizeBefore = await sessionBuffer.getBufferSize();
await sessionBuffer.add(makeEvent({}));
const sizeAfter = await sessionBuffer.getBufferSize();
expect(sizeAfter).toBe(sizeBefore + 1);
});
it('skips session_start and session_end events', async () => {
const sizeBefore = await sessionBuffer.getBufferSize();
await sessionBuffer.add(makeEvent({ name: 'session_start' }));
await sessionBuffer.add(makeEvent({ name: 'session_end' }));
const sizeAfter = await sessionBuffer.getBufferSize();
expect(sizeAfter).toBe(sizeBefore);
});
it('updates existing session on subsequent events', async () => {
const t0 = Date.now();
await sessionBuffer.add(makeEvent({ created_at: new Date(t0).toISOString() }));
// Second event updates the same session — emits old (sign=-1) + new (sign=1)
const sizeBefore = await sessionBuffer.getBufferSize();
await sessionBuffer.add(makeEvent({ created_at: new Date(t0 + 5000).toISOString() }));
const sizeAfter = await sessionBuffer.getBufferSize();
expect(sizeAfter).toBe(sizeBefore + 2);
});
it('processes buffer and inserts sessions into ClickHouse', async () => {
await sessionBuffer.add(makeEvent({}));
const insertSpy = vi
.spyOn(ch, 'insert')
.mockResolvedValueOnce(undefined as any);
await sessionBuffer.processBuffer();
expect(insertSpy).toHaveBeenCalledWith(
expect.objectContaining({ table: 'sessions', format: 'JSONEachRow' })
);
expect(await sessionBuffer.getBufferSize()).toBe(0);
insertSpy.mockRestore();
});
it('retains sessions in queue when ClickHouse insert fails', async () => {
await sessionBuffer.add(makeEvent({}));
const insertSpy = vi
.spyOn(ch, 'insert')
.mockRejectedValueOnce(new Error('ClickHouse unavailable'));
await sessionBuffer.processBuffer();
// Sessions must still be in the queue — not lost
expect(await sessionBuffer.getBufferSize()).toBe(1);
insertSpy.mockRestore();
});
});

View File

@@ -8,7 +8,10 @@ describe('cachable', () => {
beforeEach(async () => { beforeEach(async () => {
redis = getRedisCache(); redis = getRedisCache();
// Clear any existing cache data for clean tests // Clear any existing cache data for clean tests
const keys = await redis.keys('cachable:*'); const keys = [
...await redis.keys('cachable:*'),
...await redis.keys('test-key*'),
];
if (keys.length > 0) { if (keys.length > 0) {
await redis.del(...keys); await redis.del(...keys);
} }
@@ -16,7 +19,10 @@ describe('cachable', () => {
afterEach(async () => { afterEach(async () => {
// Clean up after each test // Clean up after each test
const keys = await redis.keys('cachable:*'); const keys = [
...await redis.keys('cachable:*'),
...await redis.keys('test-key*'),
];
if (keys.length > 0) { if (keys.length > 0) {
await redis.del(...keys); await redis.del(...keys);
} }