diff --git a/src/commands/addDelayedJob-7.lua b/src/commands/addDelayedJob-7.lua index 4772a2698d..9de39fc98f 100644 --- a/src/commands/addDelayedJob-7.lua +++ b/src/commands/addDelayedJob-7.lua @@ -64,6 +64,7 @@ local parentData --- @include "includes/getTargetQueueList" --- @include "includes/getNextDelayedTimestamp" --- @include "includes/updateExistingJobsParent" +--- @include "includes/getOrSetMaxEvents" if parentKey ~= nil then if rcall("EXISTS", parentKey) ~= 1 then return -5 end @@ -73,7 +74,7 @@ end local jobCounter = rcall("INCR", idKey) -local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents") or 10000 +local maxEvents = getOrSetMaxEvents(metaKey) local parentDependenciesKey = args[7] local timestamp = args[4] diff --git a/src/commands/addParentJob-4.lua b/src/commands/addParentJob-4.lua index 45eef514b7..a2162ef46a 100644 --- a/src/commands/addParentJob-4.lua +++ b/src/commands/addParentJob-4.lua @@ -51,6 +51,7 @@ local parentData -- Includes --- @include "includes/storeJob" --- @include "includes/updateExistingJobsParent" +--- @include "includes/getOrSetMaxEvents" if parentKey ~= nil then if rcall("EXISTS", parentKey) ~= 1 then return -5 end @@ -60,7 +61,7 @@ end local jobCounter = rcall("INCR", idKey) -local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents") or 10000 +local maxEvents = getOrSetMaxEvents(metaKey) local parentDependenciesKey = args[7] local timestamp = args[4] diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-8.lua index 49997ef215..d9b1e212f2 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -62,6 +62,7 @@ local parentData --- @include "includes/addJobWithPriority" --- @include "includes/getTargetQueueList" --- @include "includes/updateExistingJobsParent" +--- @include "includes/getOrSetMaxEvents" if parentKey ~= nil then if rcall("EXISTS", parentKey) ~= 1 then return -5 end @@ -71,7 +72,7 @@ end local jobCounter = rcall("INCR", idKey) -local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents") or 10000 +local maxEvents = getOrSetMaxEvents(metaKey) local parentDependenciesKey = args[7] local timestamp = args[4] diff --git a/src/commands/addStandardJob-6.lua b/src/commands/addStandardJob-6.lua index 1c067f496e..97a8b3e95d 100644 --- a/src/commands/addStandardJob-6.lua +++ b/src/commands/addStandardJob-6.lua @@ -60,6 +60,7 @@ local parentData --- @include "includes/storeJob" --- @include "includes/updateExistingJobsParent" --- @include "includes/getTargetQueueList" +--- @include "includes/getOrSetMaxEvents" if parentKey ~= nil then if rcall("EXISTS", parentKey) ~= 1 then return -5 end @@ -69,7 +70,8 @@ end local jobCounter = rcall("INCR", KEYS[4]) -local maxEvents = rcall("HGET", KEYS[3], "opts.maxLenEvents") or 10000 +local metaKey = KEYS[3] +local maxEvents = getOrSetMaxEvents(metaKey) local parentDependenciesKey = args[7] local timestamp = args[4] @@ -95,7 +97,7 @@ end storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, repeatJobKey) -local target, paused = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2]) +local target, paused = getTargetQueueList(metaKey, KEYS[1], KEYS[2]) -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' diff --git a/src/commands/includes/getOrSetMaxEvents.lua b/src/commands/includes/getOrSetMaxEvents.lua new file mode 100644 index 0000000000..3f7d9bde3a --- /dev/null +++ b/src/commands/includes/getOrSetMaxEvents.lua @@ -0,0 +1,9 @@ + +local function getOrSetMaxEvents(metaKey) + local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents") + if not maxEvents then + maxEvents = 10000 + rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents) + end + return maxEvents +end diff --git a/tests/test_flow.ts b/tests/test_flow.ts index 742dc4e408..6ab941907e 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -2467,6 +2467,52 @@ describe('flows', () => { await removeAllQueueData(new IORedis(redisHost), topQueueName); }); + it('should add meta key to both parents and children', async () => { + const name = 'child-job'; + const values = [ + { idx: 0, bar: 'something' }, + { idx: 1, baz: 'something' }, + { idx: 2, qux: 'something' }, + ]; + + const topQueueName = `top-queue-${v4()}`; + + const flow = new FlowProducer({ connection, prefix }); + const tree = await flow.add({ + name: 'root-job', + queueName: topQueueName, + data: {}, + children: [ + { + name, + data: { idx: 0, foo: 'bar' }, + queueName, + children: [ + { + name, + data: { idx: 1, foo: 'baz' }, + queueName, + children: [{ name, data: { idx: 2, foo: 'qux' }, queueName }], + }, + ], + }, + ], + }); + + const client = await flow.client; + const metaTop = await client.hgetall(`${prefix}:${topQueueName}:meta`); + expect(metaTop).to.have.be.deep.equal({ 'opts.maxLenEvents': '10000' }); + + const metaChildren = await client.hgetall(`${prefix}:${queueName}:meta`); + expect(metaChildren).to.have.be.deep.equal({ + 'opts.maxLenEvents': '10000', + }); + + await flow.close(); + + await removeAllQueueData(new IORedis(redisHost), topQueueName); + }); + describe('when parent has delay', () => { it('moves process to delayed after children are processed', async () => { const name = 'child-job';