diff --git a/dist/src/queueAdapters/bullMQ.js b/dist/src/queueAdapters/bullMQ.js
index 3e5a2e61c9600459678fda0e29397faee7db9a1e..6cf94ac4384d86d99f292c8f296d4b95ab8f2d19 100644
--- a/dist/src/queueAdapters/bullMQ.js
+++ b/dist/src/queueAdapters/bullMQ.js
@@ -21,11 +21,59 @@ class BullMQAdapter extends base_1.BaseAdapter {
     addJob(name, data, options) {
         return this.queue.add(name, data, options);
     }
-    getJob(id) {
-        return this.queue.getJob(id);
+    /**
+     * Builds the Redis key of this queue's delayed sorted set,
+     * i.e. `<prefix>:<queue name>:delayed`.
+     */
+    getDelayedScoreName() {
+        return `${this.queue.opts.prefix}:${this.queue.name}:delayed`;
+    }
+    /**
+     * Overwrites `job.opts.delay` in place and returns the same job.
+     * Callers pass the delayed-set score divided by 0x1000 as `actualDelay`.
+     * When it is truthy the new delay is `actualDelay - job.timestamp`;
+     * otherwise the job's own `delay` is used.
+     */
+    transformJob(job, actualDelay) {
+        job.opts.delay = actualDelay ? actualDelay - job.timestamp : job.delay;
+        return job;
+    }
+    /**
+     * Loads one job and refreshes its `opts.delay` from the delayed set.
+     * Resolves to `undefined` when the queue has no job with this id.
+     */
+    async getJob(id) {
+        const job = await this.queue.getJob(id);
+        if (!job) {
+            return undefined;
+        }
+        const client = await this.queue.client;
+        const score = await client.zscore(this.getDelayedScoreName(), id);
+        return this.transformJob(job, score ? Number(score) / 0x1000 : null);
     }
-    getJobs(jobStatuses, start, end) {
-        return this.queue.getJobs(jobStatuses, start, end);
+    /**
+     * Loads a page of jobs and refreshes `opts.delay` on each from the
+     * delayed set. Entries that are not job objects are returned untouched.
+     */
+    async getJobs(jobStatuses, start, end) {
+        const jobs = await this.queue.getJobs(jobStatuses, start, end);
+        const isJob = (job) => Boolean(job) && typeof job === 'object';
+        const ids = jobs.filter(isJob).map((job) => job.id);
+        // ZMSCORE needs at least one member, so skip the round trip entirely.
+        if (ids.length === 0) {
+            return jobs;
+        }
+        const client = await this.queue.client;
+        const scores = await client.zmscore(this.getDelayedScoreName(), ids);
+        // Key scores by job id: `scores` lines up with `ids`, not with `jobs`.
+        const scoreById = new Map(ids.map((id, i) => [id, scores[i]]));
+        return jobs.map((job) => {
+            if (!isJob(job)) {
+                return job;
+            }
+            const score = scoreById.get(job.id);
+            return this.transformJob(job, score ? Number(score) / 0x1000 : null);
+        });
     }
     getJobCounts() {
         return this.queue.getJobCounts();