diff --git a/lib/commands/obliterate-2.lua b/lib/commands/obliterate-2.lua index c6bd40e6f..5ec3ab98b 100644 --- a/lib/commands/obliterate-2.lua +++ b/lib/commands/obliterate-2.lua @@ -17,43 +17,42 @@ -- If the queue has currently active jobs then the script by default will return error, -- however this behaviour can be overrided using the `force` option. local maxCount = tonumber(ARGV[1]) -local count = 0 local baseKey = KEYS[2] local rcall = redis.call -local function getListItems(keyName) - return rcall('LRANGE', keyName, 0, -1) +local function getListItems(keyName, max) + return rcall('LRANGE', keyName, 0, max - 1) end -local function getZSetItems(keyName) - return rcall('ZRANGE', keyName, 0, -1) +local function getZSetItems(keyName, max) + return rcall('ZRANGE', keyName, 0, max - 1) end -local function getSetItems(keyName) - return rcall('SMEMBERS', keyName, 0, -1) -end - -local function removeKeys(parentKey, keys) +local function removeJobs(parentKey, keys) for i, key in ipairs(keys) do - if(count > maxCount) then - return true - end rcall("DEL", baseKey .. key) - count = count + 1 end - rcall("DEL", parentKey) - return false + maxCount = maxCount - #keys +end + +local function removeListJobs(keyName, max) + local jobs = getListItems(keyName, max) + removeJobs(keyName, jobs) + rcall("LTRIM", keyName, #jobs, -1) +end + +local function removeZSetJobs(keyName, max) + local jobs = getZSetItems(keyName, max) + removeJobs(keyName, jobs) + if(#jobs > 0) then + rcall("ZREM", keyName, unpack(jobs)) + end end local function removeLockKeys(keys) for i, key in ipairs(keys) do - if(count > maxCount) then - return true - end rcall("DEL", baseKey .. key .. ':lock') - count = count + 1 end - return false end -- 1) Check if paused, if not return with error. @@ -63,52 +62,53 @@ end -- 2) Check if there are active jobs, if there are and not "force" return error. local activeKey = baseKey .. 'active' -local activeKeys = getListItems(activeKey) -if (#activeKeys > 0) then +local activeJobs = getListItems(activeKey, maxCount) +if (#activeJobs > 0) then if(ARGV[2] == "") then return -2 -- Error, ExistsActiveJobs end end -if(removeLockKeys(activeKeys)) then - return 1 -end - -if(removeKeys(activeKey, activeKeys)) then +removeLockKeys(activeJobs) +removeJobs(activeKey, activeJobs) +rcall("LTRIM", activeKey, #activeJobs, -1) +if(maxCount <= 0) then return 1 end local waitKey = baseKey .. 'paused' -if(removeKeys(waitKey, getListItems(waitKey))) then +removeListJobs(waitKey, maxCount) +if(maxCount <= 0) then return 1 end local delayedKey = baseKey .. 'delayed' -if(removeKeys(delayedKey, getZSetItems(delayedKey))) then +removeZSetJobs(delayedKey, maxCount) +if(maxCount <= 0) then return 1 end local completedKey = baseKey .. 'completed' -if(removeKeys(completedKey, getZSetItems(completedKey))) then +removeZSetJobs(completedKey, maxCount) +if(maxCount <= 0) then return 1 end local failedKey = baseKey .. 'failed' -if(removeKeys(failedKey, getZSetItems(failedKey))) then +removeZSetJobs(failedKey, maxCount) +if(maxCount <= 0) then return 1 end -local waitKey = baseKey .. 'wait' -if(removeKeys(waitKey, getListItems(waitKey))) then +if(maxCount > 0) then + rcall("DEL", baseKey .. 'priority') + rcall("DEL", baseKey .. 'stalled-check') + rcall("DEL", baseKey .. 'stalled') + rcall("DEL", baseKey .. 'meta-paused') + rcall("DEL", baseKey .. 'meta') + rcall("DEL", baseKey .. 'id') + rcall("DEL", baseKey .. 'repeat') + return 0 +else return 1 end - -rcall("DEL", baseKey .. 'priority') -rcall("DEL", baseKey .. 'stalled-check') -rcall("DEL", baseKey .. 'stalled') -rcall("DEL", baseKey .. 'meta-paused') -rcall("DEL", baseKey .. 'meta') -rcall("DEL", baseKey .. 'id') -rcall("DEL", baseKey .. 'repeat') - -return 0 diff --git a/lib/scripts.js b/lib/scripts.js index 0b7335f56..4e85a7703 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -482,6 +482,7 @@ const scripts = { throw new Error('Cannot obliterate queue with active jobs'); } } + return result; }); } }; diff --git a/test/test_obliterate.js b/test/test_obliterate.js index 8caf20f37..ec7eeb27b 100644 --- a/test/test_obliterate.js +++ b/test/test_obliterate.js @@ -96,8 +96,8 @@ describe('Obliterate', () => { } return delay(250); }); - await job.finished(); + await queue.obliterate({ force: true }); const client = await queue.client; const keys = await client.keys(`bull:${queue.name}*`); @@ -123,4 +123,49 @@ describe('Obliterate', () => { const keys = await client.keys(`bull:${queue.name}:*`); expect(keys.length).to.be.eql(0); }); + + it('should obliterate a queue with high number of jobs in different statuses', async () => { + const arr1 = []; + for (let i = 0; i < 300; i++) { + arr1.push(queue.add({ foo: `barLoop${i}` })); + } + + const [lastCompletedJob] = (await Promise.all(arr1)).splice(-1); + + let fail = false; + queue.process(async () => { + if (fail) { + throw new Error('failed job'); + } + }); + + await lastCompletedJob.finished(); + + fail = true; + + const arr2 = []; + for (let i = 0; i < 300; i++) { + arr2.push(queue.add({ foo: `barLoop${i}` })); + } + + const [lastFailedJob] = (await Promise.all(arr2)).splice(-1); + + try { + await lastFailedJob.finished(); + expect(true).to.be.equal(false); + } catch (err) { + expect(true).to.be.equal(true); + } + + const arr3 = []; + for (let i = 0; i < 1623; i++) { + arr3.push(queue.add({ foo: `barLoop${i}` }, { delay: 10000 })); + } + await Promise.all(arr3); + + await queue.obliterate(); + const client = await queue.client; + const keys = await client.keys(`bull:${queue.name}*`); + expect(keys.length).to.be.eql(0); + }).timeout(20000); });