From 18104416096b7d2147afa4a98e7d9cc3f9ee0ab5 Mon Sep 17 00:00:00 2001 From: KieranBrantnerMagee <kibrantn@microsoft.com> Date: Fri, 20 Nov 2020 20:50:56 -0800 Subject: [PATCH] [ServiceBus] Pre-GA Bug Bash fixes (#15489) - Small fix found during bug bash, an already disposed handler during iter finalizer causes issues on iter termination. - Given the risks the above exposed, made the explicit getter which necessitated the iter wrapper voodoo internal so as to give time to ensure we're not locking ourselves into a dangerous pattern. This was simply a helper and the same functionality can still be accessed by iterating on the base receiver. - Adjusts docs, samples, tests. - clarify one changelog point (Thanks for noting Yunhaoling) - Fixes up some stress tests that had been missed in prior refactors and ensure all green. - Tick version to 7.0.0 so changelog validation doesn't get angry. - Clarify changelog, add note to stress test purpose. Co-authored-by: Adam Ling (MSFT) <adam_ling@outlook.com> --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 3 +- sdk/servicebus/azure-servicebus/README.md | 6 +- .../azure/servicebus/_servicebus_receiver.py | 8 ++- .../aio/_servicebus_receiver_async.py | 8 ++- .../sample_code_servicebus_async.py | 2 +- .../sync_samples/sample_code_servicebus.py | 2 +- .../tests/async_tests/test_queues_async.py | 20 +++---- .../async_tests/test_subscriptions_async.py | 2 +- .../tests/stress_tests/stress_test_base.py | 4 +- .../tests/stress_tests/test_stress_queues.py | 57 ++++++++++++++++++- .../azure-servicebus/tests/test_queues.py | 24 ++++---- .../tests/test_subscriptions.py | 2 +- 12 files changed, 100 insertions(+), 38 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 52011b2a0435..a28f14796420 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -16,7 +16,7 @@ * An improper `receive_mode` value will now raise `ValueError` instead of `TypeError` in line with supporting extensible enums. * Rename enum values `DeadLetter` to `DEAD_LETTER`, `TransferDeadLetter` to `TRANSFER_DEAD_LETTER`, `PeekLock` to `PEEK_LOCK` and `ReceiveAndDelete` to `RECEIVE_AND_DELETE` to conform to sdk guidelines going forward. * `send_messages`, `schedule_messages`, `cancel_scheduled_messages` and `receive_deferred_messages` now performs a no-op rather than raising a `ValueError` if provided an empty list of messages or an empty batch. -* `amqp_annotated_message` has been renamed to `raw_amqp_message` to normalize with other SDKs. +* `ServiceBusMessage.amqp_annotated_message` has been renamed to `ServiceBusMessage.raw_amqp_message` to normalize with other SDKs. * Redesigned error hierarchy based on the service-defined error condition: - `MessageAlreadySettled` now inherits from `ValueError` instead of `ServiceBusMessageError` as it's a client-side validation. - Removed `NoActiveSession` which is now replaced by `OperationTimeoutError` as the client times out when trying to connect to any available session. @@ -41,6 +41,7 @@ raise `ValueError` if the `queue_name` or `topic_name` does not match the `Entit - Settling a message that has been peeked will raise `ValueError`. - Settling a message or renewing a lock on a message received in `RECEIVE_AND_DELETE` receive mode will raise `ValueError`. - Setting `session_id`, `reply_to_session_id`, `message_id` and `partition_key` on `ServiceBusMessage` longer than 128 characters will raise `ValueError`. +* `ServiceBusReceiver.get_streaming_message_iter` has been made internal for the time being to assess use patterns before comitting to back-compatibility; messages may still be iterated over in equivelent fashion by iterating on the receiver itself. **BugFixes** diff --git a/sdk/servicebus/azure-servicebus/README.md b/sdk/servicebus/azure-servicebus/README.md index 1bae7d78f214..0f375bd077c1 100644 --- a/sdk/servicebus/azure-servicebus/README.md +++ b/sdk/servicebus/azure-servicebus/README.md @@ -146,7 +146,7 @@ with ServiceBusClient.from_connection_string(connstr) as client: # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt. # Default is None; to receive forever. with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver: - for msg in receiver: # ServiceBusReceiver instance is a generator. This is equivilent to get_streaming_message_iter(). + for msg in receiver: # ServiceBusReceiver instance is a generator. print(str(msg)) # If it is desired to halt receiving early, one can break out of the loop here safely. ``` @@ -387,7 +387,7 @@ There are various timeouts a user should be aware of within the library. - 10 minute service side link closure: A link, once opened, will be closed after 10 minutes idle to protect the service against resource leakage. This should largely be transparent to a user, but if you notice a reconnect occurring after such a duration, this is why. Performing any operations, including management operations, on the link will extend this timeout. -- max_wait_time: Provided on creation of a receiver or when calling `receive_messages()` or `get_streaming_message_iter()`, the time after which receiving messages will halt after no traffic. This applies both to the imperative `receive_messages()` function as well as the length +- max_wait_time: Provided on creation of a receiver or when calling `receive_messages()`, the time after which receiving messages will halt after no traffic. This applies both to the imperative `receive_messages()` function as well as the length a generator-style receive will run for before exiting if there are no messages. Passing None (default) will wait forever, up until the 10 minute threshold if no other action is taken. > **NOTE:** If processing of a message or session is sufficiently long as to cause timeouts, as an alternative to calling `receiver.renew_message_lock`/`receiver.session.renew_lock` manually, one can @@ -505,7 +505,7 @@ contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additio [client_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html#azure.servicebus.ServiceBusClient [send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=send_messages#azure.servicebus.ServiceBusSender.send_messages [receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusReceiver.receive_messages -[streaming_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=get_streaming_message_iter#azure.servicebus.ServiceBusReceiver.get_streaming_message_iter +[streaming_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=next#azure.servicebus.ServiceBusReceiver.next [session_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusSessionReceiver.receive_messages [session_send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=session_id#azure.servicebus.ServiceBusMessage.session_id [complete_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=complete_message#azure.servicebus.ServiceBusReceiver.complete_message diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index 0d2a71316a0b..a8cbfcc3c7ab 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -170,7 +170,11 @@ def _iter_contextual_wrapper(self, max_wait_time=None): break finally: if original_timeout: - self._handler._timeout = original_timeout + try: + self._handler._timeout = original_timeout + except AttributeError: # Handler may be disposed already. + pass + def _inner_next(self): # We do this weird wrapping such that an imperitive next() call, and a generator-based iter both trace sanely. @@ -484,7 +488,7 @@ def close(self): super(ServiceBusReceiver, self).close() self._message_iter = None # pylint: disable=attribute-defined-outside-init - def get_streaming_message_iter(self, max_wait_time=None): + def _get_streaming_message_iter(self, max_wait_time=None): # type: (Optional[float]) -> Iterator[ServiceBusReceivedMessage] """Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until such a timeout occurs. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index 981bf29e965a..51333aecf0f1 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -167,7 +167,11 @@ async def __anext__(self): return message finally: if original_timeout: - self.receiver._handler._timeout = original_timeout + try: + self.receiver._handler._timeout = original_timeout + except AttributeError: # Handler may be disposed already. + pass + def __aiter__(self): return self._IterContextualWrapper(self) @@ -478,7 +482,7 @@ async def close(self) -> None: await super(ServiceBusReceiver, self).close() self._message_iter = None - def get_streaming_message_iter( + def _get_streaming_message_iter( self, max_wait_time: Optional[float] = None ) -> AsyncIterator[ServiceBusReceivedMessage]: diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py index 8a2b4d45d5e7..495712052c90 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py @@ -192,7 +192,7 @@ async def example_send_and_receive_async(): servicebus_receiver = await example_create_servicebus_receiver_async() # [START receive_forever_async] async with servicebus_receiver: - async for message in servicebus_receiver.get_streaming_message_iter(): + async for message in servicebus_receiver: print(str(message)) await servicebus_receiver.complete_message(message) # [END receive_forever_async] diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py index 13f102592863..7ef9de246acd 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py @@ -255,7 +255,7 @@ def example_send_and_receive_sync(): servicebus_receiver = example_create_servicebus_receiver_sync() # [START receive_forever] with servicebus_receiver: - for message in servicebus_receiver.get_streaming_message_iter(): + for message in servicebus_receiver: print(str(message)) servicebus_receiver.complete_message(message) # [END receive_forever] diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 96168f1e7d7a..36545e7ede80 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -95,7 +95,7 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel await receiver.receive_messages(max_wait_time=0) with pytest.raises(ValueError): - await receiver.get_streaming_message_iter(max_wait_time=0) + await receiver._get_streaming_message_iter(max_wait_time=0) count = 0 async for message in receiver: @@ -1142,7 +1142,7 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace await sender.send_messages([message_a, message_b]) received_messages = [] - async for message in receiver.get_streaming_message_iter(max_wait_time=5): + async for message in receiver._get_streaming_message_iter(max_wait_time=5): received_messages.append(message) await receiver.complete_message(message) @@ -1451,11 +1451,11 @@ async def test_async_queue_receiver_alive_after_timeout(self, servicebus_namespa messages = [] async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=10) as receiver: - async for message in receiver.get_streaming_message_iter(): + async for message in receiver._get_streaming_message_iter(): messages.append(message) break - async for message in receiver.get_streaming_message_iter(): + async for message in receiver._get_streaming_message_iter(): messages.append(message) for message in messages: @@ -1469,9 +1469,9 @@ async def test_async_queue_receiver_alive_after_timeout(self, servicebus_namespa message_3 = ServiceBusMessage("3") await sender.send_messages([message_2, message_3]) - async for message in receiver.get_streaming_message_iter(): + async for message in receiver._get_streaming_message_iter(): messages.append(message) - async for message in receiver.get_streaming_message_iter(): + async for message in receiver._get_streaming_message_iter(): messages.append(message) assert len(messages) == 4 @@ -1536,19 +1536,19 @@ async def test_async_queue_receiver_respects_max_wait_time_overrides(self, servi async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver: time_1 = receiver._handler._counter.get_current_ms() - async for message in receiver.get_streaming_message_iter(max_wait_time=10): + async for message in receiver._get_streaming_message_iter(max_wait_time=10): messages.append(message) await receiver.complete_message(message) time_2 = receiver._handler._counter.get_current_ms() - async for message in receiver.get_streaming_message_iter(max_wait_time=1): + async for message in receiver._get_streaming_message_iter(max_wait_time=1): messages.append(message) time_3 = receiver._handler._counter.get_current_ms() assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) <= timedelta(seconds=2) time_4 = receiver._handler._counter.get_current_ms() assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) <= timedelta(seconds=11) - async for message in receiver.get_streaming_message_iter(max_wait_time=3): + async for message in receiver._get_streaming_message_iter(max_wait_time=3): messages.append(message) time_5 = receiver._handler._counter.get_current_ms() assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) <= timedelta(seconds=4) @@ -1558,7 +1558,7 @@ async def test_async_queue_receiver_respects_max_wait_time_overrides(self, servi time_6 = receiver._handler._counter.get_current_ms() assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) <= timedelta(seconds=6) - async for message in receiver.get_streaming_message_iter(): + async for message in receiver._get_streaming_message_iter(): messages.append(message) time_7 = receiver._handler._counter.get_current_ms() assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) <= timedelta(seconds=6) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py index 53d02a225a70..bff7befafe12 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py @@ -64,7 +64,7 @@ async def test_subscription_by_subscription_client_conn_str_receive_basic(self, await receiver.receive_messages(max_wait_time=-1) with pytest.raises(ValueError): - await receiver.get_streaming_message_iter(max_wait_time=0) + await receiver._get_streaming_message_iter(max_wait_time=0) count = 0 async for message in receiver: diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py index 52f554ffd696..67700a165d2b 100644 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py @@ -207,7 +207,9 @@ def _receive(self, receiver, end_time): if self.receive_type == ReceiveType.pull: batch = receiver.receive_messages(max_message_count=self.max_message_count, max_wait_time=self.max_wait_time) elif self.receive_type == ReceiveType.push: - batch = receiver.get_streaming_message_iter(max_wait_time=self.max_wait_time) + batch = receiver._get_streaming_message_iter(max_wait_time=self.max_wait_time) + else: + batch = [] for message in batch: self.on_receive(self._state, message, receiver) diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py index 2aef68475f21..5fd628d458be 100644 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py @@ -108,6 +108,7 @@ def test_stress_queue_receive_and_delete(self, servicebus_namespace_connection_s stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(servicebus_queue.name)], receivers = [sb_client.get_queue_receiver(servicebus_queue.name, receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE)], + should_complete_messages = False, duration=timedelta(seconds=60)) result = stress_test.run() @@ -186,8 +187,8 @@ def test_stress_queue_pull_receive_timeout(self, servicebus_namespace_connection class LongRenewStressTestRunner(StressTestRunner): def on_receive(self, state, received_message, receiver): '''Called on every successful receive''' - renewer = AutoLockRenew() - renewer.register(received_message, timeout=300) + renewer = AutoLockRenewer() + renewer.register(receiver, received_message, max_lock_renewal_duration=300) time.sleep(300) @pytest.mark.liveTest @@ -216,7 +217,7 @@ def on_receive(self, state, received_message, receiver): renewer = AutoLockRenewer() def on_fail(renewable, error): print("FAILED AUTOLOCKRENEW: " + str(error)) - renewer.register(receiver.session, timeout=600, on_lock_renew_failure=on_fail) + renewer.register(receiver, receiver.session, max_lock_renewal_duration=600, on_lock_renew_failure=on_fail) @pytest.mark.liveTest @pytest.mark.live_test_only @@ -282,6 +283,7 @@ def on_send(self, state, sent_message, sender): @pytest.mark.liveTest @pytest.mark.live_test_only + @pytest.skip(reason='This test is disabled unless re-openability of handlers is desired and re-enabled') @CachedResourceGroupPreparer(name_prefix='servicebustest') @ServiceBusNamespacePreparer(name_prefix='servicebustest') @ServiceBusQueuePreparer(name_prefix='servicebustest') @@ -299,3 +301,52 @@ def test_stress_queue_close_and_reopen(self, servicebus_namespace_connection_str result = stress_test.run() assert(result.total_sent > 0) assert(result.total_received > 0) + + # This test validates that all individual messages are received contiguously over a long time period. + # (e.g. not dropped for whatever reason, not sent, or not received) + class DroppedMessageCheckerStressTestRunner(StressTestRunner): + def on_receive(self, state, received_message, receiver): + '''Called on every successful receive''' + last_seen = getattr(state, 'last_seen', -1) + noncontiguous = getattr(state, 'noncontiguous', set()) + body = int(str(received_message)) + if body == last_seen+1: + last_seen += 1 + if noncontiguous: + while (last_seen+1) in noncontiguous: + last_seen += 1 + noncontiguous.remove(last_seen) + else: + noncontiguous.add(body) + state.noncontiguous = noncontiguous + state.last_seen = last_seen + + def pre_process_message_body(self, payload): + '''Called when constructing message body''' + try: + body = self._message_id + except: + self._message_id = 0 + body = 0 + self._message_id += 1 + + return str(body) + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @ServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest') + def test_stress_queue_check_for_dropped_messages(self, servicebus_namespace_connection_string, servicebus_queue): + sb_client = ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, debug=False) + + stress_test = ServiceBusQueueStressTests.DroppedMessageCheckerStressTestRunner( + senders = [sb_client.get_queue_sender(servicebus_queue.name)], + receivers = [sb_client.get_queue_receiver(servicebus_queue.name)], + receive_type=ReceiveType.pull, + duration=timedelta(seconds=3000)) + + result = stress_test.run() + assert(result.total_sent > 0) + assert(result.total_received > 0) diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 5875c412f3e0..5229dd26c0f2 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -157,7 +157,7 @@ def test_queue_by_queue_client_conn_str_receive_handler_peeklock(self, servicebu receiver.receive_messages(max_wait_time=0) with pytest.raises(ValueError): - receiver.get_streaming_message_iter(max_wait_time=0) + receiver._get_streaming_message_iter(max_wait_time=0) count = 0 for message in receiver: @@ -1369,7 +1369,7 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ sender.send_messages(message_arry) received_messages = [] - for message in receiver.get_streaming_message_iter(max_wait_time=5): + for message in receiver._get_streaming_message_iter(max_wait_time=5): received_messages.append(message) receiver.complete_message(message) @@ -1711,7 +1711,7 @@ def message_content(): message_2nd_received_cnt = 0 while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20: messages = [] - for message in receiver.get_streaming_message_iter(max_wait_time=5): + for message in receiver._get_streaming_message_iter(max_wait_time=5): messages.append(message) if not messages: break @@ -1758,11 +1758,11 @@ def test_queue_receiver_alive_after_timeout(self, servicebus_namespace_connectio messages = [] with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver: - for message in receiver.get_streaming_message_iter(): + for message in receiver._get_streaming_message_iter(): messages.append(message) break - for message in receiver.get_streaming_message_iter(): + for message in receiver._get_streaming_message_iter(): messages.append(message) for message in messages: @@ -1776,9 +1776,9 @@ def test_queue_receiver_alive_after_timeout(self, servicebus_namespace_connectio message_3 = ServiceBusMessage("3") sender.send_messages([message_2, message_3]) - for message in receiver.get_streaming_message_iter(): + for message in receiver._get_streaming_message_iter(): messages.append(message) - for message in receiver.get_streaming_message_iter(): + for message in receiver._get_streaming_message_iter(): messages.append(message) assert len(messages) == 4 @@ -1848,7 +1848,7 @@ def test_queue_receiver_sender_resume_after_link_timeout(self, servicebus_namesp messages = [] with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver: - for message in receiver.get_streaming_message_iter(): + for message in receiver._get_streaming_message_iter(): messages.append(message) assert len(messages) == 2 @@ -1872,19 +1872,19 @@ def test_queue_receiver_respects_max_wait_time_overrides(self, servicebus_namesp time_1 = receiver._handler._counter.get_current_ms() time_3 = time_1 # In case inner loop isn't hit, fail sanely. - for message in receiver.get_streaming_message_iter(max_wait_time=10): + for message in receiver._get_streaming_message_iter(max_wait_time=10): messages.append(message) receiver.complete_message(message) time_2 = receiver._handler._counter.get_current_ms() - for message in receiver.get_streaming_message_iter(max_wait_time=1): + for message in receiver._get_streaming_message_iter(max_wait_time=1): messages.append(message) time_3 = receiver._handler._counter.get_current_ms() assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) <= timedelta(seconds=2) time_4 = receiver._handler._counter.get_current_ms() assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) <= timedelta(seconds=11) - for message in receiver.get_streaming_message_iter(max_wait_time=3): + for message in receiver._get_streaming_message_iter(max_wait_time=3): messages.append(message) time_5 = receiver._handler._counter.get_current_ms() assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) <= timedelta(seconds=4) @@ -1894,7 +1894,7 @@ def test_queue_receiver_respects_max_wait_time_overrides(self, servicebus_namesp time_6 = receiver._handler._counter.get_current_ms() assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) <= timedelta(seconds=6) - for message in receiver.get_streaming_message_iter(): + for message in receiver._get_streaming_message_iter(): messages.append(message) time_7 = receiver._handler._counter.get_current_ms() assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) <= timedelta(seconds=6) diff --git a/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py b/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py index 159b409f2f76..a32c6b5a5a77 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py @@ -63,7 +63,7 @@ def test_subscription_by_subscription_client_conn_str_receive_basic(self, servic receiver.receive_messages(max_wait_time=-1) with pytest.raises(ValueError): - receiver.get_streaming_message_iter(max_wait_time=0) + receiver._get_streaming_message_iter(max_wait_time=0) count = 0 for message in receiver: