diff --git a/index.js b/index.js index ee0010fe..1339cd3f 100644 --- a/index.js +++ b/index.js @@ -50,10 +50,12 @@ require('./lib/rollbar') const scheduleJob = channel.publish.bind(channel, `${env.JOBS_QUEUE_NAME}-exchange`, env.JOBS_QUEUE_NAME) const worker = require('./lib/worker').bind(null, scheduleJob, channel) - const queues = { - 'registry-change': new Queue(1, Infinity), - 'stripe-event': new Queue(1, Infinity), - 'reset': new Queue(1, Infinity) + // if you need a customized queue configuration, you can add it here + // e.g. const queues = {'stripe-event': new Queue(1, 10)} + const queues = {} + function queueJob (queueId, job) { + const q = queues[queueId] = queues[queueId] || new Queue(1, Infinity) + return q.add(() => worker(job)) } channel.consume(env.EVENTS_QUEUE_NAME, consume) channel.consume(env.JOBS_QUEUE_NAME, consume) @@ -80,10 +82,7 @@ require('./lib/rollbar') const data = JSON.parse(job.content.toString()) const jobsWithoutOwners = ['registry-change', 'stripe-event', 'schedule-stale-initial-pr-reminders', 'reset'] if (jobsWithoutOwners.includes(data.name)) { - if (queues[data.name]) { - return queues[data.name].add(() => worker(job)) - } - throw new Error(`Unknown queue name: ${data.name}`) + return queueJob(data.name, job) } let queueId = Number(data.accountId) || @@ -107,9 +106,7 @@ require('./lib/rollbar') throw e } } - - const q = queues[queueId] = queues[queueId] || new Queue(1, Infinity) - q.add(() => worker(job)) + queueJob(queueId, job) } })() // }