Skip to content

Commit

Permalink
test: add prefix to all tests to support dragonfly
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Oct 10, 2023
1 parent 388904e commit 2ba2fc0
Show file tree
Hide file tree
Showing 21 changed files with 783 additions and 486 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ jobs:
dragonflydb:
image: docker.dragonflydb.io/dragonflydb/dragonfly
env:
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_cluster_mode: emulated
DFLY_lock_on_hashtags: true
BULLMQ_TEST_PREFIX: '{b}'
ports:
- 6379:6379

Expand Down
3 changes: 2 additions & 1 deletion src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ export class QueueGetters<
}

/**
* Returns the jobs that are in the "waiting" status.
* Returns the jobs that are in the "waiting-children" status.
* I.E. parent jobs that have at least one child that has not completed yet.
* @param start - zero based index from where to start returning jobs.
* @param end - zero based index where to stop returning jobs.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export function isRedisCluster(obj: unknown): obj is Cluster {
export async function removeAllQueueData(
client: RedisClient,
queueName: string,
prefix = 'bull',
prefix = process.env.BULLMQ_TEST_PREFIX || 'bull',
): Promise<void | boolean> {
if (client instanceof Cluster) {
// todo compat with cluster ?
Expand Down
21 changes: 13 additions & 8 deletions tests/test_bulk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import { v4 } from 'uuid';
import { Queue, Worker, Job } from '../src/classes';
import { removeAllQueueData } from '../src/utils';

const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull';

describe('bulk jobs', () => {
let queue: Queue;
let queueName: string;
const connection = { host: 'localhost' };

beforeEach(async function () {
queueName = `test-${v4()}`;
queue = new Queue(queueName, { connection });
queue = new Queue(queueName, { connection, prefix });
});

afterEach(async function () {
Expand All @@ -35,7 +37,7 @@ describe('bulk jobs', () => {
}
}),
);
const worker = new Worker(queueName, processor, { connection });
const worker = new Worker(queueName, processor, { connection, prefix });
await worker.waitUntilReady();

const jobs = await queue.addBulk([
Expand All @@ -56,10 +58,13 @@ describe('bulk jobs', () => {
it('should allow to pass parent option', async () => {
const name = 'test';
const parentQueueName = `parent-queue-${v4()}`;
const parentQueue = new Queue(parentQueueName, { connection });
const parentQueue = new Queue(parentQueueName, { connection, prefix });

const parentWorker = new Worker(parentQueueName, null, { connection });
const childrenWorker = new Worker(queueName, null, { connection });
const parentWorker = new Worker(parentQueueName, null, {
connection,
prefix,
});
const childrenWorker = new Worker(queueName, null, { connection, prefix });
await parentWorker.waitUntilReady();
await childrenWorker.waitUntilReady();

Expand All @@ -71,7 +76,7 @@ describe('bulk jobs', () => {
opts: {
parent: {
id: parent.id,
queue: `bull:${parentQueueName}`,
queue: `${prefix}:${parentQueueName}`,
},
},
},
Expand All @@ -81,7 +86,7 @@ describe('bulk jobs', () => {
opts: {
parent: {
id: parent.id,
queue: `bull:${parentQueueName}`,
queue: `${prefix}:${parentQueueName}`,
},
},
},
Expand Down Expand Up @@ -120,7 +125,7 @@ describe('bulk jobs', () => {
}
}),
);
const worker = new Worker(queueName, processor, { connection });
const worker = new Worker(queueName, processor, { connection, prefix });
await worker.waitUntilReady();

const jobs = await queue.addBulk([
Expand Down
72 changes: 43 additions & 29 deletions tests/test_clean.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {
} from '../src/classes';
import { delay, removeAllQueueData } from '../src/utils';

const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull';

describe('Cleaner', () => {
let queue: Queue;
let queueEvents: QueueEvents;
Expand All @@ -20,8 +22,8 @@ describe('Cleaner', () => {

beforeEach(async () => {
queueName = `test-${v4()}`;
queue = new Queue(queueName, { connection });
queueEvents = new QueueEvents(queueName, { connection });
queue = new Queue(queueName, { connection, prefix });
queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();
await queue.waitUntilReady();
});
Expand Down Expand Up @@ -54,7 +56,7 @@ describe('Cleaner', () => {
async () => {
await delay(10);
},
{ connection },
{ connection, prefix },
);
await worker.waitUntilReady();

Expand Down Expand Up @@ -92,7 +94,10 @@ describe('Cleaner', () => {
});

it('should only remove a job outside of the grace period', async () => {
const worker = new Worker(queueName, async () => {}, { connection });
const worker = new Worker(queueName, async () => {}, {
connection,
prefix,
});
await worker.waitUntilReady();

await queue.add('test', { some: 'data' });
Expand Down Expand Up @@ -133,7 +138,7 @@ describe('Cleaner', () => {
await delay(100);
throw new Error('It failed');
},
{ connection, autorun: false },
{ connection, prefix, autorun: false },
);
await worker.waitUntilReady();

Expand Down Expand Up @@ -236,7 +241,7 @@ describe('Cleaner', () => {
it('removes parent record', async () => {
const name = 'child-job';

const flow = new FlowProducer({ connection });
const flow = new FlowProducer({ connection, prefix });
await flow.add({
name: 'parent-job',
queueName,
Expand All @@ -254,7 +259,7 @@ describe('Cleaner', () => {
await queue.clean(0, 0, 'wait');

const client = await queue.client;
const keys = await client.keys(`bull:${queue.name}:*`);
const keys = await client.keys(`${prefix}:${queue.name}:*`);

expect(keys.length).to.be.eql(3);

Expand All @@ -277,15 +282,15 @@ describe('Cleaner', () => {
async () => {
return delay(20);
},
{ connection },
{ connection, prefix },
);
await worker.waitUntilReady();

const completing = new Promise(resolve => {
queueEvents.on('completed', after(4, resolve));
});

const flow = new FlowProducer({ connection });
const flow = new FlowProducer({ connection, prefix });
await flow.add({
name: 'parent-job',
queueName,
Expand All @@ -302,7 +307,7 @@ describe('Cleaner', () => {
await queue.clean(0, 0, 'completed');

const client = await queue.client;
const keys = await client.keys(`bull:${queue.name}:*`);
const keys = await client.keys(`${prefix}:${queue.name}:*`);

// Expected keys: meta, id, stalled-check and events
expect(keys.length).to.be.eql(4);
Expand All @@ -327,15 +332,15 @@ describe('Cleaner', () => {
}
return delay(10);
},
{ connection },
{ connection, prefix },
);
await worker.waitUntilReady();

const failing = new Promise(resolve => {
worker.on('failed', resolve);
});

const flow = new FlowProducer({ connection });
const flow = new FlowProducer({ connection, prefix });
const tree = await flow.add({
name: 'parent-job',
queueName,
Expand All @@ -351,7 +356,7 @@ describe('Cleaner', () => {
await queue.clean(0, 0, 'completed');

const client = await queue.client;
const keys = await client.keys(`bull:${queue.name}:*`);
const keys = await client.keys(`${prefix}:${queue.name}:*`);
// Expected keys: meta, id, stalled-check, events, failed and job
expect(keys.length).to.be.eql(7);

Expand Down Expand Up @@ -384,7 +389,7 @@ describe('Cleaner', () => {
}
return delay(10);
},
{ connection },
{ connection, prefix },
);
await worker.waitUntilReady();

Expand All @@ -396,7 +401,7 @@ describe('Cleaner', () => {
worker.on('failed', resolve);
});

const flow = new FlowProducer({ connection });
const flow = new FlowProducer({ connection, prefix });
const tree = await flow.add({
name: 'parent-job',
queueName,
Expand All @@ -413,7 +418,7 @@ describe('Cleaner', () => {
await queue.clean(0, 0, 'failed');

const client = await queue.client;
const keys = await client.keys(`bull:${queue.name}:*`);
const keys = await client.keys(`${prefix}:${queue.name}:*`);

// Expected keys: meta, id, stalled-check, events, failed and 2 jobs
expect(keys.length).to.be.eql(7);
Expand All @@ -432,11 +437,14 @@ describe('Cleaner', () => {
describe('when parent has pending children in different queue', async () => {
it('keeps parent in waiting-children', async () => {
const childrenQueueName = `test-${v4()}`;
const childrenQueue = new Queue(childrenQueueName, { connection });
const childrenQueue = new Queue(childrenQueueName, {
connection,
prefix,
});
await childrenQueue.waitUntilReady();
const name = 'child-job';

const flow = new FlowProducer({ connection });
const flow = new FlowProducer({ connection, prefix });
await flow.add({
name: 'parent-job',
queueName,
Expand All @@ -456,7 +464,7 @@ describe('Cleaner', () => {
await queue.clean(0, 0, 'wait');

const client = await queue.client;
const keys = await client.keys(`bull:${queue.name}:*`);
const keys = await client.keys(`${prefix}:${queue.name}:*`);

expect(keys.length).to.be.eql(6);

Expand Down Expand Up @@ -532,7 +540,7 @@ describe('Cleaner', () => {
}
}
},
{ connection, concurrency: 2 },
{ connection, prefix, concurrency: 2 },
);
await worker.waitUntilReady();

Expand Down Expand Up @@ -571,11 +579,14 @@ describe('Cleaner', () => {
describe('when parent has more than 1 pending children', async () => {
it('deletes each children until trying to move parent to wait', async () => {
const parentQueueName = `test-${v4()}`;
const parentQueue = new Queue(parentQueueName, { connection });
const parentQueue = new Queue(parentQueueName, {
connection,
prefix,
});
await parentQueue.waitUntilReady();
const name = 'child-job';

const flow = new FlowProducer({ connection });
const flow = new FlowProducer({ connection, prefix });
await flow.add({
name: 'parent-job',
queueName: parentQueueName,
Expand All @@ -593,12 +604,12 @@ describe('Cleaner', () => {
await queue.clean(0, 0, 'wait');

const client = await queue.client;
const keys = await client.keys(`bull:${queueName}:*`);
const keys = await client.keys(`${prefix}:${queueName}:*`);

expect(keys.length).to.be.eql(3);

const eventsCount = await client.xlen(
`bull:${parentQueueName}:events`,
`${prefix}:${parentQueueName}:events`,
);

expect(eventsCount).to.be.eql(2); // added and waiting-children events
Expand All @@ -620,11 +631,14 @@ describe('Cleaner', () => {
describe('when parent has only 1 pending children', async () => {
it('moves parent to wait to try to process it', async () => {
const parentQueueName = `test-${v4()}`;
const parentQueue = new Queue(parentQueueName, { connection });
const parentQueue = new Queue(parentQueueName, {
connection,
prefix,
});
await parentQueue.waitUntilReady();
const name = 'child-job';

const flow = new FlowProducer({ connection });
const flow = new FlowProducer({ connection, prefix });
await flow.add({
name: 'parent-job',
queueName: parentQueueName,
Expand All @@ -648,7 +662,7 @@ describe('Cleaner', () => {
await queue.clean(0, 0, 'prioritized');

const client = await queue.client;
const keys = await client.keys(`bull:${queueName}:*`);
const keys = await client.keys(`${prefix}:${queueName}:*`);

expect(keys.length).to.be.eql(5);

Expand All @@ -674,7 +688,7 @@ describe('Cleaner', () => {
async () => {
throw new Error('It failed');
},
{ connection },
{ connection, prefix },
);
await worker.waitUntilReady();

Expand All @@ -684,7 +698,7 @@ describe('Cleaner', () => {
await queue.add('test', { some: 'data' });

await delay(100);
await client.hdel(`bull:${queueName}:1`, 'timestamp');
await client.hdel(`${prefix}:${queueName}:1`, 'timestamp');
const jobs = await queue.clean(0, 0, 'failed');
expect(jobs.length).to.be.eql(2);
const failed = await queue.getFailed();
Expand Down
8 changes: 5 additions & 3 deletions tests/test_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import { v4 } from 'uuid';
import { Queue, Job, Worker, QueueBase } from '../src/classes';
import { removeAllQueueData } from '../src/utils';

const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull';

describe('connection', () => {
let queue: Queue;
let queueName: string;
const connection = { host: 'localhost' };

beforeEach(async function () {
queueName = `test-${v4()}`;
queue = new Queue(queueName, { connection: { host: 'localhost' } });
queue = new Queue(queueName, { connection: { host: 'localhost' }, prefix });
});

afterEach(async function () {
Expand Down Expand Up @@ -119,7 +121,7 @@ describe('connection', () => {
};
});

const worker = new Worker(queueName, processor, { connection });
const worker = new Worker(queueName, processor, { connection, prefix });

worker.on('error', err => {
// error event has to be observed or the exception will bubble up
Expand Down Expand Up @@ -165,7 +167,7 @@ describe('connection', () => {
};
});

const worker = new Worker(queueName, processor, { connection });
const worker = new Worker(queueName, processor, { connection, prefix });

worker.on('error', err => {
// error event has to be observed or the exception will bubble up
Expand Down
Loading

0 comments on commit 2ba2fc0

Please sign in to comment.