diff --git a/tests/fixtures/fixture_processor_steps.js b/tests/fixtures/fixture_processor_steps.js new file mode 100644 index 0000000000..e1ac1edfbd --- /dev/null +++ b/tests/fixtures/fixture_processor_steps.js @@ -0,0 +1,34 @@ +'use strict'; + +const delay = require('./delay'); + +module.exports = async function (job) { + let step = job.data.step; + while (step !== 'FINISH') { + switch (step) { + case 'INITIAL': { + await delay(200); + await job.updateData({ + ...job.data, + step: 'SECOND', + extraDataSecondStep: 'second data', + }); + step = 'SECOND'; + break; + } + case 'SECOND': { + await delay(200); + await job.updateData({ + ...job.data, + extraDataFinishedStep: 'finish data', + step: 'FINISH', + }); + step = 'FINISH'; + return; + } + default: { + throw new Error('invalid step'); + } + } + } +}; diff --git a/tests/test_sandbox_step_pattern.ts b/tests/test_sandbox_step_pattern.ts new file mode 100644 index 0000000000..bb6075c0ec --- /dev/null +++ b/tests/test_sandbox_step_pattern.ts @@ -0,0 +1,63 @@ +import { expect } from 'chai'; +import { default as IORedis } from 'ioredis'; +import { Job, Queue, Worker } from '../src/classes'; +import { beforeEach, before, after as afterAll, it } from 'mocha'; +import { v4 } from 'uuid'; +import { removeAllQueueData } from '../src/utils'; + +describe.only('sandbox step pattern', () => { + const redisHost = process.env.REDIS_HOST || 'localhost'; + const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull'; + let queue: Queue; + let queueName: string; + + let connection; + before(async function () { + connection = new IORedis(redisHost, { maxRetriesPerRequest: null }); + }); + + beforeEach(async function () { + queueName = `test-${v4()}`; + queue = new Queue(queueName, { connection, prefix }); + }); + + afterEach(async function () { + await queue.close(); + await removeAllQueueData(new IORedis(), queueName); + }); + + afterAll(async function () { + await connection.quit(); + }); + + it('should process steps and complete', async () => { + const processFile = __dirname + '/fixtures/fixture_processor_steps.js'; + + const worker = new Worker(queueName, processFile, { + connection, + prefix, + drainDelay: 1, + }); + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async (job: Job) => { + try { + expect(job.data).to.be.eql({ + step: 'FINISH', + extraDataSecondStep: 'second data', + extraDataFinishedStep: 'finish data', + }); + resolve(); + } catch (err) { + reject(err); + } + }); + }); + + await queue.add('test', { step: 'INITIAL' }); + + await completing; + + await worker.close(); + }); +});