Skip to content

Commit

Permalink
[ServiceBus] fixed retry backoff (#22317)
Browse files Browse the repository at this point in the history
* inital fixed retry commit

* changelog + version

* pylint
  • Loading branch information
swathipil authored Jan 5, 2022
1 parent bc99c22 commit f68dd95
Showing 13 changed files with 143 additions and 8 deletions.
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 5.6.2 (Unreleased)
## 5.7.0 (Unreleased)

### Features Added

2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_version.py
Original file line number Diff line number Diff line change
@@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "5.6.2"
VERSION = "5.7.0"
6 changes: 5 additions & 1 deletion sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# Release History

## 7.4.1 (Unreleased)
## 7.5.0 (Unreleased)

### Features Added

- Added support for fixed (linear) retry backoff:
- Sync/async `ServiceBusClient` constructors and `from_connection_string` take `retry_mode` as a keyword argument.
- `RetryMode` enum has been added to `azure.servicebus`, with values `FIXED` and `EXPONENTIAL`.

### Breaking Changes

### Bugs Fixed
2 changes: 2 additions & 0 deletions sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
parse_connection_string,
ServiceBusConnectionStringProperties,
)
from ._retry import RetryMode

TransportType = constants.TransportType

@@ -46,4 +47,5 @@
"AutoLockRenewer",
"parse_connection_string",
"ServiceBusConnectionStringProperties",
"RetryMode",
]
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential

from ._common._configuration import Configuration
from ._retry import RetryMode
from .exceptions import (
ServiceBusError,
ServiceBusConnectionError,
@@ -152,6 +153,12 @@ def _generate_sas_token(uri, policy, key, expiry=None):
token = utils.create_sas_token(encoded_policy, encoded_key, encoded_uri, expiry)
return AccessToken(token=token, expires_on=abs_expiry)

def _get_backoff_time(retry_mode, backoff_factor, backoff_max, retried_times):
if retry_mode == RetryMode.FIXED:
backoff_value = backoff_factor
else:
backoff_value = backoff_factor * (2 ** retried_times)
return min(backoff_max, backoff_value)

class ServiceBusSASTokenCredential(object):
"""The shared access token credential used for authentication.
@@ -418,7 +425,12 @@ def _backoff(
):
# type: (int, Exception, Optional[float], str) -> None
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
backoff = _get_backoff_time(
self._config.retry_mode,
self._config.retry_backoff_factor,
self._config.retry_backoff_max,
retried_times,
)
if backoff <= self._config.retry_backoff_max and (
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
): # pylint:disable=no-else-return
Original file line number Diff line number Diff line change
@@ -5,12 +5,14 @@
from typing import Optional, Dict, Any

from uamqp.constants import TransportType
from .._retry import RetryMode


class Configuration(object): # pylint:disable=too-many-instance-attributes
def __init__(self, **kwargs):
self.user_agent = kwargs.get("user_agent") # type: Optional[str]
self.retry_total = kwargs.get("retry_total", 3) # type: int
self.retry_mode = kwargs.get("retry_mode", RetryMode.EXPONENTIAL)
self.retry_backoff_factor = kwargs.get(
"retry_backoff_factor", 0.8
) # type: float
9 changes: 9 additions & 0 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from enum import Enum

class RetryMode(str, Enum):
EXPONENTIAL = 'exponential'
FIXED = 'fixed'
Original file line number Diff line number Diff line change
@@ -60,6 +60,8 @@ class ServiceBusClient(object):
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.servicebus.RetryMode
.. admonition:: Example:
@@ -152,6 +154,8 @@ def from_connection_string(cls, conn_str, **kwargs):
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.servicebus.RetryMode
:rtype: ~azure.servicebus.ServiceBusClient
.. admonition:: Example:
@@ -212,6 +216,7 @@ def get_queue_sender(self, queue_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -303,6 +308,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -344,6 +350,7 @@ def get_topic_sender(self, topic_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -433,6 +440,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -453,6 +461,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Original file line number Diff line number Diff line change
@@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "7.4.1"
VERSION = "7.5.0"
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@

from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential

from .._base_handler import _generate_sas_token, BaseHandler as BaseHandlerSync
from .._base_handler import _generate_sas_token, BaseHandler as BaseHandlerSync, _get_backoff_time
from .._common._configuration import Configuration
from .._common.utils import create_properties, strip_protocol_from_uri, parse_sas_credential
from .._common.constants import (
@@ -268,7 +268,12 @@ async def _backoff(
self, retried_times, last_exception, abs_timeout_time=None, entity_name=None
):
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
backoff = _get_backoff_time(
self._config.retry_mode,
self._config.retry_backoff_factor,
self._config.retry_backoff_max,
retried_times,
)
if backoff <= self._config.retry_backoff_max and (
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
):
Original file line number Diff line number Diff line change
@@ -58,6 +58,8 @@ class ServiceBusClient(object):
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
.. admonition:: Example:
@@ -129,6 +131,8 @@ def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "ServiceBusClie
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:rtype: ~azure.servicebus.aio.ServiceBusClient
.. admonition:: Example:
@@ -209,6 +213,7 @@ def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender:
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -298,6 +303,7 @@ def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiv
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -338,6 +344,7 @@ def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender:
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -428,6 +435,7 @@ def get_subscription_receiver(
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -448,6 +456,7 @@ def get_subscription_receiver(
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@


import logging
import time
import pytest

from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
@@ -401,3 +402,44 @@ async def test_client_named_key_credential_async(self,
async with client:
async with client.get_queue_sender(servicebus_queue.name) as sender:
await sender.send_messages(ServiceBusMessage("foo"))

async def test_backoff_fixed_retry(self):
client = ServiceBusClient(
'fake.host.com',
'fake_eh',
retry_mode=RetryMode.FIXED
)
# queue sender
sender = await client.get_queue_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
# exp = 0.8 * (2 ** 1) = 1.6
# time.sleep() in _backoff will take AT LEAST time 'exp' for RetryMode.EXPONENTIAL
# check that fixed is less than 'exp'
assert sleep_time_fixed < backoff * (2 ** 1)

# topic sender
sender = await client.get_topic_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# queue receiver
receiver = await client.get_queue_receiver('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# subscription receiver
receiver = await client.get_subscription_receiver('fake_topic', 'fake_sub')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)
43 changes: 42 additions & 1 deletion sdk/servicebus/azure-servicebus/tests/test_sb_client.py
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@
from azure.common import AzureHttpError, AzureConflictHttpError
from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
from azure.mgmt.servicebus.models import AccessRights
from azure.servicebus import ServiceBusClient, ServiceBusSender, ServiceBusReceiver
from azure.servicebus import ServiceBusClient, ServiceBusSender, ServiceBusReceiver, RetryMode
from azure.servicebus._base_handler import ServiceBusSharedKeyCredential
from azure.servicebus._common.message import ServiceBusMessage, ServiceBusReceivedMessage
from azure.servicebus.exceptions import (
@@ -419,3 +419,44 @@ def test_client_azure_named_key_credential(self,
with client:
with client.get_queue_sender(servicebus_queue.name) as sender:
sender.send_messages(ServiceBusMessage("foo"))

def test_backoff_fixed_retry(self):
client = ServiceBusClient(
'fake.host.com',
'fake_eh',
retry_mode=RetryMode.FIXED
)
# queue sender
sender = client.get_queue_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
# exp = 0.8 * (2 ** 1) = 1.6
# time.sleep() in _backoff will take AT LEAST time 'exp' for RetryMode.EXPONENTIAL
# check that fixed is less than 'exp'
assert sleep_time_fixed < backoff * (2 ** 1)

# topic sender
sender = client.get_topic_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# queue receiver
receiver = client.get_queue_receiver('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# subscription receiver
receiver = client.get_subscription_receiver('fake_topic', 'fake_sub')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

0 comments on commit f68dd95

Please sign in to comment.