Skip to content

Commit

Permalink
fix(flows): add meta key to queues created with flows
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Dec 6, 2023
1 parent 76f9e8f commit 272ec69
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 5 deletions.
3 changes: 2 additions & 1 deletion src/commands/addDelayedJob-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ local parentData
--- @include "includes/getTargetQueueList"
--- @include "includes/getNextDelayedTimestamp"
--- @include "includes/updateExistingJobsParent"
--- @include "includes/getOrSetMaxEvents"

if parentKey ~= nil then
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
Expand All @@ -73,7 +74,7 @@ end

local jobCounter = rcall("INCR", idKey)

local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents") or 10000
local maxEvents = getOrSetMaxEvents(metaKey)

local parentDependenciesKey = args[7]
local timestamp = args[4]
Expand Down
3 changes: 2 additions & 1 deletion src/commands/addParentJob-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ local parentData
-- Includes
--- @include "includes/storeJob"
--- @include "includes/updateExistingJobsParent"
--- @include "includes/getOrSetMaxEvents"

if parentKey ~= nil then
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
Expand All @@ -60,7 +61,7 @@ end

local jobCounter = rcall("INCR", idKey)

local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents") or 10000
local maxEvents = getOrSetMaxEvents(metaKey)

local parentDependenciesKey = args[7]
local timestamp = args[4]
Expand Down
3 changes: 2 additions & 1 deletion src/commands/addPrioritizedJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ local parentData
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"
--- @include "includes/updateExistingJobsParent"
--- @include "includes/getOrSetMaxEvents"

if parentKey ~= nil then
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
Expand All @@ -71,7 +72,7 @@ end

local jobCounter = rcall("INCR", idKey)

local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents") or 10000
local maxEvents = getOrSetMaxEvents(metaKey)

local parentDependenciesKey = args[7]
local timestamp = args[4]
Expand Down
6 changes: 4 additions & 2 deletions src/commands/addStandardJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ local parentData
--- @include "includes/storeJob"
--- @include "includes/updateExistingJobsParent"
--- @include "includes/getTargetQueueList"
--- @include "includes/getOrSetMaxEvents"

if parentKey ~= nil then
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
Expand All @@ -69,7 +70,8 @@ end

local jobCounter = rcall("INCR", KEYS[4])

local maxEvents = rcall("HGET", KEYS[3], "opts.maxLenEvents") or 10000
local metaKey = KEYS[3]
local maxEvents = getOrSetMaxEvents(metaKey)

local parentDependenciesKey = args[7]
local timestamp = args[4]
Expand All @@ -95,7 +97,7 @@ end
storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
parentKey, parentData, repeatJobKey)

local target, paused = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])
local target, paused = getTargetQueueList(metaKey, KEYS[1], KEYS[2])

-- LIFO or FIFO
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
Expand Down
9 changes: 9 additions & 0 deletions src/commands/includes/getOrSetMaxEvents.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

local function getOrSetMaxEvents(metaKey)
local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
if not maxEvents then
maxEvents = 10000
rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
end
return maxEvents
end
46 changes: 46 additions & 0 deletions tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2467,6 +2467,52 @@ describe('flows', () => {
await removeAllQueueData(new IORedis(redisHost), topQueueName);
});

it('should add meta key to both parents and children', async () => {
const name = 'child-job';
const values = [
{ idx: 0, bar: 'something' },
{ idx: 1, baz: 'something' },
{ idx: 2, qux: 'something' },
];

const topQueueName = `top-queue-${v4()}`;

const flow = new FlowProducer({ connection, prefix });
const tree = await flow.add({
name: 'root-job',
queueName: topQueueName,
data: {},
children: [
{
name,
data: { idx: 0, foo: 'bar' },
queueName,
children: [
{
name,
data: { idx: 1, foo: 'baz' },
queueName,
children: [{ name, data: { idx: 2, foo: 'qux' }, queueName }],
},
],
},
],
});

const client = await flow.client;
const metaTop = await client.hgetall(`${prefix}:${topQueueName}:meta`);
expect(metaTop).to.have.be.deep.equal({ 'opts.maxLenEvents': '10000' });

const metaChildren = await client.hgetall(`${prefix}:${queueName}:meta`);
expect(metaChildren).to.have.be.deep.equal({
'opts.maxLenEvents': '10000',
});

await flow.close();

await removeAllQueueData(new IORedis(redisHost), topQueueName);
});

describe('when parent has delay', () => {
it('moves process to delayed after children are processed', async () => {
const name = 'child-job';
Expand Down

0 comments on commit 272ec69

Please sign in to comment.