Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis transport - Redelivered messages should respect the original priority #2026

Merged
merged 10 commits into from
Jun 25, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8,3.9,"3.10","3.11","3.12"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- name: Install system packages
run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev
Expand Down
4 changes: 3 additions & 1 deletion kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,8 +789,10 @@ def _do_restore_message(self, payload, exchange, routing_key,
except KeyError:
pass
for queue in self._lookup(exchange, routing_key):
pri = self._get_message_priority(payload, reverse=False)

(pipe.lpush if leftmost else pipe.rpush)(
queue, dumps(payload),
self._q_for_pri(queue, pri), dumps(payload),
)
except Exception:
crit('Could not restore message: %r', payload, exc_info=True)
Expand Down
79 changes: 79 additions & 0 deletions t/integration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,85 @@ def callback(body, message):
assert received_messages[1] == {'msg': 'first'}
assert received_messages[2] == {'msg': 'third'}

def test_publish_requeue_consume(self, connection):
# py-amqp transport has higher numbers higher priority
# redis transport has lower numbers higher priority
if self.PRIORITY_ORDER == 'asc':
prio_max = 9
prio_high = 6
prio_low = 3
else:
prio_max = 0
prio_high = 3
prio_low = 6

test_queue = kombu.Queue(
'priority_requeue_test',
routing_key='priority_requeue_test', max_priority=10
)

received_messages = []
received_message_bodies = []

def callback(body, message):
received_messages.append(message)
received_message_bodies.append(body)
# don't ack the message so it can be requeued

with connection as conn:
with conn.channel() as channel:
producer = kombu.Producer(channel)
for msg, prio in [
[{'msg': 'first'}, prio_low],
[{'msg': 'second'}, prio_high],
[{'msg': 'third'}, prio_low],
]:
producer.publish(
msg,
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=prio
)
# Sleep to make sure that queue sorted based on priority
sleep(0.5)
consumer = kombu.Consumer(
conn, [test_queue], accept=['pickle']
)
consumer.register_callback(callback)
with consumer:
# drain_events() returns just on number in
# Virtual transports
conn.drain_events(timeout=1)

# requeue the messages
for msg in received_messages:
msg.requeue()
received_messages.clear()
received_message_bodies.clear()

# add a fourth max priority message
producer.publish(
{'msg': 'fourth'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=prio_max
)

with consumer:
conn.drain_events(timeout=1)

# Fourth message must be received first
assert received_message_bodies[0] == {'msg': 'fourth'}
assert received_message_bodies[1] == {'msg': 'second'}
assert received_message_bodies[2] == {'msg': 'first'}
assert received_message_bodies[3] == {'msg': 'third'}

def test_simple_queue_publish_consume(self, connection):
if self.PRIORITY_ORDER == 'asc':
prio_high = 7
Expand Down
70 changes: 70 additions & 0 deletions t/integration/test_mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,76 @@ def callback(body, message):
assert received_messages[1] == {'msg': 'first'}
assert received_messages[2] == {'msg': 'third'}

def test_publish_requeue_consume(self, connection):
test_queue = kombu.Queue(
'priority_requeue_test',
routing_key='priority_requeue_test', max_priority=10
)

received_messages = []
received_message_bodies = []

def callback(body, message):
received_messages.append(message)
received_message_bodies.append(body)
# don't ack the message so it can be requeued

with connection as conn:
with conn.channel() as channel:
producer = kombu.Producer(channel)
for msg, prio in [
[{'msg': 'first'}, 3],
[{'msg': 'second'}, 6],
[{'msg': 'third'}, 3],
]:
producer.publish(
msg,
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=prio
)

consumer = kombu.Consumer(
conn, [test_queue], accept=['pickle']
)
consumer.register_callback(callback)
with consumer:
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)

# requeue the messages
for msg in received_messages:
msg.requeue()
received_messages.clear()
received_message_bodies.clear()

# add a fourth higher priority message
producer.publish(
{'msg': 'fourth'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=9 # highest priority
)

with consumer:
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)

# Fourth message must be received first
assert received_message_bodies[0] == {'msg': 'fourth'}
assert received_message_bodies[1] == {'msg': 'second'}
assert received_message_bodies[2] == {'msg': 'first'}
assert received_message_bodies[3] == {'msg': 'third'}


@pytest.mark.env('mongodb')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
Expand Down
72 changes: 72 additions & 0 deletions t/integration/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,78 @@ def callback(body, message):
assert received_messages[1] == {'msg': 'first'}
assert received_messages[2] == {'msg': 'third'}

def test_publish_requeue_consume(self, connection):
test_queue = kombu.Queue(
'priority_requeue_test',
routing_key='priority_requeue_test', max_priority=10
)

received_messages = []
received_message_bodies = []

def callback(body, message):
received_messages.append(message)
received_message_bodies.append(body)
# don't ack the message so it can be requeued

with connection as conn:
with conn.channel() as channel:
producer = kombu.Producer(channel)
for msg, prio in [
[{'msg': 'first'}, 6],
[{'msg': 'second'}, 3],
[{'msg': 'third'}, 6],
]:
producer.publish(
msg,
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=prio
)
# Sleep to make sure that queue sorted based on priority
sleep(0.5)
consumer = kombu.Consumer(
conn, [test_queue], accept=['pickle']
)
consumer.register_callback(callback)
with consumer:
# drain_events() consumes only one value unlike in py-amqp.
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)

# requeue the messages
for msg in received_messages:
msg.requeue()
received_messages.clear()
received_message_bodies.clear()

# add a fourth higher priority message
producer.publish(
{'msg': 'fourth'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle',
priority=0 # highest priority
)

with consumer:
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)
conn.drain_events(timeout=1)

# Fourth message must be received first
assert received_message_bodies[0] == {'msg': 'fourth'}
assert received_message_bodies[1] == {'msg': 'second'}
assert received_message_bodies[2] == {'msg': 'first'}
assert received_message_bodies[3] == {'msg': 'third'}


@pytest.mark.env('redis')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
Expand Down
5 changes: 3 additions & 2 deletions t/unit/transport/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ def test_do_restore_message_celery(self):
"exchange": "",
"routing_key": "celery",
},
"priority": 0,
"priority": 3,
"body_encoding": "base64",
"delivery_tag": "badb725e-9c3e-45be-b0a4-07e44630519f",
},
Expand All @@ -627,7 +627,8 @@ def test_do_restore_message_celery(self):
payload, 'exchange', 'routing_key', client,
)

client.rpush.assert_called_with(queue, dumps(result_payload))
client.rpush.assert_called_with(self.channel._q_for_pri(queue, 3),
dumps(result_payload))

def test_restore_no_messages(self):
message = Mock(name='message')
Expand Down