diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index f124631e..7fe235ab 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -13,6 +13,7 @@ import { createEvent, getEvents } from '@openpanel/db'; import { findJobByPrefix } from '@openpanel/queue'; import { eventsQueue } from '@openpanel/queue/src/queues'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue/src/queues'; +import { redis } from '@openpanel/redis'; const GLOBAL_PROPERTIES = ['__path', '__referrer']; const SESSION_TIMEOUT = 1000 * 60 * 30; @@ -89,17 +90,21 @@ export async function incomingEvent(job: Job) { return createEvent(payload); } - const [sessionEndJobCurrentDeviceId, sessionEndJobPreviousDeviceId] = - await Promise.all([ - findJobByPrefix( - eventsQueue, - `sessionEnd:${projectId}:${currentDeviceId}:` - ), - findJobByPrefix( - eventsQueue, - `sessionEnd:${projectId}:${previousDeviceId}:` - ), - ]); + const [sessionEndKeys, eventsKeys] = await Promise.all([ + redis.keys(`bull:events:sessionEnd:${projectId}:*`), + redis.keys(`bull:events:event:${projectId}:*`), + ]); + + const sessionEndJobCurrentDeviceId = await findJobByPrefix( + eventsQueue, + sessionEndKeys, + `sessionEnd:${projectId}:${currentDeviceId}:` + ); + const sessionEndJobPreviousDeviceId = await findJobByPrefix( + eventsQueue, + sessionEndKeys, + `sessionEnd:${projectId}:${previousDeviceId}:` + ); const createSessionStart = !sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId; @@ -130,12 +135,15 @@ export async function incomingEvent(job: Job) { ); } - const [[sessionStartEvent], prevEventJob] = await Promise.all([ - getEvents( - `SELECT * FROM events WHERE name = 'session_start' AND device_id = ${escape(deviceId)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1` - ), - findJobByPrefix(eventsQueue, `event:${projectId}:${deviceId}:`), - ]); + const prevEventJob = await findJobByPrefix( + eventsQueue, + eventsKeys, + `event:${projectId}:${deviceId}:` + ); + + const [sessionStartEvent] = await getEvents( + `SELECT * FROM events WHERE name = 'session_start' AND device_id = ${escape(deviceId)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1` + ); const payload: Omit = { name: body.name, diff --git a/packages/queue/src/utils.ts b/packages/queue/src/utils.ts index 3fb6527e..33c675eb 100644 --- a/packages/queue/src/utils.ts +++ b/packages/queue/src/utils.ts @@ -2,23 +2,38 @@ import type { Queue } from 'bullmq'; export async function findJobByPrefix( queue: Queue, + keys: string[], matcher: string ) { - const delayed = await queue.getJobs('delayed'); - const filtered = delayed.filter((job) => - job?.opts?.jobId?.startsWith(matcher) - ); + const filtered = keys.filter((key) => key.includes(matcher)); const getTime = (val?: string) => { if (!val) return null; const match = val.match(/:(\d+)$/); return match?.[1] ? parseInt(match[1], 10) : null; }; + filtered.sort((a, b) => { - const aTime = getTime(a?.opts?.jobId); - const bTime = getTime(b?.opts?.jobId); + const aTime = getTime(a); + const bTime = getTime(b); if (aTime === null) return 1; if (bTime === null) return -1; return aTime - bTime; }); - return filtered[0]; + + async function getJob(index: number) { + if (index >= filtered.length) return null; + + const key = filtered[index]?.replace(/^bull:events:/, ''); + // return new Promise((resolve) => ) + if (key) { + const job = await queue.getJob(key); + if ((await job?.getState()) === 'delayed') { + return job; + } + } + + return getJob(index + 1); + } + + return getJob(0); }