fix: simply event buffer

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-11-03 12:02:35 +01:00
parent 1285ad85a2
commit 742ee8dc1c
7 changed files with 753 additions and 1335 deletions

View File

@@ -11,8 +11,8 @@
"typecheck": "tsc --noEmit" "typecheck": "tsc --noEmit"
}, },
"dependencies": { "dependencies": {
"@bull-board/api": "6.13.1", "@bull-board/api": "6.14.0",
"@bull-board/express": "6.13.1", "@bull-board/express": "6.14.0",
"@openpanel/common": "workspace:*", "@openpanel/common": "workspace:*",
"@openpanel/db": "workspace:*", "@openpanel/db": "workspace:*",
"@openpanel/email": "workspace:*", "@openpanel/email": "workspace:*",
@@ -22,7 +22,7 @@
"@openpanel/importer": "workspace:*", "@openpanel/importer": "workspace:*",
"@openpanel/queue": "workspace:*", "@openpanel/queue": "workspace:*",
"@openpanel/redis": "workspace:*", "@openpanel/redis": "workspace:*",
"bullmq": "^5.8.7", "bullmq": "^5.63.0",
"express": "^4.18.2", "express": "^4.18.2",
"groupmq": "1.1.0-next.5", "groupmq": "1.1.0-next.5",
"prom-client": "^15.1.3", "prom-client": "^15.1.3",

View File

@@ -1,6 +1,7 @@
import type { CronQueueType } from '@openpanel/queue'; import type { CronQueueType } from '@openpanel/queue';
import { cronQueue } from '@openpanel/queue'; import { cronQueue } from '@openpanel/queue';
import { getLock } from '@openpanel/redis';
import { logger } from './utils/logger'; import { logger } from './utils/logger';
export async function bootCron() { export async function bootCron() {
@@ -44,17 +45,26 @@ export async function bootCron() {
}); });
} }
const lock = await getLock('cron:lock', '1', 1000 * 60 * 60 * 5);
if (lock) {
logger.info('Cron lock acquired');
} else {
logger.info('Cron lock not acquired');
}
if (lock) {
logger.info('Updating cron jobs');
// TODO: Switch to getJobSchedulers
const repeatableJobs = await cronQueue.getRepeatableJobs();
for (const repeatableJob of repeatableJobs) {
cronQueue.removeRepeatableByKey(repeatableJob.key);
}
// Add repeatable jobs // Add repeatable jobs
for (const job of jobs) { for (const job of jobs) {
await cronQueue.add( await cronQueue.upsertJobScheduler(
job.name, job.type,
{
type: job.type,
payload: undefined,
},
{
jobId: job.type,
repeat:
typeof job.pattern === 'number' typeof job.pattern === 'number'
? { ? {
every: job.pattern, every: job.pattern,
@@ -62,25 +72,13 @@ export async function bootCron() {
: { : {
pattern: job.pattern, pattern: job.pattern,
}, },
{
data: {
type: job.type,
payload: undefined,
},
}, },
); );
} }
// Remove outdated repeatable jobs
const repeatableJobs = await cronQueue.getRepeatableJobs();
for (const repeatableJob of repeatableJobs) {
const match = jobs.find(
(job) => `${job.name}:${job.type}:::${job.pattern}` === repeatableJob.key,
);
if (match) {
logger.info('Repeatable job exists', {
key: repeatableJob.key,
});
} else {
logger.info('Removing repeatable job', {
key: repeatableJob.key,
});
cronQueue.removeRepeatableByKey(repeatableJob.key);
}
} }
} }

View File

@@ -1,13 +1,13 @@
import type { Job } from 'bullmq'; import type { Job } from 'bullmq';
import { logger as baseLogger } from '@/utils/logger'; import { logger as baseLogger } from '@/utils/logger';
import { getTime } from '@openpanel/common';
import { import {
type IClickhouseSession, type IClickhouseSession,
type IServiceCreateEventPayload, type IServiceCreateEventPayload,
type IServiceEvent, type IServiceEvent,
TABLE_NAMES, TABLE_NAMES,
checkNotificationRulesForSessionEnd, checkNotificationRulesForSessionEnd,
convertClickhouseDateToJs,
createEvent, createEvent,
eventBuffer, eventBuffer,
formatClickhouseDate, formatClickhouseDate,
@@ -77,7 +77,7 @@ export async function createSessionEnd(
} }
try { try {
handleSessionEndNotifications({ await handleSessionEndNotifications({
session, session,
payload, payload,
}); });
@@ -103,7 +103,9 @@ export async function createSessionEnd(
name: 'session_end', name: 'session_end',
duration: session.duration ?? 0, duration: session.duration ?? 0,
path: lastScreenView?.path ?? '', path: lastScreenView?.path ?? '',
createdAt: new Date(getTime(session.ended_at) + 1000), createdAt: new Date(
convertClickhouseDateToJs(session.ended_at).getTime() + 100,
),
profileId: lastScreenView?.profileId || payload.profileId, profileId: lastScreenView?.profileId || payload.profileId,
}); });
} }

View File

@@ -61,54 +61,49 @@ export class EventBuffer extends BaseBuffer {
} }
/** /**
* Lua script for handling screen_view addition: * Lua script for handling screen_view addition - RACE-CONDITION SAFE without GroupMQ
* - Get previous screen_view from Redis (by session)
* - If exists, calculate duration and add to queue
* - Store new screen_view as "last" for both session and profile keys
* *
* KEYS[1] = last screen_view key (by session) * Strategy: Use Redis GETDEL (atomic get-and-delete) to ensure only ONE thread
* KEYS[2] = last screen_view timestamp key (by session) * can process the "last" screen_view at a time.
* KEYS[3] = last screen_view key (by profile, may be empty) *
* KEYS[4] = last screen_view timestamp key (by profile, may be empty) * KEYS[1] = last screen_view key (by session) - stores both event and timestamp as JSON
* KEYS[5] = queue key * KEYS[2] = last screen_view key (by profile, may be empty)
* KEYS[6] = buffer counter key * KEYS[3] = queue key
* ARGV[1] = new event JSON * KEYS[4] = buffer counter key
* ARGV[2] = new event timestamp (epoch ms) * ARGV[1] = new event with timestamp as JSON: {"event": {...}, "ts": 123456}
* ARGV[3] = TTL for last screen_view (1 hour) * ARGV[2] = TTL for last screen_view (1 hour)
*/ */
private readonly addScreenViewScript = ` private readonly addScreenViewScript = `
local sessionKey = KEYS[1] local sessionKey = KEYS[1]
local sessionTsKey = KEYS[2] local profileKey = KEYS[2]
local profileKey = KEYS[3] local queueKey = KEYS[3]
local profileTsKey = KEYS[4] local counterKey = KEYS[4]
local queueKey = KEYS[5] local newEventData = ARGV[1]
local counterKey = KEYS[6] local ttl = tonumber(ARGV[2])
local newEventJson = ARGV[1]
local newTimestamp = tonumber(ARGV[2])
local ttl = tonumber(ARGV[3])
-- Get previous event and its timestamp from session key -- GETDEL is atomic: get previous and delete in one operation
local previousEventJson = redis.call("GET", sessionKey) -- This ensures only ONE thread gets the previous event
local previousTimestamp = redis.call("GET", sessionTsKey) local previousEventData = redis.call("GETDEL", sessionKey)
-- Store new screen_view as last for session -- Store new screen_view as last for session
redis.call("SET", sessionKey, newEventJson, "EX", ttl) redis.call("SET", sessionKey, newEventData, "EX", ttl)
redis.call("SET", sessionTsKey, newTimestamp, "EX", ttl)
-- Store new screen_view as last for profile (if key provided) -- Store new screen_view as last for profile (if key provided)
if profileKey and profileKey ~= "" then if profileKey and profileKey ~= "" then
redis.call("SET", profileKey, newEventJson, "EX", ttl) redis.call("SET", profileKey, newEventData, "EX", ttl)
redis.call("SET", profileTsKey, newTimestamp, "EX", ttl)
end end
-- If there was a previous screen_view, add it to queue with calculated duration -- If there was a previous screen_view, add it to queue with calculated duration
if previousEventJson and previousTimestamp then if previousEventData then
local previousEvent = cjson.decode(previousEventJson) local prev = cjson.decode(previousEventData)
local prevTs = tonumber(previousTimestamp) local curr = cjson.decode(newEventData)
if prevTs then
previousEvent.duration = newTimestamp - prevTs -- Calculate duration (ensure non-negative to handle clock skew)
if prev.ts and curr.ts then
prev.event.duration = math.max(0, curr.ts - prev.ts)
end end
redis.call("RPUSH", queueKey, cjson.encode(previousEvent))
redis.call("RPUSH", queueKey, cjson.encode(prev.event))
redis.call("INCR", counterKey) redis.call("INCR", counterKey)
return 1 return 1
end end
@@ -117,36 +112,31 @@ return 0
`; `;
/** /**
* Lua script for handling session_end: * Lua script for handling session_end - RACE-CONDITION SAFE
* - Get the last screen_view (by session) *
* - Add screen_view to queue (if exists) * Uses GETDEL to atomically retrieve and delete the last screen_view
* - Add session_end to queue
* - Delete last screen_view keys (both session and profile)
* *
* KEYS[1] = last screen_view key (by session) * KEYS[1] = last screen_view key (by session)
* KEYS[2] = last screen_view timestamp key (by session) * KEYS[2] = last screen_view key (by profile, may be empty)
* KEYS[3] = last screen_view key (by profile, may be empty) * KEYS[3] = queue key
* KEYS[4] = last screen_view timestamp key (by profile, may be empty) * KEYS[4] = buffer counter key
* KEYS[5] = queue key
* KEYS[6] = buffer counter key
* ARGV[1] = session_end event JSON * ARGV[1] = session_end event JSON
*/ */
private readonly addSessionEndScript = ` private readonly addSessionEndScript = `
local sessionKey = KEYS[1] local sessionKey = KEYS[1]
local sessionTsKey = KEYS[2] local profileKey = KEYS[2]
local profileKey = KEYS[3] local queueKey = KEYS[3]
local profileTsKey = KEYS[4] local counterKey = KEYS[4]
local queueKey = KEYS[5]
local counterKey = KEYS[6]
local sessionEndJson = ARGV[1] local sessionEndJson = ARGV[1]
-- Get previous event from session key -- GETDEL is atomic: only ONE thread gets the last screen_view
local previousEventJson = redis.call("GET", sessionKey) local previousEventData = redis.call("GETDEL", sessionKey)
local added = 0 local added = 0
-- If there was a previous screen_view, add it to queue -- If there was a previous screen_view, add it to queue
if previousEventJson then if previousEventData then
redis.call("RPUSH", queueKey, previousEventJson) local prev = cjson.decode(previousEventData)
redis.call("RPUSH", queueKey, cjson.encode(prev.event))
redis.call("INCR", counterKey) redis.call("INCR", counterKey)
added = added + 1 added = added + 1
end end
@@ -156,10 +146,9 @@ redis.call("RPUSH", queueKey, sessionEndJson)
redis.call("INCR", counterKey) redis.call("INCR", counterKey)
added = added + 1 added = added + 1
-- Delete last screen_view from both keys (event and timestamp) -- Delete profile key
redis.call("DEL", sessionKey, sessionTsKey)
if profileKey and profileKey ~= "" then if profileKey and profileKey ~= "" then
redis.call("DEL", profileKey, profileTsKey) redis.call("DEL", profileKey)
end end
return added return added
@@ -226,52 +215,49 @@ return added
if (event.session_id && event.name === 'screen_view') { if (event.session_id && event.name === 'screen_view') {
// Handle screen_view // Handle screen_view
const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); const sessionKey = this.getLastScreenViewKeyBySession(event.session_id);
const sessionTsKey = `${sessionKey}:ts`;
const profileKey = event.profile_id const profileKey = event.profile_id
? this.getLastScreenViewKeyByProfile( ? this.getLastScreenViewKeyByProfile(
event.project_id, event.project_id,
event.profile_id, event.profile_id,
) )
: ''; : '';
const profileTsKey = profileKey ? `${profileKey}:ts` : '';
const timestamp = new Date(event.created_at || Date.now()).getTime(); const timestamp = new Date(event.created_at || Date.now()).getTime();
// Combine event and timestamp into single JSON for atomic operations
const eventWithTimestamp = JSON.stringify({
event: event,
ts: timestamp,
});
this.evalScript( this.evalScript(
multi, multi,
'addScreenView', 'addScreenView',
this.addScreenViewScript, this.addScreenViewScript,
6, 4,
sessionKey, sessionKey,
sessionTsKey,
profileKey, profileKey,
profileTsKey,
this.queueKey, this.queueKey,
this.bufferCounterKey, this.bufferCounterKey,
eventJson, eventWithTimestamp,
timestamp.toString(),
'3600', // 1 hour TTL '3600', // 1 hour TTL
); );
} else if (event.session_id && event.name === 'session_end') { } else if (event.session_id && event.name === 'session_end') {
// Handle session_end // Handle session_end
const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); const sessionKey = this.getLastScreenViewKeyBySession(event.session_id);
const sessionTsKey = `${sessionKey}:ts`;
const profileKey = event.profile_id const profileKey = event.profile_id
? this.getLastScreenViewKeyByProfile( ? this.getLastScreenViewKeyByProfile(
event.project_id, event.project_id,
event.profile_id, event.profile_id,
) )
: ''; : '';
const profileTsKey = profileKey ? `${profileKey}:ts` : '';
this.evalScript( this.evalScript(
multi, multi,
'addSessionEnd', 'addSessionEnd',
this.addSessionEndScript, this.addSessionEndScript,
6, 4,
sessionKey, sessionKey,
sessionTsKey,
profileKey, profileKey,
profileTsKey,
this.queueKey, this.queueKey,
this.bufferCounterKey, this.bufferCounterKey,
eventJson, eventJson,
@@ -433,12 +419,14 @@ return added
); );
} }
const eventStr = await redis.get(lastScreenViewKey); const eventDataStr = await redis.get(lastScreenViewKey);
if (eventStr) { if (eventDataStr) {
const parsed = getSafeJson<IClickhouseEvent>(eventStr); const eventData = getSafeJson<{ event: IClickhouseEvent; ts: number }>(
if (parsed) { eventDataStr,
return transformEvent(parsed); );
if (eventData?.event) {
return transformEvent(eventData.event);
} }
} }

View File

@@ -10,7 +10,7 @@
"@openpanel/db": "workspace:*", "@openpanel/db": "workspace:*",
"@openpanel/logger": "workspace:*", "@openpanel/logger": "workspace:*",
"@openpanel/redis": "workspace:*", "@openpanel/redis": "workspace:*",
"bullmq": "^5.8.7", "bullmq": "^5.63.0",
"groupmq": "1.1.0-next.5" "groupmq": "1.1.0-next.5"
}, },
"devDependencies": { "devDependencies": {

View File

@@ -169,19 +169,16 @@ export const getEventsGroupQueueShard = (groupId: string) => {
}; };
export const sessionsQueue = new Queue<SessionsQueuePayload>('{sessions}', { export const sessionsQueue = new Queue<SessionsQueuePayload>('{sessions}', {
// @ts-ignore
connection: getRedisQueue(), connection: getRedisQueue(),
defaultJobOptions: { defaultJobOptions: {
removeOnComplete: 10, removeOnComplete: 10,
}, },
}); });
export const sessionsQueueEvents = new QueueEvents('{sessions}', { export const sessionsQueueEvents = new QueueEvents('{sessions}', {
// @ts-ignore
connection: getRedisQueue(), connection: getRedisQueue(),
}); });
export const cronQueue = new Queue<CronQueuePayload>('{cron}', { export const cronQueue = new Queue<CronQueuePayload>('{cron}', {
// @ts-ignore
connection: getRedisQueue(), connection: getRedisQueue(),
defaultJobOptions: { defaultJobOptions: {
removeOnComplete: 10, removeOnComplete: 10,
@@ -189,7 +186,6 @@ export const cronQueue = new Queue<CronQueuePayload>('{cron}', {
}); });
export const miscQueue = new Queue<MiscQueuePayload>('{misc}', { export const miscQueue = new Queue<MiscQueuePayload>('{misc}', {
// @ts-ignore
connection: getRedisQueue(), connection: getRedisQueue(),
defaultJobOptions: { defaultJobOptions: {
removeOnComplete: 10, removeOnComplete: 10,
@@ -206,7 +202,6 @@ export type NotificationQueuePayload = {
export const notificationQueue = new Queue<NotificationQueuePayload>( export const notificationQueue = new Queue<NotificationQueuePayload>(
'{notification}', '{notification}',
{ {
// @ts-ignore
connection: getRedisQueue(), connection: getRedisQueue(),
defaultJobOptions: { defaultJobOptions: {
removeOnComplete: 10, removeOnComplete: 10,

1867
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff