Skip to content

Commit

Permalink
Fix combined session+auto-auto_lock_renewer+receive-and-delete mode i…
Browse files Browse the repository at this point in the history
…ssue where registry would fail during receipt. Add tests and changelog entry. (#15343)
  • Loading branch information
KieranBrantnerMagee authored Nov 18, 2020
1 parent 7bc5cdf commit f111ffc
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 6 deletions.
4 changes: 4 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
**Breaking Changes**
* `ServiceBusSender` and `ServiceBusReceiver` are no more reusable and will raise `ValueError` when trying to operate on a closed handler.

**Bug Fixes**

* Using parameter `auto_lock_renewer` on a sessionful receiver alongside `ReceiveMode.ReceiveAndDelete` will no longer fail during receipt due to failure to register the message with the renewer.

## 7.0.0b8 (2020-11-05)

**New Features**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def _iter_next(self):
self._message_iter = self._handler.receive_messages_iter()
uamqp_message = next(self._message_iter)
message = self._build_message(uamqp_message)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
self._auto_lock_renewer.register(self, message)
return message

Expand Down Expand Up @@ -521,7 +521,7 @@ def receive_messages(self, max_message_count=1, max_wait_time=None):
timeout=max_wait_time,
operation_requires_timeout=True
)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
for message in messages:
self._auto_lock_renewer.register(self, message)
return messages
Expand Down Expand Up @@ -576,7 +576,7 @@ def receive_deferred_messages(self, sequence_numbers, **kwargs):
handler,
timeout=timeout
)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
for message in messages:
self._auto_lock_renewer.register(self, message)
return messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ async def _iter_next(self):
self._message_iter = self._handler.receive_messages_iter_async()
uamqp_message = await self._message_iter.__anext__()
message = self._build_message(uamqp_message)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
self._auto_lock_renewer.register(self, message)
return message

Expand Down Expand Up @@ -520,7 +520,7 @@ async def receive_messages(
timeout=max_wait_time,
operation_requires_timeout=True
)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
for message in messages:
self._auto_lock_renewer.register(self, message)
return messages
Expand Down Expand Up @@ -581,7 +581,7 @@ async def receive_deferred_messages(
handler,
timeout=timeout
)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
for message in messages:
self._auto_lock_renewer.register(self, message)
return messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,28 @@ async def test_async_session_cancel_scheduled_messages(self, servicebus_namespac
raise
await renewer.close()

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@CachedServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True)
async def test_session_receiver_partially_invalid_autolockrenew_mode(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
session_id = str(uuid.uuid4())
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
await sender.send_messages(ServiceBusMessage("test_message", session_id=session_id))

failures = 0
async def should_not_run(*args, **kwargs):
failures += 1

async with sb_client.get_queue_receiver(servicebus_queue.name,
session_id=session_id,
receive_mode=ReceiveMode.ReceiveAndDelete,
auto_lock_renewer=AutoLockRenewer()) as receiver:
assert receiver.receive_messages()
assert not failures

@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down
26 changes: 26 additions & 0 deletions sdk/servicebus/azure-servicebus/tests/test_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,32 @@ def lock_lost_callback(renewable, error):
renewer.close()
assert len(messages) == 2

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@CachedServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True)
def test_session_receiver_partially_invalid_autolockrenew_mode(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
session_id = str(uuid.uuid4())
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

with sb_client.get_queue_sender(servicebus_queue.name) as sender:
sender.send_messages(ServiceBusMessage("test_message", session_id=session_id))

failures = 0
def should_not_run(*args, **kwargs):
failures += 1

auto_lock_renewer = AutoLockRenewer(on_lock_renew_failure=should_not_run)
with sb_client.get_queue_receiver(servicebus_queue.name,
session_id=session_id,
receive_mode=ReceiveMode.ReceiveAndDelete,
auto_lock_renewer=auto_lock_renewer) as receiver:

assert receiver.receive_messages()
assert not failures

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer()
Expand Down

0 comments on commit f111ffc

Please sign in to comment.