Skip to content

Commit

Permalink
[ServiceBus] Pre-GA Bug Bash fixes (#15489)
Browse files Browse the repository at this point in the history
- Small fix found during bug bash, an already disposed handler during iter finalizer causes issues on iter termination.
- Given the risks the above exposed, made the explicit getter which necessitated the iter wrapper voodoo internal so as to give time to ensure we're not locking ourselves into a dangerous pattern.  This was simply a helper and the same functionality can still be accessed by iterating on the base receiver.
- Adjusts docs, samples, tests.
- clarify one changelog point (Thanks for noting Yunhaoling)
- Fixes up some stress tests that had been missed in prior refactors and ensure all green.
- Tick version to 7.0.0 so changelog validation doesn't get angry.
- Clarify changelog, add note to stress test purpose.
Co-authored-by: Adam Ling (MSFT) <[email protected]>
  • Loading branch information
KieranBrantnerMagee authored Nov 21, 2020
1 parent 8a164d3 commit 1810441
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 38 deletions.
3 changes: 2 additions & 1 deletion sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* An improper `receive_mode` value will now raise `ValueError` instead of `TypeError` in line with supporting extensible enums.
* Rename enum values `DeadLetter` to `DEAD_LETTER`, `TransferDeadLetter` to `TRANSFER_DEAD_LETTER`, `PeekLock` to `PEEK_LOCK` and `ReceiveAndDelete` to `RECEIVE_AND_DELETE` to conform to sdk guidelines going forward.
* `send_messages`, `schedule_messages`, `cancel_scheduled_messages` and `receive_deferred_messages` now performs a no-op rather than raising a `ValueError` if provided an empty list of messages or an empty batch.
* `amqp_annotated_message` has been renamed to `raw_amqp_message` to normalize with other SDKs.
* `ServiceBusMessage.amqp_annotated_message` has been renamed to `ServiceBusMessage.raw_amqp_message` to normalize with other SDKs.
* Redesigned error hierarchy based on the service-defined error condition:
- `MessageAlreadySettled` now inherits from `ValueError` instead of `ServiceBusMessageError` as it's a client-side validation.
- Removed `NoActiveSession` which is now replaced by `OperationTimeoutError` as the client times out when trying to connect to any available session.
Expand All @@ -41,6 +41,7 @@ raise `ValueError` if the `queue_name` or `topic_name` does not match the `Entit
- Settling a message that has been peeked will raise `ValueError`.
- Settling a message or renewing a lock on a message received in `RECEIVE_AND_DELETE` receive mode will raise `ValueError`.
- Setting `session_id`, `reply_to_session_id`, `message_id` and `partition_key` on `ServiceBusMessage` longer than 128 characters will raise `ValueError`.
* `ServiceBusReceiver.get_streaming_message_iter` has been made internal for the time being to assess use patterns before comitting to back-compatibility; messages may still be iterated over in equivelent fashion by iterating on the receiver itself.

**BugFixes**

Expand Down
6 changes: 3 additions & 3 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
# max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
# Default is None; to receive forever.
with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
for msg in receiver: # ServiceBusReceiver instance is a generator. This is equivilent to get_streaming_message_iter().
for msg in receiver: # ServiceBusReceiver instance is a generator.
print(str(msg))
# If it is desired to halt receiving early, one can break out of the loop here safely.
```
Expand Down Expand Up @@ -387,7 +387,7 @@ There are various timeouts a user should be aware of within the library.
- 10 minute service side link closure: A link, once opened, will be closed after 10 minutes idle to protect the service against resource leakage. This should largely
be transparent to a user, but if you notice a reconnect occurring after such a duration, this is why. Performing any operations, including management operations, on the
link will extend this timeout.
- max_wait_time: Provided on creation of a receiver or when calling `receive_messages()` or `get_streaming_message_iter()`, the time after which receiving messages will halt after no traffic. This applies both to the imperative `receive_messages()` function as well as the length
- max_wait_time: Provided on creation of a receiver or when calling `receive_messages()`, the time after which receiving messages will halt after no traffic. This applies both to the imperative `receive_messages()` function as well as the length
a generator-style receive will run for before exiting if there are no messages. Passing None (default) will wait forever, up until the 10 minute threshold if no other action is taken.

> **NOTE:** If processing of a message or session is sufficiently long as to cause timeouts, as an alternative to calling `receiver.renew_message_lock`/`receiver.session.renew_lock` manually, one can
Expand Down Expand Up @@ -505,7 +505,7 @@ contact [[email protected]](mailto:[email protected]) with any additio
[client_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html#azure.servicebus.ServiceBusClient
[send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=send_messages#azure.servicebus.ServiceBusSender.send_messages
[receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusReceiver.receive_messages
[streaming_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=get_streaming_message_iter#azure.servicebus.ServiceBusReceiver.get_streaming_message_iter
[streaming_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=next#azure.servicebus.ServiceBusReceiver.next
[session_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusSessionReceiver.receive_messages
[session_send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=session_id#azure.servicebus.ServiceBusMessage.session_id
[complete_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=complete_message#azure.servicebus.ServiceBusReceiver.complete_message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ def _iter_contextual_wrapper(self, max_wait_time=None):
break
finally:
if original_timeout:
self._handler._timeout = original_timeout
try:
self._handler._timeout = original_timeout
except AttributeError: # Handler may be disposed already.
pass


def _inner_next(self):
# We do this weird wrapping such that an imperitive next() call, and a generator-based iter both trace sanely.
Expand Down Expand Up @@ -484,7 +488,7 @@ def close(self):
super(ServiceBusReceiver, self).close()
self._message_iter = None # pylint: disable=attribute-defined-outside-init

def get_streaming_message_iter(self, max_wait_time=None):
def _get_streaming_message_iter(self, max_wait_time=None):
# type: (Optional[float]) -> Iterator[ServiceBusReceivedMessage]
"""Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until
such a timeout occurs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ async def __anext__(self):
return message
finally:
if original_timeout:
self.receiver._handler._timeout = original_timeout
try:
self.receiver._handler._timeout = original_timeout
except AttributeError: # Handler may be disposed already.
pass


def __aiter__(self):
return self._IterContextualWrapper(self)
Expand Down Expand Up @@ -478,7 +482,7 @@ async def close(self) -> None:
await super(ServiceBusReceiver, self).close()
self._message_iter = None

def get_streaming_message_iter(
def _get_streaming_message_iter(
self,
max_wait_time: Optional[float] = None
) -> AsyncIterator[ServiceBusReceivedMessage]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ async def example_send_and_receive_async():
servicebus_receiver = await example_create_servicebus_receiver_async()
# [START receive_forever_async]
async with servicebus_receiver:
async for message in servicebus_receiver.get_streaming_message_iter():
async for message in servicebus_receiver:
print(str(message))
await servicebus_receiver.complete_message(message)
# [END receive_forever_async]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def example_send_and_receive_sync():
servicebus_receiver = example_create_servicebus_receiver_sync()
# [START receive_forever]
with servicebus_receiver:
for message in servicebus_receiver.get_streaming_message_iter():
for message in servicebus_receiver:
print(str(message))
servicebus_receiver.complete_message(message)
# [END receive_forever]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel
await receiver.receive_messages(max_wait_time=0)

with pytest.raises(ValueError):
await receiver.get_streaming_message_iter(max_wait_time=0)
await receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
async for message in receiver:
Expand Down Expand Up @@ -1142,7 +1142,7 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace
await sender.send_messages([message_a, message_b])

received_messages = []
async for message in receiver.get_streaming_message_iter(max_wait_time=5):
async for message in receiver._get_streaming_message_iter(max_wait_time=5):
received_messages.append(message)
await receiver.complete_message(message)

Expand Down Expand Up @@ -1451,11 +1451,11 @@ async def test_async_queue_receiver_alive_after_timeout(self, servicebus_namespa
messages = []
async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=10) as receiver:

async for message in receiver.get_streaming_message_iter():
async for message in receiver._get_streaming_message_iter():
messages.append(message)
break

async for message in receiver.get_streaming_message_iter():
async for message in receiver._get_streaming_message_iter():
messages.append(message)

for message in messages:
Expand All @@ -1469,9 +1469,9 @@ async def test_async_queue_receiver_alive_after_timeout(self, servicebus_namespa
message_3 = ServiceBusMessage("3")
await sender.send_messages([message_2, message_3])

async for message in receiver.get_streaming_message_iter():
async for message in receiver._get_streaming_message_iter():
messages.append(message)
async for message in receiver.get_streaming_message_iter():
async for message in receiver._get_streaming_message_iter():
messages.append(message)

assert len(messages) == 4
Expand Down Expand Up @@ -1536,19 +1536,19 @@ async def test_async_queue_receiver_respects_max_wait_time_overrides(self, servi
async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:

time_1 = receiver._handler._counter.get_current_ms()
async for message in receiver.get_streaming_message_iter(max_wait_time=10):
async for message in receiver._get_streaming_message_iter(max_wait_time=10):
messages.append(message)
await receiver.complete_message(message)

time_2 = receiver._handler._counter.get_current_ms()
async for message in receiver.get_streaming_message_iter(max_wait_time=1):
async for message in receiver._get_streaming_message_iter(max_wait_time=1):
messages.append(message)
time_3 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) <= timedelta(seconds=2)
time_4 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) <= timedelta(seconds=11)

async for message in receiver.get_streaming_message_iter(max_wait_time=3):
async for message in receiver._get_streaming_message_iter(max_wait_time=3):
messages.append(message)
time_5 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) <= timedelta(seconds=4)
Expand All @@ -1558,7 +1558,7 @@ async def test_async_queue_receiver_respects_max_wait_time_overrides(self, servi
time_6 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) <= timedelta(seconds=6)

async for message in receiver.get_streaming_message_iter():
async for message in receiver._get_streaming_message_iter():
messages.append(message)
time_7 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) <= timedelta(seconds=6)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def test_subscription_by_subscription_client_conn_str_receive_basic(self,
await receiver.receive_messages(max_wait_time=-1)

with pytest.raises(ValueError):
await receiver.get_streaming_message_iter(max_wait_time=0)
await receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
async for message in receiver:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ def _receive(self, receiver, end_time):
if self.receive_type == ReceiveType.pull:
batch = receiver.receive_messages(max_message_count=self.max_message_count, max_wait_time=self.max_wait_time)
elif self.receive_type == ReceiveType.push:
batch = receiver.get_streaming_message_iter(max_wait_time=self.max_wait_time)
batch = receiver._get_streaming_message_iter(max_wait_time=self.max_wait_time)
else:
batch = []

for message in batch:
self.on_receive(self._state, message, receiver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def test_stress_queue_receive_and_delete(self, servicebus_namespace_connection_s

stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name, receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE)],
should_complete_messages = False,
duration=timedelta(seconds=60))

result = stress_test.run()
Expand Down Expand Up @@ -186,8 +187,8 @@ def test_stress_queue_pull_receive_timeout(self, servicebus_namespace_connection
class LongRenewStressTestRunner(StressTestRunner):
def on_receive(self, state, received_message, receiver):
'''Called on every successful receive'''
renewer = AutoLockRenew()
renewer.register(received_message, timeout=300)
renewer = AutoLockRenewer()
renewer.register(receiver, received_message, max_lock_renewal_duration=300)
time.sleep(300)

@pytest.mark.liveTest
Expand Down Expand Up @@ -216,7 +217,7 @@ def on_receive(self, state, received_message, receiver):
renewer = AutoLockRenewer()
def on_fail(renewable, error):
print("FAILED AUTOLOCKRENEW: " + str(error))
renewer.register(receiver.session, timeout=600, on_lock_renew_failure=on_fail)
renewer.register(receiver, receiver.session, max_lock_renewal_duration=600, on_lock_renew_failure=on_fail)

@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down Expand Up @@ -282,6 +283,7 @@ def on_send(self, state, sent_message, sender):

@pytest.mark.liveTest
@pytest.mark.live_test_only
@pytest.skip(reason='This test is disabled unless re-openability of handlers is desired and re-enabled')
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
Expand All @@ -299,3 +301,52 @@ def test_stress_queue_close_and_reopen(self, servicebus_namespace_connection_str
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)

# This test validates that all individual messages are received contiguously over a long time period.
# (e.g. not dropped for whatever reason, not sent, or not received)
class DroppedMessageCheckerStressTestRunner(StressTestRunner):
def on_receive(self, state, received_message, receiver):
'''Called on every successful receive'''
last_seen = getattr(state, 'last_seen', -1)
noncontiguous = getattr(state, 'noncontiguous', set())
body = int(str(received_message))
if body == last_seen+1:
last_seen += 1
if noncontiguous:
while (last_seen+1) in noncontiguous:
last_seen += 1
noncontiguous.remove(last_seen)
else:
noncontiguous.add(body)
state.noncontiguous = noncontiguous
state.last_seen = last_seen

def pre_process_message_body(self, payload):
'''Called when constructing message body'''
try:
body = self._message_id
except:
self._message_id = 0
body = 0
self._message_id += 1

return str(body)

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
def test_stress_queue_check_for_dropped_messages(self, servicebus_namespace_connection_string, servicebus_queue):
sb_client = ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, debug=False)

stress_test = ServiceBusQueueStressTests.DroppedMessageCheckerStressTestRunner(
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
receive_type=ReceiveType.pull,
duration=timedelta(seconds=3000))

result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)
Loading

0 comments on commit 1810441

Please sign in to comment.