speed up worker

This commit is contained in:
Carl-Gerhard Lindesvärd
2024-05-13 22:43:07 +02:00
parent c4b02108ac
commit 494044a3e2
2 changed files with 47 additions and 24 deletions

View File

@@ -13,6 +13,7 @@ import { createEvent, getEvents } from '@openpanel/db';
import { findJobByPrefix } from '@openpanel/queue'; import { findJobByPrefix } from '@openpanel/queue';
import { eventsQueue } from '@openpanel/queue/src/queues'; import { eventsQueue } from '@openpanel/queue/src/queues';
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue/src/queues'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue/src/queues';
import { redis } from '@openpanel/redis';
const GLOBAL_PROPERTIES = ['__path', '__referrer']; const GLOBAL_PROPERTIES = ['__path', '__referrer'];
const SESSION_TIMEOUT = 1000 * 60 * 30; const SESSION_TIMEOUT = 1000 * 60 * 30;
@@ -89,17 +90,21 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
return createEvent(payload); return createEvent(payload);
} }
const [sessionEndJobCurrentDeviceId, sessionEndJobPreviousDeviceId] = const [sessionEndKeys, eventsKeys] = await Promise.all([
await Promise.all([ redis.keys(`bull:events:sessionEnd:${projectId}:*`),
findJobByPrefix( redis.keys(`bull:events:event:${projectId}:*`),
eventsQueue, ]);
`sessionEnd:${projectId}:${currentDeviceId}:`
), const sessionEndJobCurrentDeviceId = await findJobByPrefix(
findJobByPrefix( eventsQueue,
eventsQueue, sessionEndKeys,
`sessionEnd:${projectId}:${previousDeviceId}:` `sessionEnd:${projectId}:${currentDeviceId}:`
), );
]); const sessionEndJobPreviousDeviceId = await findJobByPrefix(
eventsQueue,
sessionEndKeys,
`sessionEnd:${projectId}:${previousDeviceId}:`
);
const createSessionStart = const createSessionStart =
!sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId; !sessionEndJobCurrentDeviceId && !sessionEndJobPreviousDeviceId;
@@ -130,12 +135,15 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
); );
} }
const [[sessionStartEvent], prevEventJob] = await Promise.all([ const prevEventJob = await findJobByPrefix(
getEvents( eventsQueue,
`SELECT * FROM events WHERE name = 'session_start' AND device_id = ${escape(deviceId)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1` eventsKeys,
), `event:${projectId}:${deviceId}:`
findJobByPrefix(eventsQueue, `event:${projectId}:${deviceId}:`), );
]);
const [sessionStartEvent] = await getEvents(
`SELECT * FROM events WHERE name = 'session_start' AND device_id = ${escape(deviceId)} AND project_id = ${escape(projectId)} ORDER BY created_at DESC LIMIT 1`
);
const payload: Omit<IServiceCreateEventPayload, 'id'> = { const payload: Omit<IServiceCreateEventPayload, 'id'> = {
name: body.name, name: body.name,

View File

@@ -2,23 +2,38 @@ import type { Queue } from 'bullmq';
export async function findJobByPrefix<T>( export async function findJobByPrefix<T>(
queue: Queue<T, any, string>, queue: Queue<T, any, string>,
keys: string[],
matcher: string matcher: string
) { ) {
const delayed = await queue.getJobs('delayed'); const filtered = keys.filter((key) => key.includes(matcher));
const filtered = delayed.filter((job) =>
job?.opts?.jobId?.startsWith(matcher)
);
const getTime = (val?: string) => { const getTime = (val?: string) => {
if (!val) return null; if (!val) return null;
const match = val.match(/:(\d+)$/); const match = val.match(/:(\d+)$/);
return match?.[1] ? parseInt(match[1], 10) : null; return match?.[1] ? parseInt(match[1], 10) : null;
}; };
filtered.sort((a, b) => { filtered.sort((a, b) => {
const aTime = getTime(a?.opts?.jobId); const aTime = getTime(a);
const bTime = getTime(b?.opts?.jobId); const bTime = getTime(b);
if (aTime === null) return 1; if (aTime === null) return 1;
if (bTime === null) return -1; if (bTime === null) return -1;
return aTime - bTime; return aTime - bTime;
}); });
return filtered[0];
async function getJob(index: number) {
if (index >= filtered.length) return null;
const key = filtered[index]?.replace(/^bull:events:/, '');
// return new Promise((resolve) => )
if (key) {
const job = await queue.getJob(key);
if ((await job?.getState()) === 'delayed') {
return job;
}
}
return getJob(index + 1);
}
return getJob(0);
} }