Skip to content

Commit

Permalink
Redis transport - Redelivered messages should respect the original pr…
Browse files Browse the repository at this point in the history
…iority (#2026)

* Redelivered messages should respect the original priority

* update restore message test to check priority

* flake8

* add integration tests

* also add integration test for mongodb

* flake8

* temporarily removing python 3.9 from CI due to unrelated failures

* Update .github/workflows/ci.yaml

---------

Co-authored-by: Tomer Nosrati <[email protected]>
Co-authored-by: Asif Saif Uddin <[email protected]>
  • Loading branch information
3 people authored Jun 25, 2024
1 parent d05b4b0 commit d620132
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8,3.9,"3.10","3.11","3.12"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- name: Install system packages
run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev
Expand Down
4 changes: 3 additions & 1 deletion kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,8 +789,10 @@ def _do_restore_message(self, payload, exchange, routing_key,
except KeyError:
pass
for queue in self._lookup(exchange, routing_key):
pri = self._get_message_priority(payload, reverse=False)

(pipe.lpush if leftmost else pipe.rpush)(
queue, dumps(payload),
self._q_for_pri(queue, pri), dumps(payload),
)
except Exception:
crit('Could not restore message: %r', payload, exc_info=True)
Expand Down
79 changes: 79 additions & 0 deletions t/integration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,85 @@ def callback(body, message):
assert received_messages[1] == {'msg': 'first'}
assert received_messages[2] == {'msg': 'third'}

def test_publish_requeue_consume(self, connection):
# py-amqp transport has higher numbers higher priority
# redis transport has lower numbers higher priority
if self.PRIORITY_ORDER == 'asc':
prio_max = 9
prio_high = 6
prio_low = 3
else:
prio_max = 0
prio_high = 3
prio_low = 6

test_queue = kombu.Queue(
'priority_requeue_test',
routing_key='priority_requeue_test', max_priority=10
)

received_messages = []
received_message_bodies = []

def callback(body, message):
received_messages.append(message)
received_message_bodies.append(body)
# don't ack the message so it can be requeued

with connection as conn:
with conn.channel() as channel:
producer = kombu.Producer(channel)
for msg, prio in [
[{'msg': 'first'}, prio_low],
[{'msg': 'second'}, prio_high],
[{'msg': 'third'}, prio_low],
]:
producer.publish(
msg,
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=prio
)
# Sleep to make sure that queue sorted based on priority
sleep(0.5)
consumer = kombu.Consumer(
conn, [test_queue], accept=['pickle']
)
consumer.register_callback(callback)
with consumer:
# drain_events() returns just on number in
# Virtual transports
conn.drain_events(timeout=1)

# requeue the messages
for msg in received_messages:
msg.requeue()
received_messages.clear()
received_message_bodies.clear()

# add a fourth max priority message
producer.publish(
{'msg': 'fourth'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=prio_max
)

with consumer:
conn.drain_events(timeout=1)

# Fourth message must be received first
assert received_message_bodies[0] == {'msg': 'fourth'}
assert received_message_bodies[1] == {'msg': 'second'}
assert received_message_bodies[2] == {'msg': 'first'}
assert received_message_bodies[3] == {'msg': 'third'}

def test_simple_queue_publish_consume(self, connection):
if self.PRIORITY_ORDER == 'asc':
prio_high = 7
Expand Down
70 changes: 70 additions & 0 deletions t/integration/test_mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,76 @@ def callback(body, message):
assert received_messages[1] == {'msg': 'first'}
assert received_messages[2] == {'msg': 'third'}

def test_publish_requeue_consume(self, connection):
test_queue = kombu.Queue(
'priority_requeue_test',
routing_key='priority_requeue_test', max_priority=10
)

received_messages = []
received_message_bodies = []

def callback(body, message):
received_messages.append(message)
received_message_bodies.append(body)
# don't ack the message so it can be requeued

with connection as conn:
with conn.channel() as channel:
producer = kombu.Producer(channel)
for msg, prio in [
[{'msg': 'first'}, 3],
[{'msg': 'second'}, 6],
[{'msg': 'third'}, 3],
]:
producer.publish(
msg,
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=prio
)

consumer = kombu.Consumer(
conn, [test_queue], accept=['pickle']
)
consumer.register_callback(callback)
with consumer:
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)

# requeue the messages
for msg in received_messages:
msg.requeue()
received_messages.clear()
received_message_bodies.clear()

# add a fourth higher priority message
producer.publish(
{'msg': 'fourth'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=9 # highest priority
)

with consumer:
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)

# Fourth message must be received first
assert received_message_bodies[0] == {'msg': 'fourth'}
assert received_message_bodies[1] == {'msg': 'second'}
assert received_message_bodies[2] == {'msg': 'first'}
assert received_message_bodies[3] == {'msg': 'third'}


@pytest.mark.env('mongodb')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
Expand Down
72 changes: 72 additions & 0 deletions t/integration/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,78 @@ def callback(body, message):
assert received_messages[1] == {'msg': 'first'}
assert received_messages[2] == {'msg': 'third'}

def test_publish_requeue_consume(self, connection):
test_queue = kombu.Queue(
'priority_requeue_test',
routing_key='priority_requeue_test', max_priority=10
)

received_messages = []
received_message_bodies = []

def callback(body, message):
received_messages.append(message)
received_message_bodies.append(body)
# don't ack the message so it can be requeued

with connection as conn:
with conn.channel() as channel:
producer = kombu.Producer(channel)
for msg, prio in [
[{'msg': 'first'}, 6],
[{'msg': 'second'}, 3],
[{'msg': 'third'}, 6],
]:
producer.publish(
msg,
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=prio
)
# Sleep to make sure that queue sorted based on priority
sleep(0.5)
consumer = kombu.Consumer(
conn, [test_queue], accept=['pickle']
)
consumer.register_callback(callback)
with consumer:
# drain_events() consumes only one value unlike in py-amqp.
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)

# requeue the messages
for msg in received_messages:
msg.requeue()
received_messages.clear()
received_message_bodies.clear()

# add a fourth higher priority message
producer.publish(
{'msg': 'fourth'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=0 # highest priority
)

with consumer:
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)

# Fourth message must be received first
assert received_message_bodies[0] == {'msg': 'fourth'}
assert received_message_bodies[1] == {'msg': 'second'}
assert received_message_bodies[2] == {'msg': 'first'}
assert received_message_bodies[3] == {'msg': 'third'}


@pytest.mark.env('redis')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
Expand Down
5 changes: 3 additions & 2 deletions t/unit/transport/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ def test_do_restore_message_celery(self):
"exchange": "",
"routing_key": "celery",
},
"priority": 0,
"priority": 3,
"body_encoding": "base64",
"delivery_tag": "badb725e-9c3e-45be-b0a4-07e44630519f",
},
Expand All @@ -627,7 +627,8 @@ def test_do_restore_message_celery(self):
payload, 'exchange', 'routing_key', client,
)

client.rpush.assert_called_with(queue, dumps(result_payload))
client.rpush.assert_called_with(self.channel._q_for_pri(queue, 3),
dumps(result_payload))

def test_restore_no_messages(self):
message = Mock(name='message')
Expand Down

0 comments on commit d620132

Please sign in to comment.