Skip to content

Commit

Permalink
refactor(queue): remove duplicated behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
Realtin authored and rmehner committed Aug 2, 2017
1 parent 9a4a65c commit 8422936
Showing 1 changed file with 8 additions and 11 deletions.
19 changes: 8 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ require('./lib/rollbar')
const scheduleJob = channel.publish.bind(channel, `${env.JOBS_QUEUE_NAME}-exchange`, env.JOBS_QUEUE_NAME)
const worker = require('./lib/worker').bind(null, scheduleJob, channel)

const queues = {
'registry-change': new Queue(1, Infinity),
'stripe-event': new Queue(1, Infinity),
'reset': new Queue(1, Infinity)
// if you need a customized queue configuration, you can add it here
// e.g. const queues = {'stripe-event': new Queue(1, 10)}
const queues = {}
function queueJob (queueId, job) {
const q = queues[queueId] = queues[queueId] || new Queue(1, Infinity)
return q.add(() => worker(job))
}
channel.consume(env.EVENTS_QUEUE_NAME, consume)
channel.consume(env.JOBS_QUEUE_NAME, consume)
Expand All @@ -80,10 +82,7 @@ require('./lib/rollbar')
const data = JSON.parse(job.content.toString())
const jobsWithoutOwners = ['registry-change', 'stripe-event', 'schedule-stale-initial-pr-reminders', 'reset']
if (jobsWithoutOwners.includes(data.name)) {
if (queues[data.name]) {
return queues[data.name].add(() => worker(job))
}
throw new Error(`Unknown queue name: ${data.name}`)
return queueJob(data.name, job)
}

let queueId = Number(data.accountId) ||
Expand All @@ -107,9 +106,7 @@ require('./lib/rollbar')
throw e
}
}

const q = queues[queueId] = queues[queueId] || new Queue(1, Infinity)
q.add(() => worker(job))
queueJob(queueId, job)
}
})()
// }

0 comments on commit 8422936

Please sign in to comment.