diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 1573d1ce6640..01b64d916549 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -2,6 +2,8 @@ ## 7.0.0 (2020-11-23) +> **Note:** This is the GA release of the `azure-servicebus` package, rolling out the official API surface area constructed over the prior preview releases. Users migrating from `v0.50` are advised to view the [migration guide](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/migration_guide.md). + **New Features** * `sub_queue` and `receive_mode` may now be passed in as a valid string (as defined by their respective enum type) as well as their enum form when constructing `ServiceBusReceiver`. diff --git a/sdk/servicebus/azure-servicebus/README.md b/sdk/servicebus/azure-servicebus/README.md index 64dcccd8b748..b6a7d2560595 100644 --- a/sdk/servicebus/azure-servicebus/README.md +++ b/sdk/servicebus/azure-servicebus/README.md @@ -13,12 +13,12 @@ Use the Service Bus client library for Python to communicate between application [Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/) | [Package (PyPi)][pypi] | [API reference documentation][api_docs] | [Product documentation][product_docs] | [Samples](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples) | [Changelog](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/CHANGELOG.md) -> **NOTE**: This document has instructions, links and code snippets for the **preview** of the next version of the `azure-servicebus` package -> which has different APIs than the current version (0.50). Please view the resources below for references on the existing library. +> **NOTE**: This document has instructions, references, and code snippets for the latest version of the `azure-servicebus` package. +> This package is replacing the preview version (`azure-servicebus==0.53`) which is no longer maintained. +> Please view the links below for reference if needed; however, migrating to the latest version using the [migration guide][migration_guide] is recommended. [V0.50 Source code][0_50_source] | [V0.50 Package (PyPi)][0_50_pypi] | [V0.50 API reference documentation][0_50_api_docs] | [V0.50 Product documentation][0_50_product_docs] | [V0.50 Samples][0_50_samples] | [V0.50 Changelog][0_50_changelog] -We also provide a migration guide for users familiar with the existing package that would like to try the preview: [migration guide to move from Service Bus V0.50 to Service Bus V7 Preview][migration_guide] ## Getting started @@ -241,7 +241,7 @@ When receiving from a queue, you have multiple actions you can take on the messa > **NOTE**: You can only settle `ServiceBusReceivedMessage` objects which are received in `ServiceBusReceiveMode.PEEK_LOCK` mode (this is the default). > `ServiceBusReceiveMode.RECEIVE_AND_DELETE` mode removes the message from the queue on receipt. `ServiceBusReceivedMessage` messages -> returned from `peek_messages()` cannot be settled, as the message lock is not taken like it is in the aforementioned receive methods. Sessionful messages have a similar limitation. +> returned from `peek_messages()` cannot be settled, as the message lock is not taken like it is in the aforementioned receive methods. If the message has a lock as mentioned above, settlement will fail if the message lock has expired. If processing would take longer than the lock duration, it must be maintained via `receiver.renew_message_lock` before it expires. @@ -370,7 +370,7 @@ with ServiceBusClient.from_connection_string(connstr) as client: renewer.close() ``` -If for any reason auto-renewal has been interrupted or failed, this can be observed via the `auto_renew_error` property on the object being renewed. +If for any reason auto-renewal has been interrupted or failed, this can be observed via the `auto_renew_error` property on the object being renewed, or by having passed a callback to the `on_lock_renew_failure` parameter on renewer initialization. It would also manifest when trying to take action (such as completing a message) on the specified object. ## Troubleshooting diff --git a/sdk/servicebus/azure-servicebus/samples/README.md b/sdk/servicebus/azure-servicebus/samples/README.md index a0b64275aa00..39594c50d55c 100644 --- a/sdk/servicebus/azure-servicebus/samples/README.md +++ b/sdk/servicebus/azure-servicebus/samples/README.md @@ -47,9 +47,9 @@ Both [sync version](https://github.com/Azure/azure-sdk-for-python/tree/master/sd - [schedule_topic_messages_and_cancellation](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/schedule_topic_messages_and_cancellation.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/async_samples/schedule_topic_messages_and_cancellation_async.py)) - Examples to schedule messages and cancel scheduled messages on a service bus topic: - Schedule a single message or multiple messages to a topic - Cancel scheduled messages from a topic -- [client_identity_authentication.py](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/client_identity_authentication.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/async_samples/client_identity_authentication_async.py)) - Examples to authenticate the client by Azure Activate Directory +- [client_identity_authentication.py](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/client_identity_authentication.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/async_samples/client_identity_authentication_async.py)) - Examples to authenticate the client by Azure Activate Directory: - Authenticate and create the client utilizing the `azure.identity` library -- [authenticate_client_connstr.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/servicebus/azure-servicebus/samples/sync_samples/authenticate_client_connstr.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/servicebus/azure-servicebus/samples/async_samples/authenticate_client_connstr_async.py)) - Examples to authenticate the client by Connection String +- [authenticate_client_connstr.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/servicebus/azure-servicebus/samples/sync_samples/authenticate_client_connstr.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/servicebus/azure-servicebus/samples/async_samples/authenticate_client_connstr_async.py)) - Examples to authenticate the client by Connection String: - Authenticate and create the client utilizing the connection string available in the Azure portal or via Azure CLI. - [proxy.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/servicebus/azure-servicebus/samples/sync_samples/proxy.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/servicebus/azure-servicebus/samples/async_samples/proxy_async.py)) - Examples to send message behind a proxy: - Send message behind a proxy @@ -57,33 +57,34 @@ Both [sync version](https://github.com/Azure/azure-sdk-for-python/tree/master/sd - Automatically renew lock on message received from non-sessionful entity - Automatically renew lock on the session of sessionful entity - Configure a callback to be triggered on auto lock renew failures. -- [mgmt_queue](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/mgmt_queue.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/async_samples/mgmt_queue_async.py)) - Examples to manage queue entities under a given servicebus namespace +- [mgmt_queue.py](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/mgmt_queue.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/async_samples/mgmt_queue_async.py)) - Examples to manage queue entities under a given servicebus namespace: - Create a queue - Delete a queue - Update a queue - List queues - Get queue properties - Get queue runtime information -- [mgmt_topic](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/mgmt_topic.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/async_samples/mgmt_topic_async.py)) - Examples to manage topic entities under a given servicebus namespace +- [mgmt_topic](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/mgmt_topic.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/async_samples/mgmt_topic_async.py)) - Examples to manage topic entities under a given servicebus namespace: - Create a topic - Delete a topic - Update a topic - List topic - Get topic properties - Get topic runtime information -- [mgmt_subscription](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/mgmt_subscription.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/async_samples/mgmt_subscription_async.py)) - Examples to manage subscription entities under a given servicebus namespace +- [mgmt_subscription](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/mgmt_subscription.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/async_samples/mgmt_subscription_async.py)) - Examples to manage subscription entities under a given servicebus namespace: - Create a subscription - Delete a subscription - Update a subscription - List subscription - Get subscription properties - Get subscription runtime information -- [mgmt_rule](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/mgmt_rule.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/async_samples/mgmt_rule_async.py)) - Examples to manage rule entities under a given servicebus subscription +- [mgmt_rule](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/mgmt_rule.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/async_samples/mgmt_rule_async.py)) - Examples to manage rule entities under a given servicebus subscription: - Create a rule - Delete a rule - Update a rule - List rule - Get rule properties +- [failure_and_recovery.py](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/failure_and_recovery.py) - A demonstration of potential failure modes from an end-to-end send receive flow, as well as possible recovery patterns. ## Prerequisites - Python 2.7, 3.5 or later. diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/failure_and_recovery.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/failure_and_recovery.py new file mode 100644 index 000000000000..f0a1c2e80a6f --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/failure_and_recovery.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An (overly) verbose sample demonstrating possible failure modes and potential recovery patterns. + +Many of these catches are present for illustrative or duplicate purposes, and could be condensed or elided +in a production scenario depending on the system design. +""" + +# pylint: disable=C0111 + +import os +from azure.servicebus import ServiceBusClient, ServiceBusMessage +from azure.servicebus.exceptions import ( + MessageSizeExceededError, + ServiceBusConnectionError, + ServiceBusAuthorizationError, + ServiceBusAuthenticationError, + OperationTimeoutError, + ServiceBusError, + ServiceBusCommunicationError, + MessageAlreadySettled, + MessageLockLostError, + MessageNotFoundError +) + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + +def send_batch_messages(sender): + batch_message = sender.create_message_batch() + for i in range(10): + try: + message = ServiceBusMessage("Data {}".format(i)) + except TypeError: + # Message body is of an inappropriate type, must be string, bytes or None. + continue + try: + batch_message.add_message(message) + except MessageSizeExceededError: + # ServiceBusMessageBatch object reaches max_size. + # New ServiceBusMessageBatch object can be created here to send more data. + # This must be handled at the application layer, by breaking up or condensing. + continue + last_error = None + for _ in range(3): # Send retries + try: + sender.send_messages(batch_message) + return + except OperationTimeoutError: + # send has timed out, retry. + continue + except MessageSizeExceededError: + # The body provided in the message to be sent is too large. + # This must be handled at the application layer, by breaking up or condensing. + raise + except ServiceBusError as e: + # Other types of service bus errors that can be handled at the higher level, such as connection/auth errors + # If it happens persistently, should bubble up, and should be logged/alerted if high volume. + last_error = e + continue + if last_error: + raise last_error + +def receive_messages(receiver): + should_retry = True + while should_retry: + try: + for msg in receiver: + try: + # Do your application-specific data processing here + print(str(msg)) + should_complete = True + except Exception as e: + should_complete = False + + for _ in range(3): # Settlement retry + try: + if should_complete: + receiver.complete_message(msg) + else: + receiver.abandon_message(msg) + # Depending on the desired behavior, one could dead letter on failure instead; failure modes are comparable. + # Abandon returns the message to the queue for another consumer to receive, dead letter moves to the dead letter subqueue. + # + # receiver.dead_letter_message(msg, reason=str(e), error_description="Application level failure") + break + except MessageAlreadySettled: + # Message was already settled, either somewhere earlier in this processing or by another node. Continue. + break + except MessageLockLostError: + # Message lock was lost before settlement. Handle as necessary in the app layer for idempotency then continue on. + break + except MessageNotFoundError: + # Message has an improper sequence number, was dead lettered, or otherwise does not exist. Handle at app layer, continue on. + break + except ServiceBusError: + # Any other undefined service errors during settlement. Can be transient, and can retry, but should be logged, and alerted on high volume. + continue + return + except ServiceBusAuthorizationError: + # Permission based errors should be bubbled up. + raise + except: + # Although miscellaneous service errors and interruptions can occasionally occur during receiving, + # In most pragmatic cases one can try to continue receiving unless the failure mode seens persistent. + # Logging the associated failure and alerting on high volume is often prudent. + continue + + + +def send_and_receive_defensively(): + servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True) + + for _ in range(3): # Connection retries. + try: + print("Opening") + with servicebus_client: + sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) + try: + with sender: + print("Sending") + send_batch_messages(sender) + except ValueError: + # Handler was shut down previously. (Cannot happen in this example, shown for completeness.) + pass + + receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME) + try: + with receiver: + print("Receiving") + receive_messages(receiver) + except ValueError: + # Handler was shut down previously. (Cannot happen in this example, shown for completeness.) + pass + + return + except ServiceBusConnectionError: + # An error occurred in the connection to the service. + # This may have been caused by a transient network issue or service problem. It is recommended to retry. + continue + except ServiceBusAuthorizationError: + # An error occurred when authorizing the connection to the service. + # This may have been caused by the credentials not having the right permission to perform the operation. + # It is recommended to check the permission of the credentials. + raise + except ServiceBusAuthenticationError: + # An error occurred when authenticate the connection to the service. + # This may have been caused by the credentials being incorrect. It is recommended to check the credentials. + raise + except ServiceBusCommunicationError: + # Unable to communicate with the specified servicebus. Ensure that the FQDN is correct, + # and that there is no firewall or network issue preventing connectivity. + raise + +send_and_receive_defensively() +print("Send and Receive is done.") diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py index 5fd628d458be..26af3c59ce1f 100644 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py @@ -283,7 +283,7 @@ def on_send(self, state, sent_message, sender): @pytest.mark.liveTest @pytest.mark.live_test_only - @pytest.skip(reason='This test is disabled unless re-openability of handlers is desired and re-enabled') + @pytest.mark.skip(reason='This test is disabled unless re-openability of handlers is desired and re-enabled') @CachedResourceGroupPreparer(name_prefix='servicebustest') @ServiceBusNamespacePreparer(name_prefix='servicebustest') @ServiceBusQueuePreparer(name_prefix='servicebustest')