From 1f358d07d64bf7061f87daf919cc05a3e0bfa6ac Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri <swathip@microsoft.com> Date: Tue, 4 Jan 2022 09:06:49 -0800 Subject: [PATCH 1/3] inital fixed retry commit --- .../azure/servicebus/__init__.py | 2 + .../azure/servicebus/_base_handler.py | 14 +++++- .../servicebus/_common/_configuration.py | 2 + .../azure/servicebus/_retry.py | 11 +++++ .../azure/servicebus/_servicebus_client.py | 9 ++++ .../servicebus/aio/_base_handler_async.py | 9 +++- .../aio/_servicebus_client_async.py | 9 ++++ .../tests/async_tests/test_sb_client_async.py | 42 ++++++++++++++++++ .../azure-servicebus/tests/test_sb_client.py | 43 ++++++++++++++++++- 9 files changed, 137 insertions(+), 4 deletions(-) create mode 100644 sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py b/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py index 3af017075f1d..2a9b58694dc2 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py @@ -28,6 +28,7 @@ parse_connection_string, ServiceBusConnectionStringProperties, ) +from ._retry import RetryMode TransportType = constants.TransportType @@ -46,4 +47,5 @@ "AutoLockRenewer", "parse_connection_string", "ServiceBusConnectionStringProperties", + "RetryMode", ] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py index 0f866a2ea241..68c4671b601a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py @@ -22,6 +22,7 @@ from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential from ._common._configuration import Configuration +from ._retry import RetryMode from .exceptions import ( ServiceBusError, ServiceBusConnectionError, @@ -152,6 +153,12 @@ def _generate_sas_token(uri, policy, key, expiry=None): token = utils.create_sas_token(encoded_policy, encoded_key, encoded_uri, expiry) return AccessToken(token=token, expires_on=abs_expiry) +def _get_backoff_time(retry_mode, backoff_factor, backoff_max, retried_times): + if retry_mode == RetryMode.FIXED: + backoff_value = backoff_factor + else: + backoff_value = backoff_factor * (2 ** retried_times) + return min(backoff_max, backoff_value) class ServiceBusSASTokenCredential(object): """The shared access token credential used for authentication. @@ -418,7 +425,12 @@ def _backoff( ): # type: (int, Exception, Optional[float], str) -> None entity_name = entity_name or self._container_id - backoff = self._config.retry_backoff_factor * 2 ** retried_times + backoff = _get_backoff_time( + self._config.retry_mode, + self._config.retry_backoff_factor, + self._config.retry_backoff_max, + retried_times, + ) if backoff <= self._config.retry_backoff_max and ( abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time ): # pylint:disable=no-else-return diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py index d9328c549a74..c16a7fd2cc52 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py @@ -5,12 +5,14 @@ from typing import Optional, Dict, Any from uamqp.constants import TransportType +from .._retry import RetryMode class Configuration(object): # pylint:disable=too-many-instance-attributes def __init__(self, **kwargs): self.user_agent = kwargs.get("user_agent") # type: Optional[str] self.retry_total = kwargs.get("retry_total", 3) # type: int + self.retry_mode = kwargs.get("retry_mode", RetryMode.EXPONENTIAL) self.retry_backoff_factor = kwargs.get( "retry_backoff_factor", 0.8 ) # type: float diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py new file mode 100644 index 000000000000..1eb87a264398 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py @@ -0,0 +1,11 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +from typing import Optional, Dict, Any + +from enum import Enum + +class RetryMode(str, Enum): + EXPONENTIAL = 'exponential' + FIXED = 'fixed' diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index 28db78a31cc9..1b0f28cc1aa3 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -60,6 +60,8 @@ class ServiceBusClient(object): :keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries. Default value is 0.8. :keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120. + :keyword retry_mode: Fixed or exponential delay between attempts, default is exponential. + :paramtype retry_mode: ~azure.servicebus.RetryMode .. admonition:: Example: @@ -152,6 +154,8 @@ def from_connection_string(cls, conn_str, **kwargs): :keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries. Default value is 0.8. :keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120. + :keyword retry_mode: Fixed or exponential delay between attempts, default is exponential. + :paramtype retry_mode: ~azure.servicebus.RetryMode :rtype: ~azure.servicebus.ServiceBusClient .. admonition:: Example: @@ -212,6 +216,7 @@ def get_queue_sender(self, queue_name, **kwargs): http_proxy=self._config.http_proxy, connection=self._connection, user_agent=self._config.user_agent, + retry_mode=self._config.retry_mode, retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, @@ -303,6 +308,7 @@ def get_queue_receiver(self, queue_name, **kwargs): http_proxy=self._config.http_proxy, connection=self._connection, user_agent=self._config.user_agent, + retry_mode=self._config.retry_mode, retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, @@ -344,6 +350,7 @@ def get_topic_sender(self, topic_name, **kwargs): http_proxy=self._config.http_proxy, connection=self._connection, user_agent=self._config.user_agent, + retry_mode=self._config.retry_mode, retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, @@ -433,6 +440,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): http_proxy=self._config.http_proxy, connection=self._connection, user_agent=self._config.user_agent, + retry_mode=self._config.retry_mode, retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, @@ -453,6 +461,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): http_proxy=self._config.http_proxy, connection=self._connection, user_agent=self._config.user_agent, + retry_mode=self._config.retry_mode, retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py index b6614fa74632..f27a5680fb7d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py @@ -14,7 +14,7 @@ from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential -from .._base_handler import _generate_sas_token, BaseHandler as BaseHandlerSync +from .._base_handler import _generate_sas_token, BaseHandler as BaseHandlerSync, _get_backoff_time from .._common._configuration import Configuration from .._common.utils import create_properties, strip_protocol_from_uri, parse_sas_credential from .._common.constants import ( @@ -268,7 +268,12 @@ async def _backoff( self, retried_times, last_exception, abs_timeout_time=None, entity_name=None ): entity_name = entity_name or self._container_id - backoff = self._config.retry_backoff_factor * 2 ** retried_times + backoff = _get_backoff_time( + self._config.retry_mode, + self._config.retry_backoff_factor, + self._config.retry_backoff_max, + retried_times, + ) if backoff <= self._config.retry_backoff_max and ( abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time ): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index 2f7f76c338e0..3ba40cdf2e35 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -58,6 +58,8 @@ class ServiceBusClient(object): :keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries. Default value is 0.8. :keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120. + :keyword retry_mode: Fixed or exponential delay between attempts, default is exponential. + :paramtype retry_mode: ~azure.eventhub.RetryMode .. admonition:: Example: @@ -129,6 +131,8 @@ def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "ServiceBusClie :keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries. Default value is 0.8. :keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120. + :keyword retry_mode: Fixed or exponential delay between attempts, default is exponential. + :paramtype retry_mode: ~azure.eventhub.RetryMode :rtype: ~azure.servicebus.aio.ServiceBusClient .. admonition:: Example: @@ -209,6 +213,7 @@ def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender: http_proxy=self._config.http_proxy, connection=self._connection, user_agent=self._config.user_agent, + retry_mode=self._config.retry_mode, retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, @@ -298,6 +303,7 @@ def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiv http_proxy=self._config.http_proxy, connection=self._connection, user_agent=self._config.user_agent, + retry_mode=self._config.retry_mode, retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, @@ -338,6 +344,7 @@ def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender: http_proxy=self._config.http_proxy, connection=self._connection, user_agent=self._config.user_agent, + retry_mode=self._config.retry_mode, retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, @@ -428,6 +435,7 @@ def get_subscription_receiver( http_proxy=self._config.http_proxy, connection=self._connection, user_agent=self._config.user_agent, + retry_mode=self._config.retry_mode, retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, @@ -448,6 +456,7 @@ def get_subscription_receiver( http_proxy=self._config.http_proxy, connection=self._connection, user_agent=self._config.user_agent, + retry_mode=self._config.retry_mode, retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py index ad294793755e..1f8ddd8ffd8e 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py @@ -6,6 +6,7 @@ import logging +import time import pytest from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential @@ -401,3 +402,44 @@ async def test_client_named_key_credential_async(self, async with client: async with client.get_queue_sender(servicebus_queue.name) as sender: await sender.send_messages(ServiceBusMessage("foo")) + + async def test_backoff_fixed_retry(self): + client = ServiceBusClient( + 'fake.host.com', + 'fake_eh', + retry_mode=RetryMode.FIXED + ) + # queue sender + sender = await client.get_queue_sender('fake_name') + backoff = client._config.retry_backoff_factor + start_time = time.time() + sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None) + sleep_time_fixed = time.time() - start_time + # exp = 0.8 * (2 ** 1) = 1.6 + # time.sleep() in _backoff will take AT LEAST time 'exp' for RetryMode.EXPONENTIAL + # check that fixed is less than 'exp' + assert sleep_time_fixed < backoff * (2 ** 1) + + # topic sender + sender = await client.get_topic_sender('fake_name') + backoff = client._config.retry_backoff_factor + start_time = time.time() + sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None) + sleep_time_fixed = time.time() - start_time + assert sleep_time_fixed < backoff * (2 ** 1) + + # queue receiver + receiver = await client.get_queue_receiver('fake_name') + backoff = client._config.retry_backoff_factor + start_time = time.time() + receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None) + sleep_time_fixed = time.time() - start_time + assert sleep_time_fixed < backoff * (2 ** 1) + + # subscription receiver + receiver = await client.get_subscription_receiver('fake_topic', 'fake_sub') + backoff = client._config.retry_backoff_factor + start_time = time.time() + receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None) + sleep_time_fixed = time.time() - start_time + assert sleep_time_fixed < backoff * (2 ** 1) diff --git a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py index b1d09ed30189..404d44f0f4be 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py @@ -14,7 +14,7 @@ from azure.common import AzureHttpError, AzureConflictHttpError from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential from azure.mgmt.servicebus.models import AccessRights -from azure.servicebus import ServiceBusClient, ServiceBusSender, ServiceBusReceiver +from azure.servicebus import ServiceBusClient, ServiceBusSender, ServiceBusReceiver, RetryMode from azure.servicebus._base_handler import ServiceBusSharedKeyCredential from azure.servicebus._common.message import ServiceBusMessage, ServiceBusReceivedMessage from azure.servicebus.exceptions import ( @@ -419,3 +419,44 @@ def test_client_azure_named_key_credential(self, with client: with client.get_queue_sender(servicebus_queue.name) as sender: sender.send_messages(ServiceBusMessage("foo")) + + def test_backoff_fixed_retry(self): + client = ServiceBusClient( + 'fake.host.com', + 'fake_eh', + retry_mode=RetryMode.FIXED + ) + # queue sender + sender = client.get_queue_sender('fake_name') + backoff = client._config.retry_backoff_factor + start_time = time.time() + sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None) + sleep_time_fixed = time.time() - start_time + # exp = 0.8 * (2 ** 1) = 1.6 + # time.sleep() in _backoff will take AT LEAST time 'exp' for RetryMode.EXPONENTIAL + # check that fixed is less than 'exp' + assert sleep_time_fixed < backoff * (2 ** 1) + + # topic sender + sender = client.get_topic_sender('fake_name') + backoff = client._config.retry_backoff_factor + start_time = time.time() + sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None) + sleep_time_fixed = time.time() - start_time + assert sleep_time_fixed < backoff * (2 ** 1) + + # queue receiver + receiver = client.get_queue_receiver('fake_name') + backoff = client._config.retry_backoff_factor + start_time = time.time() + receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None) + sleep_time_fixed = time.time() - start_time + assert sleep_time_fixed < backoff * (2 ** 1) + + # subscription receiver + receiver = client.get_subscription_receiver('fake_topic', 'fake_sub') + backoff = client._config.retry_backoff_factor + start_time = time.time() + receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None) + sleep_time_fixed = time.time() - start_time + assert sleep_time_fixed < backoff * (2 ** 1) From d6f5462dcbce29144ea620f1e6157515b6258faa Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri <swathip@microsoft.com> Date: Tue, 4 Jan 2022 10:22:33 -0800 Subject: [PATCH 2/3] changelog + version --- sdk/eventhub/azure-eventhub/CHANGELOG.md | 2 +- sdk/eventhub/azure-eventhub/azure/eventhub/_version.py | 2 +- sdk/servicebus/azure-servicebus/CHANGELOG.md | 6 +++++- .../azure-servicebus/azure/servicebus/_version.py | 2 +- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/CHANGELOG.md b/sdk/eventhub/azure-eventhub/CHANGELOG.md index a7d4e0fb075e..f4a98400ff80 100644 --- a/sdk/eventhub/azure-eventhub/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub/CHANGELOG.md @@ -1,6 +1,6 @@ # Release History -## 5.6.2 (Unreleased) +## 5.7.0 (Unreleased) ### Features Added diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py index 87024b3843d4..01652f1b086e 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py @@ -3,4 +3,4 @@ # Licensed under the MIT License. # ------------------------------------ -VERSION = "5.6.2" +VERSION = "5.7.0" diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index ed6a3bc8a0d2..015052edc997 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -1,9 +1,13 @@ # Release History -## 7.4.1 (Unreleased) +## 7.5.0 (Unreleased) ### Features Added +- Added support for fixed (linear) retry backoff: + - Sync/async `ServiceBusClient` constructors and `from_connection_string` take `retry_mode` as a keyword argument. + - `RetryMode` enum has been added to `azure.servicebus`, with values `FIXED` and `EXPONENTIAL`. + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_version.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_version.py index e54d5f449d7b..9aaaffc886a6 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_version.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_version.py @@ -3,4 +3,4 @@ # Licensed under the MIT License. # ------------------------------------ -VERSION = "7.4.1" +VERSION = "7.5.0" From f118767ffded327ecdb9cde2a43b6cb567ad58de Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri <swathip@microsoft.com> Date: Tue, 4 Jan 2022 11:18:14 -0800 Subject: [PATCH 3/3] pylint --- sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py index 1eb87a264398..3f5f01395276 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py @@ -2,8 +2,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -from typing import Optional, Dict, Any - from enum import Enum class RetryMode(str, Enum):