Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/taskforcesh/bullmq into c…
Browse files Browse the repository at this point in the history
…i/add-dragonfly-backend-to-tests
  • Loading branch information
manast committed Oct 23, 2023
2 parents 050853e + b026a3a commit fb5ec66
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 27 deletions.
27 changes: 24 additions & 3 deletions docs/gitbook/bullmq-pro/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
## [6.6.1](https://github.com/taskforcesh/bullmq-pro/compare/v6.6.0...v6.6.1) (2023-10-11)


### Bug Fixes

* **events:** trim events when retrying a job ([#2224](https://github.com/taskforcesh/bullmq/issues/2224)) ([1986b05](https://github.com/taskforcesh/bullmq/commit/1986b05ac03fe4ee48861aa60caadcc9df8170a6))
* **sandbox:** update progress value on job instance ([#2214](https://github.com/taskforcesh/bullmq/issues/2214)) fixes [#2213](https://github.com/taskforcesh/bullmq/issues/2213) ([3d0f36a](https://github.com/taskforcesh/bullmq/commit/3d0f36a134b7f5c6b6de26967c9d71bcfb346e72))

# [6.6.0](https://github.com/taskforcesh/bullmq-pro/compare/v6.5.0...v6.6.0) (2023-10-06)


### Bug Fixes
* **delayed:** trim events when moving jobs to delayed (python) ([#2211](https://github.com/taskforcesh/bullmq/issues/2211)) ([eca8c2d](https://github.com/taskforcesh/bullmq/commit/eca8c2d4dfeafbd8ac36a49764dbd4897303628c))

### Features

* **queue:** expose addJobLog and updateJobProgress ([#2202](https://github.com/taskforcesh/bullmq/issues/2202)) ([2056939](https://github.com/taskforcesh/bullmq/commit/205693907a4d6c2da9bd0690fb552b1d1e369c08))


# [6.5.0](https://github.com/taskforcesh/bullmq-pro/compare/v6.4.0...v6.5.0) (2023-09-28)


Expand All @@ -20,7 +39,7 @@
### Features

* **connection:** provide skipVersionCheck option for shared connections ([#2149](https://github.com/taskforcesh/bullmq/issues/2149)) ref [#2148](https://github.com/taskforcesh/bullmq/issues/2148) ([914820f](https://github.com/taskforcesh/bullmq/commit/914820f720cbc48b49f4bd1c46d148eb2bb5b79c))
* **sandbox:** sandbox: emulate moveToDelayed method ([#180](https://github.com/taskforcesh/bullmq-pro/issues/180)) ([d61de09](https://github.com/taskforcesh/bullmq-pro/commit/d61de095115481b688101bfaf0b126a02545cc6f)) ref [#2118](https://github.com/taskforcesh/bullmq/issues/2118)
* **sandbox:** emulate moveToDelayed method ([#180](https://github.com/taskforcesh/bullmq-pro/issues/180)) ([d61de09](https://github.com/taskforcesh/bullmq-pro/commit/d61de095115481b688101bfaf0b126a02545cc6f)) ref [#2118](https://github.com/taskforcesh/bullmq/issues/2118)


### Bug Fixes
Expand Down Expand Up @@ -81,9 +100,11 @@
## [6.2.2](https://github.com/taskforcesh/bullmq-pro/compare/v6.2.1...v6.2.2) (2023-07-26)


### Bug Fixes
### Features

* **deps:** upgrade bullmq to 4.6.0 ([#167](https://github.com/taskforcesh/bullmq-pro/issues/167)) ([9cf92a6](https://github.com/taskforcesh/bullmq-pro/commit/9cf92a62dc3f1e9316bd4559fde4700ff4d9b12c))
* **queue:** add promoteJobs to promote all delayed jobs ([6074592](https://github.com/taskforcesh/bullmq/commit/6074592574256ec4b1c340126288e803e56b1a64))
* **job:** add option for removing children in remove method (python) ([#2064](https://github.com/taskforcesh/bullmq/issues/2064)) ([841dc87](https://github.com/taskforcesh/bullmq/commit/841dc87a689897df81438ad1f43e45a4da77c388))
* **job:** add removeDependencyOnFailure option ([#1953](https://github.com/taskforcesh/bullmq/issues/1953)) ([ffd49e2](https://github.com/taskforcesh/bullmq/commit/ffd49e289c57252487200d47b92193228ae7451f))

## [6.2.1](https://github.com/taskforcesh/bullmq-pro/compare/v6.2.0...v6.2.1) (2023-07-25)

Expand Down
2 changes: 1 addition & 1 deletion docs/gitbook/bullmq-pro/groups/rate-limiting.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const worker = new WorkerPro('myQueue', processFn, {
group: {
limit: {
max: 100, // Limit to 100 jobs per second per group
duration 1000,
duration: 1000,
}
},
connection
Expand Down
20 changes: 14 additions & 6 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
## [4.12.5](https://github.com/taskforcesh/bullmq/compare/v4.12.4...v4.12.5) (2023-10-18)


### Performance Improvements

* **events:** trim events when removing jobs ([#2235](https://github.com/taskforcesh/bullmq/issues/2235)) (python) ([889815c](https://github.com/taskforcesh/bullmq/commit/889815c412666e5fad8f32d2e3a2d41cf650f001))

## [4.12.4](https://github.com/taskforcesh/bullmq/compare/v4.12.3...v4.12.4) (2023-10-13)


### Bug Fixes

* **events:** do not publish removed event on non-existent jobs ([#2227](https://github.com/taskforcesh/bullmq/issues/2227)) ([c134606](https://github.com/taskforcesh/bullmq/commit/c1346064c6cd9f93c59b184f150eac11d51c91b4))

## [4.12.3](https://github.com/taskforcesh/bullmq/compare/v4.12.2...v4.12.3) (2023-10-10)


Expand All @@ -22,15 +36,9 @@
# [4.12.0](https://github.com/taskforcesh/bullmq/compare/v4.11.4...v4.12.0) (2023-09-29)


### Bug Fixes

* **move-to-finished:** stringify any return value [python] ([#2198](https://github.com/taskforcesh/bullmq/issues/2198)) fixes [#2196](https://github.com/taskforcesh/bullmq/issues/2196) ([07f1335](https://github.com/taskforcesh/bullmq/commit/07f13356eb1c0136f03dfdf946d163f0ef3c4d62))


### Features

* expose addJobLog and updateJobProgress to the Queue instance ([#2202](https://github.com/taskforcesh/bullmq/issues/2202)) ([2056939](https://github.com/taskforcesh/bullmq/commit/205693907a4d6c2da9bd0690fb552b1d1e369c08))
* **queue:** add clean method [python] ([#2194](https://github.com/taskforcesh/bullmq/issues/2194)) ([3b67193](https://github.com/taskforcesh/bullmq/commit/3b6719379cbec5beb1b7dfb5f06d46cbbf74010f))

## [4.11.4](https://github.com/taskforcesh/bullmq/compare/v4.11.3...v4.11.4) (2023-09-22)

Expand Down
15 changes: 12 additions & 3 deletions docs/gitbook/python/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@

<!--next-version-placeholder-->

## v1.15.1 (2023-10-04)
## v1.15.2 (2023-10-18)
### Fix
* **delayed:** Trim events when moving jobs to delayed (python) ([#2211](https://github.com/taskforcesh/bullmq/issues/2211)) ([`eca8c2d`](https://github.com/taskforcesh/bullmq/commit/eca8c2d4dfeafbd8ac36a49764dbd4897303628c))
* **events:** Do not publish removed event on non-existent jobs ([#2227](https://github.com/taskforcesh/bullmq/issues/2227)) ([`c134606`](https://github.com/taskforcesh/bullmq/commit/c1346064c6cd9f93c59b184f150eac11d51c91b4))
* **events:** Trim events when retrying a job ([#2224](https://github.com/taskforcesh/bullmq/issues/2224)) ([`1986b05`](https://github.com/taskforcesh/bullmq/commit/1986b05ac03fe4ee48861aa60caadcc9df8170a6))
* **sandbox:** Update progress value on job instance (#2214) fixes #2213 ([`3d0f36a`](https://github.com/taskforcesh/bullmq/commit/3d0f36a134b7f5c6b6de26967c9d71bcfb346e72))

### Documentation
* **changelog:** Update changes ([`158b850`](https://github.com/taskforcesh/bullmq/commit/158b850c8b89ae08e93951420340fb8ee39a8d0a))
* **changelog:** Update multiple changelogs ([`d4683b3`](https://github.com/taskforcesh/bullmq/commit/d4683b324ce56a2209553e28a3d52ad08ee19695))

### Performance
* **events:** Trim events when removing jobs (#2235) (python) ([`889815c`](https://github.com/taskforcesh/bullmq/commit/889815c412666e5fad8f32d2e3a2d41cf650f001))

## v1.15.1 (2023-10-04)
### Fix
* **delayed:** Trim events when moving jobs to delayed (python) ([#2211](https://github.com/taskforcesh/bullmq/issues/2211)) ([`eca8c2d`](https://github.com/taskforcesh/bullmq/commit/eca8c2d4dfeafbd8ac36a49764dbd4897303628c))

## v1.15.0 (2023-09-30)
### Feature
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "4.12.3",
"version": "4.12.5",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
Expand Down
2 changes: 1 addition & 1 deletion python/bullmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
A background job processor and message queue for Python based on Redis.
"""
__version__ = "1.15.1"
__version__ = "1.15.2"
__author__ = 'Taskforce.sh Inc.'
__credits__ = 'Taskforce.sh Inc.'

Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "bullmq"
version = "1.15.1"
version = "1.15.2"
description='BullMQ for Python'
readme="README.md"
authors = [
Expand Down
14 changes: 7 additions & 7 deletions src/commands/addJob-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ local parentData
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"
--- @include "includes/trimEvents"
--- @include "includes/getNextDelayedTimestamp"
--- @include "includes/updateParentDepsIfNeeded"

Expand All @@ -75,8 +74,7 @@ end

local jobCounter = rcall("INCR", KEYS[4])

-- Trim events before emiting them to avoid trimming events emitted in this script
trimEvents(KEYS[3], KEYS[8])
local maxEvents = rcall("HGET", KEYS[3], "opts.maxLenEvents") or 10000

local parentDependenciesKey = args[7]
local timestamp = args[4]
Expand All @@ -99,7 +97,8 @@ else
end
rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
end
rcall("XADD", KEYS[8], "*", "event", "duplicated", "jobId", jobId)
rcall("XADD", KEYS[8], "MAXLEN", "~", maxEvents, "*", "event", "duplicated",
"jobId", jobId)

return jobId .. "" -- convert to string
end
Expand Down Expand Up @@ -139,8 +138,8 @@ if waitChildrenKey ~= nil then
elseif (delayedTimestamp ~= 0) then
local score = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff)
rcall("ZADD", KEYS[5], score, jobId)
rcall("XADD", KEYS[8], "*", "event", "delayed", "jobId", jobId, "delay",
delayedTimestamp)
rcall("XADD", KEYS[8], "MAXLEN", "~", maxEvents, "*", "event", "delayed", "jobId", jobId,
"delay", delayedTimestamp)
-- If wait list is empty, and this delayed job is the next one to be processed,
-- then we need to signal the workers by adding a dummy job (jobId 0:delay) to the wait list.
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])
Expand All @@ -157,7 +156,8 @@ else
addJobWithPriority(KEYS[1], KEYS[6], priority, paused, jobId, KEYS[9])
end
-- Emit waiting event
rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId", jobId)
rcall("XADD", KEYS[8], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
"jobId", jobId)
end

-- Check if this job is a child of another job, if so add it to the parents dependencies
Expand Down
8 changes: 5 additions & 3 deletions src/commands/removeJob-1.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ local function removeJob( prefix, jobId, parentKey, removeChildren)

local prev = removeJobFromAnyState(prefix, jobId)

rcall("DEL", jobKey, jobKey .. ":logs", jobKey .. ":dependencies", jobKey .. ":processed")

rcall("XADD", prefix .. "events", "*", "event", "removed", "jobId", jobId, "prev", prev);
if rcall("DEL", jobKey, jobKey .. ":logs", jobKey .. ":dependencies", jobKey .. ":processed") > 0 then
local maxEvents = rcall("HGET", prefix .. "meta", "opts.maxLenEvents") or 10000
rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed",
"jobId", jobId, "prev", prev)
end
end

local prefix = KEYS[1]
Expand Down
58 changes: 57 additions & 1 deletion tests/test_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,28 @@ describe('events', function () {
await worker.close();
});

describe('when jobs removal is attempted on non-existed records', async () => {
it('should not publish removed events', async () => {
const numRemovals = 100;
const trimmedQueue = new Queue(queueName, {
connection,
});

const client = await trimmedQueue.client;

for (let i = 0; i < numRemovals; i++) {
await trimmedQueue.remove(i.toString());
}

const eventsLength = await client.xlen(trimmedQueue.keys.events);

expect(eventsLength).to.be.eql(0);

await trimmedQueue.close();
await removeAllQueueData(new IORedis(), queueName);
});
});

describe('when maxLen is 0', function () {
it('should trim events automatically', async () => {
const trimmedQueue = new Queue(queueName, {
Expand Down Expand Up @@ -679,7 +701,7 @@ describe('events', function () {
});
});

describe('when jobs are retried inmediately', function () {
describe('when jobs are retried immediately', function () {
it('should trim events so its length is at least the threshold', async () => {
const numJobs = 80;
const trimmedQueue = new Queue(queueName, {
Expand Down Expand Up @@ -735,6 +757,40 @@ describe('events', function () {
await removeAllQueueData(new IORedis(), queueName);
});
});

describe('when jobs removal is attempted', async () => {
it('should trim events so its length is at least the threshold', async () => {
const numRemovals = 200;
const trimmedQueue = new Queue(queueName, {
connection,
streams: {
events: {
maxLen: 20,
},
},
});

const client = await trimmedQueue.client;

const jobs = Array.from(Array(numRemovals).keys()).map(() => ({
name: 'test',
data: { foo: 'bar' },
}));
await trimmedQueue.addBulk(jobs);

for (let i = 1; i <= numRemovals; i++) {
await trimmedQueue.remove(i.toString());
}

const eventsLength = await client.xlen(trimmedQueue.keys.events);

expect(eventsLength).to.be.lte(100);
expect(eventsLength).to.be.gte(20);

await trimmedQueue.close();
await removeAllQueueData(new IORedis(), queueName);
});
});
});

it('should trim events manually', async () => {
Expand Down

0 comments on commit fb5ec66

Please sign in to comment.