get oldest delayed job
This commit is contained in:
@@ -181,7 +181,7 @@ export async function incomingEvent(job: Job<EventsQueuePayloadIncomingEvent>) {
|
|||||||
|
|
||||||
if (payload.name === 'screen_view') {
|
if (payload.name === 'screen_view') {
|
||||||
if (duration < 0) {
|
if (duration < 0) {
|
||||||
job.log(`prevEvent ${JSON.stringify(prevEvent, null, 2)}`);
|
logger.info({ prevEvent, payload }, 'Duration is negative');
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
// Skip update duration if it's wrong
|
// Skip update duration if it's wrong
|
||||||
|
|||||||
@@ -5,5 +5,20 @@ export async function findJobByPrefix<T>(
|
|||||||
matcher: string
|
matcher: string
|
||||||
) {
|
) {
|
||||||
const delayed = await queue.getJobs('delayed');
|
const delayed = await queue.getJobs('delayed');
|
||||||
return delayed.find((job) => job?.opts?.jobId?.startsWith(matcher));
|
const filtered = delayed.filter((job) =>
|
||||||
|
job?.opts?.jobId?.startsWith(matcher)
|
||||||
|
);
|
||||||
|
const getTime = (val?: string) => {
|
||||||
|
if (!val) return null;
|
||||||
|
const match = val.match(/:(\d+)$/);
|
||||||
|
return match?.[1] ? parseInt(match[1], 10) : null;
|
||||||
|
};
|
||||||
|
filtered.sort((a, b) => {
|
||||||
|
const aTime = getTime(a?.opts?.jobId);
|
||||||
|
const bTime = getTime(b?.opts?.jobId);
|
||||||
|
if (aTime === null) return 1;
|
||||||
|
if (bTime === null) return -1;
|
||||||
|
return aTime - bTime;
|
||||||
|
});
|
||||||
|
return filtered[0];
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user