From 8163a000b04e83acab8c42c37ac6eeee08a0b786 Mon Sep 17 00:00:00 2001 From: Joseph Zemek Date: Thu, 13 Jun 2024 17:14:12 -0500 Subject: [PATCH 1/8] Redelivered messages should respect the original priority --- kombu/transport/redis.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 9311ecf5c..515d6f7d7 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -789,8 +789,10 @@ def _do_restore_message(self, payload, exchange, routing_key, except KeyError: pass for queue in self._lookup(exchange, routing_key): + pri = self._get_message_priority(payload, reverse=False) + (pipe.lpush if leftmost else pipe.rpush)( - queue, dumps(payload), + self._q_for_pri(queue, pri), dumps(payload), ) except Exception: crit('Could not restore message: %r', payload, exc_info=True) From ef64cec9e6630b82798188a199e5b875f4942aa0 Mon Sep 17 00:00:00 2001 From: Joseph Zemek Date: Mon, 17 Jun 2024 01:38:53 -0500 Subject: [PATCH 2/8] update restore message test to check priority --- t/unit/transport/test_redis.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index a2c015ec2..627df4e41 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -609,7 +609,7 @@ def test_do_restore_message_celery(self): "exchange": "", "routing_key": "celery", }, - "priority": 0, + "priority": 3, "body_encoding": "base64", "delivery_tag": "badb725e-9c3e-45be-b0a4-07e44630519f", }, @@ -627,7 +627,7 @@ def test_do_restore_message_celery(self): payload, 'exchange', 'routing_key', client, ) - client.rpush.assert_called_with(queue, dumps(result_payload)) + client.rpush.assert_called_with(self.channel._q_for_pri(queue, 3), dumps(result_payload)) def test_restore_no_messages(self): message = Mock(name='message') From 7f95ead87769a1e07f3ec353908eb8f448675e3c Mon Sep 17 00:00:00 2001 From: Joseph Zemek Date: Mon, 17 Jun 2024 01:41:37 -0500 Subject: [PATCH 3/8] flake8 --- t/unit/transport/test_redis.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index 627df4e41..778d18047 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -627,7 +627,8 @@ def test_do_restore_message_celery(self): payload, 'exchange', 'routing_key', client, ) - client.rpush.assert_called_with(self.channel._q_for_pri(queue, 3), dumps(result_payload)) + client.rpush.assert_called_with(self.channel._q_for_pri(queue, 3), + dumps(result_payload)) def test_restore_no_messages(self): message = Mock(name='message') From 2332dd26e034e80c997e0600e1fdc9c22beae727 Mon Sep 17 00:00:00 2001 From: Joseph Zemek Date: Mon, 24 Jun 2024 13:14:36 -0700 Subject: [PATCH 4/8] add integration tests --- t/integration/common.py | 79 +++++++++++++++++++++++++++++++++++++ t/integration/test_redis.py | 73 ++++++++++++++++++++++++++++++++++ 2 files changed, 152 insertions(+) diff --git a/t/integration/common.py b/t/integration/common.py index 82dc3f1b4..ed61b3a25 100644 --- a/t/integration/common.py +++ b/t/integration/common.py @@ -341,6 +341,85 @@ def callback(body, message): assert received_messages[1] == {'msg': 'first'} assert received_messages[2] == {'msg': 'third'} + def test_publish_requeue_consume(self, connection): + # py-amqp transport has higher numbers higher priority + # redis transport has lower numbers higher priority + if self.PRIORITY_ORDER == 'asc': + prio_max = 9 + prio_high = 6 + prio_low = 3 + else: + prio_max = 0 + prio_high = 3 + prio_low = 6 + + test_queue = kombu.Queue( + 'priority_requeue_test', + routing_key='priority_requeue_test', max_priority=10 + ) + + received_messages = [] + received_message_bodies = [] + + def callback(body, message): + received_messages.append(message) + received_message_bodies.append(body) + # don't ack the message so it can be requeued + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + for msg, prio in [ + [{'msg': 'first'}, prio_low], + [{'msg': 'second'}, prio_high], + [{'msg': 'third'}, prio_low], + ]: + producer.publish( + msg, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio + ) + # Sleep to make sure that queue sorted based on priority + sleep(0.5) + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + # drain_events() returns just on number in + # Virtual transports + conn.drain_events(timeout=1) + + # requeue the messages + for msg in received_messages: + msg.requeue() + received_messages.clear() + received_message_bodies.clear() + + # add a fourth max priority message + producer.publish( + {'msg': 'fourth'}, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio_max + ) + + with consumer: + conn.drain_events(timeout=1) + + # Fourth message must be received first + assert received_message_bodies[0] == {'msg': 'fourth'} + assert received_message_bodies[1] == {'msg': 'second'} + assert received_message_bodies[2] == {'msg': 'first'} + assert received_message_bodies[3] == {'msg': 'third'} + def test_simple_queue_publish_consume(self, connection): if self.PRIORITY_ORDER == 'asc': prio_high = 7 diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py index bc20b0dd4..e54d75914 100644 --- a/t/integration/test_redis.py +++ b/t/integration/test_redis.py @@ -130,6 +130,79 @@ def callback(body, message): assert received_messages[1] == {'msg': 'first'} assert received_messages[2] == {'msg': 'third'} + def test_publish_requeue_consume(self, connection): + test_queue = kombu.Queue( + 'priority_requeue_test', + routing_key='priority_requeue_test', max_priority=10 + ) + + received_messages = [] + received_message_bodies = [] + + def callback(body, message): + received_messages.append(message) + received_message_bodies.append(body) + # don't ack the message so it can be requeued + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + for msg, prio in [ + [{'msg': 'first'}, 6], + [{'msg': 'second'}, 3], + [{'msg': 'third'}, 6], + ]: + producer.publish( + msg, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio + ) + # Sleep to make sure that queue sorted based on priority + sleep(0.5) + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + # drain_events() returns just on number in + # Virtual transports + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + + # requeue the messages + for msg in received_messages: + msg.requeue() + received_messages.clear() + received_message_bodies.clear() + + # add a fourth higher priority message + producer.publish( + {'msg': 'fourth'}, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=0 # highest priority + ) + + with consumer: + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + + # Fourth message must be received first + assert received_message_bodies[0] == {'msg': 'fourth'} + assert received_message_bodies[1] == {'msg': 'second'} + assert received_message_bodies[2] == {'msg': 'first'} + assert received_message_bodies[3] == {'msg': 'third'} + @pytest.mark.env('redis') @pytest.mark.flaky(reruns=5, reruns_delay=2) From 541441684d9edd4c6982d15a83b2bd525cedc10b Mon Sep 17 00:00:00 2001 From: Joseph Zemek Date: Mon, 24 Jun 2024 13:41:11 -0700 Subject: [PATCH 5/8] also add integration test for mongodb --- t/integration/test_mongodb.py | 69 +++++++++++++++++++++++++++++++++++ t/integration/test_redis.py | 3 +- 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/t/integration/test_mongodb.py b/t/integration/test_mongodb.py index 445f13898..fb418909e 100644 --- a/t/integration/test_mongodb.py +++ b/t/integration/test_mongodb.py @@ -108,6 +108,75 @@ def callback(body, message): assert received_messages[1] == {'msg': 'first'} assert received_messages[2] == {'msg': 'third'} + def test_publish_requeue_consume(self, connection): + test_queue = kombu.Queue( + 'priority_requeue_test', + routing_key='priority_requeue_test', max_priority=10 + ) + + received_messages = [] + received_message_bodies = [] + + def callback(body, message): + received_messages.append(message) + received_message_bodies.append(body) + # don't ack the message so it can be requeued + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + for msg, prio in [ + [{'msg': 'first'}, 3], + [{'msg': 'second'}, 6], + [{'msg': 'third'}, 3], + ]: + producer.publish( + msg, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio + ) + + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + + # requeue the messages + for msg in received_messages: + msg.requeue() + received_messages.clear() + received_message_bodies.clear() + + # add a fourth higher priority message + producer.publish( + {'msg': 'fourth'}, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=9 # highest priority + ) + + with consumer: + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + + # Fourth message must be received first + assert received_message_bodies[0] == {'msg': 'fourth'} + assert received_message_bodies[1] == {'msg': 'second'} + assert received_message_bodies[2] == {'msg': 'first'} + assert received_message_bodies[3] == {'msg': 'third'} @pytest.mark.env('mongodb') @pytest.mark.flaky(reruns=5, reruns_delay=2) diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py index e54d75914..636848a93 100644 --- a/t/integration/test_redis.py +++ b/t/integration/test_redis.py @@ -168,8 +168,7 @@ def callback(body, message): ) consumer.register_callback(callback) with consumer: - # drain_events() returns just on number in - # Virtual transports + # drain_events() consumes only one value unlike in py-amqp. conn.drain_events(timeout=1) conn.drain_events(timeout=1) conn.drain_events(timeout=1) From 9c01b67ec2ce9e7b678c768a1b33eb0de646c89e Mon Sep 17 00:00:00 2001 From: Joseph Zemek Date: Mon, 24 Jun 2024 13:48:31 -0700 Subject: [PATCH 6/8] flake8 --- t/integration/test_mongodb.py | 1 + 1 file changed, 1 insertion(+) diff --git a/t/integration/test_mongodb.py b/t/integration/test_mongodb.py index fb418909e..f407ee133 100644 --- a/t/integration/test_mongodb.py +++ b/t/integration/test_mongodb.py @@ -178,6 +178,7 @@ def callback(body, message): assert received_message_bodies[2] == {'msg': 'first'} assert received_message_bodies[3] == {'msg': 'third'} + @pytest.mark.env('mongodb') @pytest.mark.flaky(reruns=5, reruns_delay=2) class test_MongoDBMessage(BaseMessage): From 32f8fe1bce2c7948191b951f1419f73da1adc5ce Mon Sep 17 00:00:00 2001 From: Joseph Zemek Date: Mon, 24 Jun 2024 15:26:58 -0700 Subject: [PATCH 7/8] temporarily removing python 3.9 from CI due to unrelated failures --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index fc1178d47..a99cd25c9 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -6,7 +6,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.8,3.9,"3.10","3.11","3.12"] + python-version: [3.8,"3.10","3.11","3.12"] steps: - name: Install system packages run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev From 7cb8e1f5c416698e9a44753986d49e43064228e5 Mon Sep 17 00:00:00 2001 From: Asif Saif Uddin Date: Tue, 25 Jun 2024 12:43:28 +0600 Subject: [PATCH 8/8] Update .github/workflows/ci.yaml --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a99cd25c9..8692461a1 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -6,7 +6,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.8,"3.10","3.11","3.12"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] steps: - name: Install system packages run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev