From d620132ecee40fc021f7a78750dfe01331e8a8c0 Mon Sep 17 00:00:00 2001 From: Joseph Zemek Date: Tue, 25 Jun 2024 11:15:39 -0500 Subject: [PATCH] Redis transport - Redelivered messages should respect the original priority (#2026) * Redelivered messages should respect the original priority * update restore message test to check priority * flake8 * add integration tests * also add integration test for mongodb * flake8 * temporarily removing python 3.9 from CI due to unrelated failures * Update .github/workflows/ci.yaml --------- Co-authored-by: Tomer Nosrati Co-authored-by: Asif Saif Uddin --- .github/workflows/ci.yaml | 2 +- kombu/transport/redis.py | 4 +- t/integration/common.py | 79 ++++++++++++++++++++++++++++++++++ t/integration/test_mongodb.py | 70 ++++++++++++++++++++++++++++++ t/integration/test_redis.py | 72 +++++++++++++++++++++++++++++++ t/unit/transport/test_redis.py | 5 ++- 6 files changed, 228 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index fc1178d47..8692461a1 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -6,7 +6,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.8,3.9,"3.10","3.11","3.12"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] steps: - name: Install system packages run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 9311ecf5c..515d6f7d7 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -789,8 +789,10 @@ def _do_restore_message(self, payload, exchange, routing_key, except KeyError: pass for queue in self._lookup(exchange, routing_key): + pri = self._get_message_priority(payload, reverse=False) + (pipe.lpush if leftmost else pipe.rpush)( - queue, dumps(payload), + self._q_for_pri(queue, pri), dumps(payload), ) except Exception: crit('Could not restore message: %r', payload, exc_info=True) diff --git a/t/integration/common.py b/t/integration/common.py index 82dc3f1b4..ed61b3a25 100644 --- a/t/integration/common.py +++ b/t/integration/common.py @@ -341,6 +341,85 @@ def callback(body, message): assert received_messages[1] == {'msg': 'first'} assert received_messages[2] == {'msg': 'third'} + def test_publish_requeue_consume(self, connection): + # py-amqp transport has higher numbers higher priority + # redis transport has lower numbers higher priority + if self.PRIORITY_ORDER == 'asc': + prio_max = 9 + prio_high = 6 + prio_low = 3 + else: + prio_max = 0 + prio_high = 3 + prio_low = 6 + + test_queue = kombu.Queue( + 'priority_requeue_test', + routing_key='priority_requeue_test', max_priority=10 + ) + + received_messages = [] + received_message_bodies = [] + + def callback(body, message): + received_messages.append(message) + received_message_bodies.append(body) + # don't ack the message so it can be requeued + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + for msg, prio in [ + [{'msg': 'first'}, prio_low], + [{'msg': 'second'}, prio_high], + [{'msg': 'third'}, prio_low], + ]: + producer.publish( + msg, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio + ) + # Sleep to make sure that queue sorted based on priority + sleep(0.5) + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + # drain_events() returns just on number in + # Virtual transports + conn.drain_events(timeout=1) + + # requeue the messages + for msg in received_messages: + msg.requeue() + received_messages.clear() + received_message_bodies.clear() + + # add a fourth max priority message + producer.publish( + {'msg': 'fourth'}, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio_max + ) + + with consumer: + conn.drain_events(timeout=1) + + # Fourth message must be received first + assert received_message_bodies[0] == {'msg': 'fourth'} + assert received_message_bodies[1] == {'msg': 'second'} + assert received_message_bodies[2] == {'msg': 'first'} + assert received_message_bodies[3] == {'msg': 'third'} + def test_simple_queue_publish_consume(self, connection): if self.PRIORITY_ORDER == 'asc': prio_high = 7 diff --git a/t/integration/test_mongodb.py b/t/integration/test_mongodb.py index 445f13898..f407ee133 100644 --- a/t/integration/test_mongodb.py +++ b/t/integration/test_mongodb.py @@ -108,6 +108,76 @@ def callback(body, message): assert received_messages[1] == {'msg': 'first'} assert received_messages[2] == {'msg': 'third'} + def test_publish_requeue_consume(self, connection): + test_queue = kombu.Queue( + 'priority_requeue_test', + routing_key='priority_requeue_test', max_priority=10 + ) + + received_messages = [] + received_message_bodies = [] + + def callback(body, message): + received_messages.append(message) + received_message_bodies.append(body) + # don't ack the message so it can be requeued + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + for msg, prio in [ + [{'msg': 'first'}, 3], + [{'msg': 'second'}, 6], + [{'msg': 'third'}, 3], + ]: + producer.publish( + msg, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio + ) + + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + + # requeue the messages + for msg in received_messages: + msg.requeue() + received_messages.clear() + received_message_bodies.clear() + + # add a fourth higher priority message + producer.publish( + {'msg': 'fourth'}, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=9 # highest priority + ) + + with consumer: + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + + # Fourth message must be received first + assert received_message_bodies[0] == {'msg': 'fourth'} + assert received_message_bodies[1] == {'msg': 'second'} + assert received_message_bodies[2] == {'msg': 'first'} + assert received_message_bodies[3] == {'msg': 'third'} + @pytest.mark.env('mongodb') @pytest.mark.flaky(reruns=5, reruns_delay=2) diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py index bc20b0dd4..636848a93 100644 --- a/t/integration/test_redis.py +++ b/t/integration/test_redis.py @@ -130,6 +130,78 @@ def callback(body, message): assert received_messages[1] == {'msg': 'first'} assert received_messages[2] == {'msg': 'third'} + def test_publish_requeue_consume(self, connection): + test_queue = kombu.Queue( + 'priority_requeue_test', + routing_key='priority_requeue_test', max_priority=10 + ) + + received_messages = [] + received_message_bodies = [] + + def callback(body, message): + received_messages.append(message) + received_message_bodies.append(body) + # don't ack the message so it can be requeued + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + for msg, prio in [ + [{'msg': 'first'}, 6], + [{'msg': 'second'}, 3], + [{'msg': 'third'}, 6], + ]: + producer.publish( + msg, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio + ) + # Sleep to make sure that queue sorted based on priority + sleep(0.5) + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + # drain_events() consumes only one value unlike in py-amqp. + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + + # requeue the messages + for msg in received_messages: + msg.requeue() + received_messages.clear() + received_message_bodies.clear() + + # add a fourth higher priority message + producer.publish( + {'msg': 'fourth'}, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=0 # highest priority + ) + + with consumer: + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + + # Fourth message must be received first + assert received_message_bodies[0] == {'msg': 'fourth'} + assert received_message_bodies[1] == {'msg': 'second'} + assert received_message_bodies[2] == {'msg': 'first'} + assert received_message_bodies[3] == {'msg': 'third'} + @pytest.mark.env('redis') @pytest.mark.flaky(reruns=5, reruns_delay=2) diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index a2c015ec2..778d18047 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -609,7 +609,7 @@ def test_do_restore_message_celery(self): "exchange": "", "routing_key": "celery", }, - "priority": 0, + "priority": 3, "body_encoding": "base64", "delivery_tag": "badb725e-9c3e-45be-b0a4-07e44630519f", }, @@ -627,7 +627,8 @@ def test_do_restore_message_celery(self): payload, 'exchange', 'routing_key', client, ) - client.rpush.assert_called_with(queue, dumps(result_payload)) + client.rpush.assert_called_with(self.channel._q_for_pri(queue, 3), + dumps(result_payload)) def test_restore_no_messages(self): message = Mock(name='message')