Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] fixed retry backoff #22317

Merged
merged 3 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 5.6.2 (Unreleased)
## 5.7.0 (Unreleased)

### Features Added

Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "5.6.2"
VERSION = "5.7.0"
6 changes: 5 additions & 1 deletion sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# Release History

## 7.4.1 (Unreleased)
## 7.5.0 (Unreleased)

### Features Added

- Added support for fixed (linear) retry backoff:
- Sync/async `ServiceBusClient` constructors and `from_connection_string` take `retry_mode` as a keyword argument.
- `RetryMode` enum has been added to `azure.servicebus`, with values `FIXED` and `EXPONENTIAL`.

### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
parse_connection_string,
ServiceBusConnectionStringProperties,
)
from ._retry import RetryMode

TransportType = constants.TransportType

Expand All @@ -46,4 +47,5 @@
"AutoLockRenewer",
"parse_connection_string",
"ServiceBusConnectionStringProperties",
"RetryMode",
]
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential

from ._common._configuration import Configuration
from ._retry import RetryMode
from .exceptions import (
ServiceBusError,
ServiceBusConnectionError,
Expand Down Expand Up @@ -152,6 +153,12 @@ def _generate_sas_token(uri, policy, key, expiry=None):
token = utils.create_sas_token(encoded_policy, encoded_key, encoded_uri, expiry)
return AccessToken(token=token, expires_on=abs_expiry)

def _get_backoff_time(retry_mode, backoff_factor, backoff_max, retried_times):
if retry_mode == RetryMode.FIXED:
backoff_value = backoff_factor
else:
backoff_value = backoff_factor * (2 ** retried_times)
return min(backoff_max, backoff_value)

class ServiceBusSASTokenCredential(object):
"""The shared access token credential used for authentication.
Expand Down Expand Up @@ -418,7 +425,12 @@ def _backoff(
):
# type: (int, Exception, Optional[float], str) -> None
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
backoff = _get_backoff_time(
self._config.retry_mode,
self._config.retry_backoff_factor,
self._config.retry_backoff_max,
retried_times,
)
if backoff <= self._config.retry_backoff_max and (
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
): # pylint:disable=no-else-return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from typing import Optional, Dict, Any

from uamqp.constants import TransportType
from .._retry import RetryMode


class Configuration(object): # pylint:disable=too-many-instance-attributes
def __init__(self, **kwargs):
self.user_agent = kwargs.get("user_agent") # type: Optional[str]
self.retry_total = kwargs.get("retry_total", 3) # type: int
self.retry_mode = kwargs.get("retry_mode", RetryMode.EXPONENTIAL)
self.retry_backoff_factor = kwargs.get(
"retry_backoff_factor", 0.8
) # type: float
Expand Down
9 changes: 9 additions & 0 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from enum import Enum

class RetryMode(str, Enum):
EXPONENTIAL = 'exponential'
FIXED = 'fixed'
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class ServiceBusClient(object):
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.servicebus.RetryMode

.. admonition:: Example:

Expand Down Expand Up @@ -152,6 +154,8 @@ def from_connection_string(cls, conn_str, **kwargs):
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.servicebus.RetryMode
:rtype: ~azure.servicebus.ServiceBusClient

.. admonition:: Example:
Expand Down Expand Up @@ -212,6 +216,7 @@ def get_queue_sender(self, queue_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Expand Down Expand Up @@ -303,6 +308,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Expand Down Expand Up @@ -344,6 +350,7 @@ def get_topic_sender(self, topic_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Expand Down Expand Up @@ -433,6 +440,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Expand All @@ -453,6 +461,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "7.4.1"
VERSION = "7.5.0"
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential

from .._base_handler import _generate_sas_token, BaseHandler as BaseHandlerSync
from .._base_handler import _generate_sas_token, BaseHandler as BaseHandlerSync, _get_backoff_time
from .._common._configuration import Configuration
from .._common.utils import create_properties, strip_protocol_from_uri, parse_sas_credential
from .._common.constants import (
Expand Down Expand Up @@ -268,7 +268,12 @@ async def _backoff(
self, retried_times, last_exception, abs_timeout_time=None, entity_name=None
):
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
backoff = _get_backoff_time(
self._config.retry_mode,
self._config.retry_backoff_factor,
self._config.retry_backoff_max,
retried_times,
)
if backoff <= self._config.retry_backoff_max and (
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class ServiceBusClient(object):
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode

.. admonition:: Example:

Expand Down Expand Up @@ -129,6 +131,8 @@ def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "ServiceBusClie
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:rtype: ~azure.servicebus.aio.ServiceBusClient

.. admonition:: Example:
Expand Down Expand Up @@ -209,6 +213,7 @@ def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender:
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Expand Down Expand Up @@ -298,6 +303,7 @@ def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiv
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Expand Down Expand Up @@ -338,6 +344,7 @@ def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender:
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Expand Down Expand Up @@ -428,6 +435,7 @@ def get_subscription_receiver(
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Expand All @@ -448,6 +456,7 @@ def get_subscription_receiver(
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@


import logging
import time
import pytest

from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
Expand Down Expand Up @@ -401,3 +402,44 @@ async def test_client_named_key_credential_async(self,
async with client:
async with client.get_queue_sender(servicebus_queue.name) as sender:
await sender.send_messages(ServiceBusMessage("foo"))

async def test_backoff_fixed_retry(self):
client = ServiceBusClient(
'fake.host.com',
'fake_eh',
retry_mode=RetryMode.FIXED
)
# queue sender
sender = await client.get_queue_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
# exp = 0.8 * (2 ** 1) = 1.6
# time.sleep() in _backoff will take AT LEAST time 'exp' for RetryMode.EXPONENTIAL
# check that fixed is less than 'exp'
assert sleep_time_fixed < backoff * (2 ** 1)

# topic sender
sender = await client.get_topic_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# queue receiver
receiver = await client.get_queue_receiver('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# subscription receiver
receiver = await client.get_subscription_receiver('fake_topic', 'fake_sub')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)
43 changes: 42 additions & 1 deletion sdk/servicebus/azure-servicebus/tests/test_sb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from azure.common import AzureHttpError, AzureConflictHttpError
from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
from azure.mgmt.servicebus.models import AccessRights
from azure.servicebus import ServiceBusClient, ServiceBusSender, ServiceBusReceiver
from azure.servicebus import ServiceBusClient, ServiceBusSender, ServiceBusReceiver, RetryMode
from azure.servicebus._base_handler import ServiceBusSharedKeyCredential
from azure.servicebus._common.message import ServiceBusMessage, ServiceBusReceivedMessage
from azure.servicebus.exceptions import (
Expand Down Expand Up @@ -419,3 +419,44 @@ def test_client_azure_named_key_credential(self,
with client:
with client.get_queue_sender(servicebus_queue.name) as sender:
sender.send_messages(ServiceBusMessage("foo"))

def test_backoff_fixed_retry(self):
client = ServiceBusClient(
'fake.host.com',
'fake_eh',
retry_mode=RetryMode.FIXED
)
# queue sender
sender = client.get_queue_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
# exp = 0.8 * (2 ** 1) = 1.6
# time.sleep() in _backoff will take AT LEAST time 'exp' for RetryMode.EXPONENTIAL
# check that fixed is less than 'exp'
assert sleep_time_fixed < backoff * (2 ** 1)

# topic sender
sender = client.get_topic_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# queue receiver
receiver = client.get_queue_receiver('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# subscription receiver
receiver = client.get_subscription_receiver('fake_topic', 'fake_sub')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)