Skip to content

Commit

Permalink
[AMQP] Fix Filter Set Encoding For 2 Char Length Session id (#32860)
Browse files Browse the repository at this point in the history
* fix for len 2 string

* fix for char length

* fix pylint

* fix to keep the right data value

* pylint

* switch order

* raise error

* encode unit tests

* get behavior in line with uamqp

* modified to add any value

* narrow exception

* live test

* changelog
  • Loading branch information
kashifkhan authored Nov 13, 2023
1 parent 1568f5f commit 09f0ab0
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 22 deletions.
21 changes: 13 additions & 8 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_encode.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,14 +765,19 @@ def encode_filter_set(value):
else:
if isinstance(name, str):
name = name.encode("utf-8") # type: ignore
try:
descriptor, filter_value = data
described_filter = {
TYPE: AMQPTypes.described,
VALUE: ({TYPE: AMQPTypes.symbol, VALUE: descriptor}, filter_value),
}
except ValueError:
described_filter = data
if isinstance(data, (str, bytes)):
described_filter = data # type: ignore
# handle the situation when data is a tuple or list of length 2
else:
try:
descriptor, filter_value = data
described_filter = {
TYPE: AMQPTypes.described,
VALUE: ({TYPE: AMQPTypes.symbol, VALUE: descriptor}, filter_value),
}
# if its not a type that is known, raise the error from the server
except (ValueError, TypeError):
described_filter = data

cast(List, fields[VALUE]).append(
({TYPE: AMQPTypes.symbol, VALUE: name}, described_filter)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pytest
from azure.eventhub._pyamqp._encode import encode_filter_set

@pytest.mark.parametrize("value,expected", [
({b'com.microsoft:session-filter': 'ababa'}, 'ababa'),
({b'com.microsoft:session-filter': 'abab'}, 'abab'),
({b'com.microsoft:session-filter': 'aba'}, 'aba'),
({b'com.microsoft:session-filter': 'ab'}, 'ab'),
({b'com.microsoft:session-filter': 'a'}, 'a'),
({b'com.microsoft:session-filter': 1}, 1),
])
def test_valid_filter_encode(value, expected):
fields = encode_filter_set(value)
assert len(fields) == 2
assert fields['VALUE'][0][1] == expected


8 changes: 2 additions & 6 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
# Release History

## 7.11.4 (Unreleased)

### Features Added

### Breaking Changes
## 7.11.4 (2023-11-13)

### Bugs Fixed

### Other Changes
- Fixed a bug where a two character count session id was being incorrectly parsed by azure amqp.

## 7.11.3 (2023-10-11)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,14 +765,19 @@ def encode_filter_set(value):
else:
if isinstance(name, str):
name = name.encode("utf-8") # type: ignore
try:
descriptor, filter_value = data
described_filter = {
TYPE: AMQPTypes.described,
VALUE: ({TYPE: AMQPTypes.symbol, VALUE: descriptor}, filter_value),
}
except ValueError:
described_filter = data
if isinstance(data, (str, bytes)):
described_filter = data # type: ignore
# handle the situation when data is a tuple or list of length 2
else:
try:
descriptor, filter_value = data
described_filter = {
TYPE: AMQPTypes.described,
VALUE: ({TYPE: AMQPTypes.symbol, VALUE: descriptor}, filter_value),
}
# if its not a type that is known, raise the error from the server
except (ValueError, TypeError):
described_filter = data

cast(List, fields[VALUE]).append(
({TYPE: AMQPTypes.symbol, VALUE: name}, described_filter)
Expand Down
27 changes: 27 additions & 0 deletions sdk/servicebus/azure-servicebus/tests/test_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1300,3 +1300,30 @@ def test_session_non_session_send_to_session_queue_should_fail(self, uamqp_trans
message = ServiceBusMessage("This should be an invalid non session message")
with pytest.raises(ServiceBusError):
sender.send_messages(message)

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True)
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
@ArgPasser()
def test_session_id_str_bytes(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs):

with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False, uamqp_transport=uamqp_transport) as sb_client:

sessions = []
start_time = utc_now()
for i in range(5):
sessions.append('a' * (i + 1))

for session_id in sessions:
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
message = ServiceBusMessage("Test message no. {}".format(i), session_id=session_id)
sender.send_messages(message)
for session_id in sessions:
with sb_client.get_queue_receiver(servicebus_queue.name, session_id=session_id) as receiver:
messages = receiver.receive_messages(max_wait_time=10)
assert len(messages) == 1
assert messages[0].session_id == session_id

0 comments on commit 09f0ab0

Please sign in to comment.