Skip to content

Commit

Permalink
Merge branch 'master' into ci/add-dragonfly-backend-to-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Nov 3, 2023
2 parents ea0d85e + edd4b6b commit c2b7970
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 17 deletions.
21 changes: 21 additions & 0 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
## [4.12.8](https://github.com/taskforcesh/bullmq/compare/v4.12.7...v4.12.8) (2023-11-03)


### Bug Fixes

* **worker:** keep extending locks while closing workers ([#2259](https://github.com/taskforcesh/bullmq/issues/2259)) ([c4d12ea](https://github.com/taskforcesh/bullmq/commit/c4d12ea3a9837ffd7f58e2134796137c4181c3de))

## [4.12.7](https://github.com/taskforcesh/bullmq/compare/v4.12.6...v4.12.7) (2023-10-29)


### Performance Improvements

* **redis-connection:** check redis version greater or equal than v6 only once ([#2252](https://github.com/taskforcesh/bullmq/issues/2252)) ([a09b15a](https://github.com/taskforcesh/bullmq/commit/a09b15af0d5dedfa83bce7130ee9094f3fb69e10))

## [4.12.6](https://github.com/taskforcesh/bullmq/compare/v4.12.5...v4.12.6) (2023-10-26)


### Bug Fixes

* **sandbox:** do not return empty object result when it is undefined ([#2247](https://github.com/taskforcesh/bullmq/issues/2247)) ([308db7f](https://github.com/taskforcesh/bullmq/commit/308db7f58758a72b8abb272da8e92509813a2178))

## [4.12.5](https://github.com/taskforcesh/bullmq/compare/v4.12.4...v4.12.5) (2023-10-18)


Expand Down
14 changes: 14 additions & 0 deletions docs/gitbook/guide/jobs/repeatable.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ There are some important considerations regarding repeatable jobs:
- If there are no workers running, repeatable jobs will not accumulate next time a worker is online.
- repeatable jobs can be removed using the [removeRepeatable](https://api.docs.bullmq.io/classes/v4.Queue.html#removeRepeatable) method or [removeRepeatableByKey](https://api.docs.bullmq.io/classes/v4.Queue.html#removeRepeatableByKey).

```typescript
import { Queue } from 'bullmq';

const repeat = { pattern: '*/1 * * * * *' };

const myQueue = new Queue('Paint');

const job1 = await myQueue.add('red', { foo: 'bar' }, { repeat });
const job2 = await myQueue.add('blue', { foo: 'baz' }, { repeat });

const isRemoved1 = await myQueue.removeRepeatableByKey(job1.repeatJobKey);
const isRemoved2 = await queue.removeRepeatable('blue', repeat);
```

All repeatable jobs have a repeatable job key that holds some metadata of the repeatable job itself. It is possible to retrieve all the current repeatable jobs in the queue calling [getRepeatableJobs](https://api.docs.bullmq.io/classes/v4.Queue.html#getRepeatableJobs):

```typescript
Expand Down
16 changes: 15 additions & 1 deletion docs/gitbook/guide/retrying-failing-jobs.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Retrying failing jobs

As your queues processes jobs, it is inevitable that over time some of these jobs will fail. In BullMQ, a job is considered failed in the following scenarios:
As your queues process jobs, it is inevitable that over time some of these jobs will fail. In BullMQ, a job is considered failed in the following scenarios:

- The processor function defined in your [Worker](https://docs.bullmq.io/guide/workers) has thrown an exception.
- The job has become [stalled](https://docs.bullmq.io/guide/jobs/stalled) and it has consumed the "max stalled count" setting.
Expand All @@ -19,6 +19,10 @@ Often it is desirable to automatically retry failed jobs so that we do not give

BullMQ supports retries of failed jobs using back-off functions. It is possible to use the **built-in** backoff functions or provide **custom** ones. If you do not specify a back-off function, the jobs will be retried without delay as soon as they fail.

{% hint style="info" %}
Retried jobs will respect their priority when they are moved back to waiting state.
{% endhint %}

#### Built-in backoff strategies

The current built-in backoff functions are "exponential" and "fixed".
Expand Down Expand Up @@ -81,6 +85,12 @@ const worker = new Worker('foo', async job => doSomeProcessing(), {
});
```

{% hint style="info" %}
If your backoffStrategy returns 0, jobs will be moved at the end of our waiting list (priority 0) or moved back to prioritized state (priority > 0).

If your backoffStrategy returns -1, jobs won't be retried, instead they will be moved to failed state.
{% endhint %}

You can then use your custom strategy when adding jobs:

```typescript
Expand Down Expand Up @@ -128,3 +138,7 @@ const worker = new Worker('foo', async job => doSomeProcessing(), {
},
});
```

## Read more:

- 💡 [Stop Retrying Jobs](../patterns/stop-retrying-jobs.md)
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "4.12.5",
"version": "4.12.8",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
Expand Down
4 changes: 2 additions & 2 deletions src/classes/child-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ export class ChildProcessor {
this.currentJobPromise = (async () => {
try {
const job = this.wrapJob(jobJson, this.send);
const result = (await this.processor(job, token)) || {};
const result = await this.processor(job, token);
await this.send({
cmd: ParentCommand.Completed,
value: result,
value: typeof result === 'undefined' ? null : result,
});
} catch (err) {
await this.send({
Expand Down
6 changes: 4 additions & 2 deletions src/classes/queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
keys: KeysMap;
closing: Promise<void> | undefined;

protected closed: boolean = false;
protected scripts: Scripts;
protected connection: RedisConnection;
public readonly qualifiedName: string;
Expand Down Expand Up @@ -137,11 +138,12 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
*
* @returns Closes the connection and returns a promise that resolves when the connection is closed.
*/
close(): Promise<void> {
async close(): Promise<void> {
if (!this.closing) {
this.closing = this.connection.close();
}
return this.closing;
await this.closing;
this.closed = true;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ export class Queue<
*
* @see removeRepeatableByKey
*
* @param name -
* @param name - job name
* @param repeatOpts -
* @param jobId -
* @returns
Expand All @@ -337,7 +337,7 @@ export class Queue<
*
* @see getRepeatableJobs
*
* @param key - to the repeatable job.
* @param repeatJobKey - to the repeatable job.
* @returns
*/
async removeRepeatableByKey(key: string): Promise<boolean> {
Expand Down
14 changes: 13 additions & 1 deletion src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ const deprecationMessage = [
'On the next versions having this settings will throw an exception',
].join(' ');

interface RedisCapabilities {
canDoubleTimeout: boolean;
}

export interface RawCommand {
content: string;
name: string;
Expand All @@ -33,11 +37,14 @@ export class RedisConnection extends EventEmitter {
static recommendedMinimumVersion = '6.2.0';

closing: boolean;
capabilities: RedisCapabilities = {
canDoubleTimeout: false,
};

protected _client: RedisClient;

private readonly opts: RedisOptions;
private initializing: Promise<RedisClient>;
private readonly initializing: Promise<RedisClient>;

private version: string;
private skipVersionCheck: boolean;
Expand Down Expand Up @@ -207,6 +214,11 @@ export class RedisConnection extends EventEmitter {
);
}
}

this.capabilities = {
canDoubleTimeout: !isRedisVersionLowerThan(this.version, '6.0.0'),
};

return this._client;
}

Expand Down
12 changes: 5 additions & 7 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -540,12 +540,9 @@ export class Worker<
);

// Only Redis v6.0.0 and above supports doubles as block time
blockTimeout = isRedisVersionLowerThan(
this.blockingConnection.redisVersion,
'6.0.0',
)
? Math.ceil(blockTimeout)
: blockTimeout;
blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
? blockTimeout
: Math.ceil(blockTimeout);

// We restrict the maximum block timeout to 10 second to avoid
// blocking the connection for too long in the case of reconnections
Expand Down Expand Up @@ -782,6 +779,7 @@ export class Worker<
.finally(() => client.disconnect())
.finally(() => this.connection.close())
.finally(() => this.emit('closed'));
this.closed = true;
})();
return this.closing;
}
Expand Down Expand Up @@ -821,7 +819,7 @@ export class Worker<
if (!this.opts.skipLockRenewal) {
clearTimeout(this.extendLocksTimer);

if (!this.closing) {
if (!this.closed) {
this.extendLocksTimer = setTimeout(async () => {
// Get all the jobs whose locks expire in less than 1/2 of the lockRenewTime
const now = Date.now();
Expand Down
38 changes: 38 additions & 0 deletions tests/test_repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,44 @@ describe('repeat', function () {
});
});

describe('when repeatable job is promoted', function () {
it('keeps one repeatable and one delayed after being processed', async function () {
const options = {
repeat: {
pattern: '0 * 1 * *',
},
};

const worker = new Worker(queueName, async () => {}, { connection });

const completing = new Promise<void>(resolve => {
worker.on('completed', () => {
resolve();
});
});

const repeatableJob = await queue.add('test', { foo: 'bar' }, options);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob.promote();
await completing;

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);

const configs = await repeat.getRepeatableJobs(0, -1, true);

expect(delayedCount).to.be.equal(1);

const count = await queue.count();

expect(count).to.be.equal(1);
expect(configs).to.have.length(1);
await worker.close();
});
});

it('should allow removing a named repeatable job', async function () {
const numJobs = 3;
const date = new Date('2017-02-07 9:24:00');
Expand Down
4 changes: 3 additions & 1 deletion tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ function sandboxProcessTests(
const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async (job: Job, value: any) => {
try {
expect(job.returnvalue).to.be.eql(42);
expect(job.data).to.be.eql({ foo: 'bar' });
expect(value).to.be.eql(42);
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
Expand Down Expand Up @@ -794,8 +795,9 @@ function sandboxProcessTests(
});

const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async () => {
worker.on('completed', async job => {
try {
expect(job.returnvalue).to.be.undefined;
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
0,
);
Expand Down
33 changes: 33 additions & 0 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,39 @@ describe('workers', function () {
).to.throw('stalledInterval must be greater than 0');
});

it('lock extender continues to run until all active jobs are completed when closing a worker', async function () {
this.timeout(4000);
let worker;

const startProcessing = new Promise<void>(resolve => {
worker = new Worker(
queueName,
async () => {
resolve();
return delay(2000);
},
{
connection,
lockDuration: 1000,
lockRenewTime: 500,
stalledInterval: 1000,
},
);
});

await queue.add('test', { bar: 'baz' });

const completed = new Promise((resolve, reject) => {
worker.on('completed', resolve);
worker.on('failed', reject);
});

await startProcessing;
await worker.close();

await completed;
});

describe('Concurrency process', () => {
it('should thrown an exception if I specify a concurrency of 0', () => {
try {
Expand Down

0 comments on commit c2b7970

Please sign in to comment.