Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Fix combination of auto_lock_renewer + session + receive_and_delete mode. #15343

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
**Breaking Changes**
* `ServiceBusSender` and `ServiceBusReceiver` are no more reusable and will raise `ValueError` when trying to operate on a closed handler.

**Bug Fixes**

* Using parameter `auto_lock_renewer` on a sessionful receiver alongside `ReceiveMode.ReceiveAndDelete` will no longer fail during receipt due to failure to register the message with the renewer.

## 7.0.0b8 (2020-11-05)

**New Features**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def _iter_next(self):
self._message_iter = self._handler.receive_messages_iter()
uamqp_message = next(self._message_iter)
message = self._build_message(uamqp_message)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
self._auto_lock_renewer.register(self, message)
return message

Expand Down Expand Up @@ -521,7 +521,7 @@ def receive_messages(self, max_message_count=1, max_wait_time=None):
timeout=max_wait_time,
operation_requires_timeout=True
)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
for message in messages:
self._auto_lock_renewer.register(self, message)
return messages
Expand Down Expand Up @@ -576,7 +576,7 @@ def receive_deferred_messages(self, sequence_numbers, **kwargs):
handler,
timeout=timeout
)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
for message in messages:
self._auto_lock_renewer.register(self, message)
return messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ async def _iter_next(self):
self._message_iter = self._handler.receive_messages_iter_async()
uamqp_message = await self._message_iter.__anext__()
message = self._build_message(uamqp_message)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
self._auto_lock_renewer.register(self, message)
return message

Expand Down Expand Up @@ -520,7 +520,7 @@ async def receive_messages(
timeout=max_wait_time,
operation_requires_timeout=True
)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
for message in messages:
self._auto_lock_renewer.register(self, message)
return messages
Expand Down Expand Up @@ -581,7 +581,7 @@ async def receive_deferred_messages(
handler,
timeout=timeout
)
if self._auto_lock_renewer and not self._session:
if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete:
for message in messages:
self._auto_lock_renewer.register(self, message)
return messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,28 @@ async def test_async_session_cancel_scheduled_messages(self, servicebus_namespac
raise
await renewer.close()

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@CachedServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True)
async def test_session_receiver_partially_invalid_autolockrenew_mode(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
session_id = str(uuid.uuid4())
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
await sender.send_messages(ServiceBusMessage("test_message", session_id=session_id))

failures = 0
async def should_not_run(*args, **kwargs):
failures += 1

async with sb_client.get_queue_receiver(servicebus_queue.name,
session_id=session_id,
receive_mode=ReceiveMode.ReceiveAndDelete,
auto_lock_renewer=AutoLockRenewer()) as receiver:
assert receiver.receive_messages()
assert not failures

@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down
26 changes: 26 additions & 0 deletions sdk/servicebus/azure-servicebus/tests/test_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,32 @@ def lock_lost_callback(renewable, error):
renewer.close()
assert len(messages) == 2

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@CachedServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True)
def test_session_receiver_partially_invalid_autolockrenew_mode(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
session_id = str(uuid.uuid4())
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

with sb_client.get_queue_sender(servicebus_queue.name) as sender:
sender.send_messages(ServiceBusMessage("test_message", session_id=session_id))

failures = 0
def should_not_run(*args, **kwargs):
failures += 1

auto_lock_renewer = AutoLockRenewer(on_lock_renew_failure=should_not_run)
with sb_client.get_queue_receiver(servicebus_queue.name,
session_id=session_id,
receive_mode=ReceiveMode.ReceiveAndDelete,
auto_lock_renewer=auto_lock_renewer) as receiver:

assert receiver.receive_messages()
assert not failures

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer()
Expand Down