Skip to content

Commit

Permalink
test(flow): do not remove grandparent on regular remove (#2429)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Feb 16, 2024
1 parent 5938664 commit 51ef4ae
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)

if numRemovedElements == 1 then
if hard then
if hard then -- remove parent in same queue
if parentPrefix == baseKey then
removeParentDependencyKey(parentKey, hard, nil, baseKey)
rcall("DEL", parentKey, parentKey .. ':logs',
Expand Down
117 changes: 117 additions & 0 deletions tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3235,6 +3235,123 @@ describe('flows', () => {
await parentQueue.close();
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
});

describe('when there is a grand parent', () => {
it('removes all children when removing a parent, but not grandparent', async () => {
const parentQueueName = `parent-queue-${v4()}`;
const grandparentQueueName = `grandparent-queue-${v4()}`;
const name = 'child-job';

const flow = new FlowProducer({ connection, prefix });
const tree = await flow.add({
name: 'grandparent-job',
queueName: grandparentQueueName,
data: {},
children: [
{
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
{
name,
data: { idx: 0, foo: 'baz' },
queueName,
children: [
{ name, data: { idx: 0, foo: 'qux' }, queueName },
],
},
],
},
],
});

expect(await tree.job.getState()).to.be.equal('waiting-children');
expect(await tree.children![0].job.getState()).to.be.equal(
'waiting-children',
);

expect(
await tree.children![0].children![0].job.getState(),
).to.be.equal('waiting');
expect(
await tree.children![0].children![1].job.getState(),
).to.be.equal('waiting-children');

expect(
await tree.children![0].children![1].children![0].job.getState(),
).to.be.equal('waiting');

for (let i = 0; i < tree.children![0].children!.length; i++) {
const child = tree.children![0].children![i];
const childJob = await Job.fromId(queue, child.job.id);
expect(childJob.parent).to.deep.equal({
id: tree.children![0].job.id,
queueKey: `${prefix}:${parentQueueName}`,
});
}

const parentWorker = new Worker(parentQueueName, async () => {}, {
connection,
prefix,
});
const childrenWorker = new Worker(
queueName,
async () => {
await delay(10);
},
{
connection,
prefix,
},
);
await parentWorker.waitUntilReady();
await childrenWorker.waitUntilReady();

const completing = new Promise(resolve => {
parentWorker.on('completed', resolve);
});

await completing;
await tree.children![0].job.remove();

const parentQueue = new Queue(parentQueueName, {
connection,
prefix,
});
const parentJob = await Job.fromId(parentQueue, tree.job.id);
expect(parentJob).to.be.undefined;

for (let i = 0; i < tree.children![0].children!.length; i++) {
const child = tree.children![0].children![i];
const childJob = await Job.fromId(queue, child.job.id);
expect(childJob).to.be.undefined;
}

const jobs = await queue.getJobCountByTypes('completed');
expect(jobs).to.be.equal(0);

expect(
await tree.children![0].children![0].job.getState(),
).to.be.equal('unknown');
expect(
await tree.children![0].children![1].job.getState(),
).to.be.equal('unknown');
expect(await tree.children![0].job.getState()).to.be.equal('unknown');
expect(await tree.job.getState()).to.be.equal('waiting');

await flow.close();
await childrenWorker.close();
await parentWorker.close();
await parentQueue.close();
await removeAllQueueData(
new IORedis(redisHost),
grandparentQueueName,
);
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
});
});
});

it('should not remove anything if there is a locked job in the tree', async () => {
Expand Down

0 comments on commit 51ef4ae

Please sign in to comment.