Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] fixed retry backoff #22317

Merged
merged 3 commits into from
Jan 5, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
inital fixed retry commit
swathipil committed Jan 4, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 1f358d07d64bf7061f87daf919cc05a3e0bfa6ac
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
parse_connection_string,
ServiceBusConnectionStringProperties,
)
from ._retry import RetryMode

TransportType = constants.TransportType

@@ -46,4 +47,5 @@
"AutoLockRenewer",
"parse_connection_string",
"ServiceBusConnectionStringProperties",
"RetryMode",
]
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential

from ._common._configuration import Configuration
from ._retry import RetryMode
from .exceptions import (
ServiceBusError,
ServiceBusConnectionError,
@@ -152,6 +153,12 @@ def _generate_sas_token(uri, policy, key, expiry=None):
token = utils.create_sas_token(encoded_policy, encoded_key, encoded_uri, expiry)
return AccessToken(token=token, expires_on=abs_expiry)

def _get_backoff_time(retry_mode, backoff_factor, backoff_max, retried_times):
if retry_mode == RetryMode.FIXED:
backoff_value = backoff_factor
else:
backoff_value = backoff_factor * (2 ** retried_times)
return min(backoff_max, backoff_value)

class ServiceBusSASTokenCredential(object):
"""The shared access token credential used for authentication.
@@ -418,7 +425,12 @@ def _backoff(
):
# type: (int, Exception, Optional[float], str) -> None
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
backoff = _get_backoff_time(
self._config.retry_mode,
self._config.retry_backoff_factor,
self._config.retry_backoff_max,
retried_times,
)
if backoff <= self._config.retry_backoff_max and (
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
): # pylint:disable=no-else-return
Original file line number Diff line number Diff line change
@@ -5,12 +5,14 @@
from typing import Optional, Dict, Any

from uamqp.constants import TransportType
from .._retry import RetryMode


class Configuration(object): # pylint:disable=too-many-instance-attributes
def __init__(self, **kwargs):
self.user_agent = kwargs.get("user_agent") # type: Optional[str]
self.retry_total = kwargs.get("retry_total", 3) # type: int
self.retry_mode = kwargs.get("retry_mode", RetryMode.EXPONENTIAL)
self.retry_backoff_factor = kwargs.get(
"retry_backoff_factor", 0.8
) # type: float
11 changes: 11 additions & 0 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Optional, Dict, Any

from enum import Enum

class RetryMode(str, Enum):
EXPONENTIAL = 'exponential'
FIXED = 'fixed'
Original file line number Diff line number Diff line change
@@ -60,6 +60,8 @@ class ServiceBusClient(object):
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.servicebus.RetryMode

.. admonition:: Example:

@@ -152,6 +154,8 @@ def from_connection_string(cls, conn_str, **kwargs):
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.servicebus.RetryMode
:rtype: ~azure.servicebus.ServiceBusClient

.. admonition:: Example:
@@ -212,6 +216,7 @@ def get_queue_sender(self, queue_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -303,6 +308,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -344,6 +350,7 @@ def get_topic_sender(self, topic_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -433,6 +440,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -453,6 +461,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@

from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential

from .._base_handler import _generate_sas_token, BaseHandler as BaseHandlerSync
from .._base_handler import _generate_sas_token, BaseHandler as BaseHandlerSync, _get_backoff_time
from .._common._configuration import Configuration
from .._common.utils import create_properties, strip_protocol_from_uri, parse_sas_credential
from .._common.constants import (
@@ -268,7 +268,12 @@ async def _backoff(
self, retried_times, last_exception, abs_timeout_time=None, entity_name=None
):
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
backoff = _get_backoff_time(
self._config.retry_mode,
self._config.retry_backoff_factor,
self._config.retry_backoff_max,
retried_times,
)
if backoff <= self._config.retry_backoff_max and (
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
):
Original file line number Diff line number Diff line change
@@ -58,6 +58,8 @@ class ServiceBusClient(object):
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode

.. admonition:: Example:

@@ -129,6 +131,8 @@ def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "ServiceBusClie
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:rtype: ~azure.servicebus.aio.ServiceBusClient

.. admonition:: Example:
@@ -209,6 +213,7 @@ def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender:
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -298,6 +303,7 @@ def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiv
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -338,6 +344,7 @@ def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender:
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -428,6 +435,7 @@ def get_subscription_receiver(
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
@@ -448,6 +456,7 @@ def get_subscription_receiver(
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
retry_mode=self._config.retry_mode,
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@


import logging
import time
import pytest

from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
@@ -401,3 +402,44 @@ async def test_client_named_key_credential_async(self,
async with client:
async with client.get_queue_sender(servicebus_queue.name) as sender:
await sender.send_messages(ServiceBusMessage("foo"))

async def test_backoff_fixed_retry(self):
client = ServiceBusClient(
'fake.host.com',
'fake_eh',
retry_mode=RetryMode.FIXED
)
# queue sender
sender = await client.get_queue_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
# exp = 0.8 * (2 ** 1) = 1.6
# time.sleep() in _backoff will take AT LEAST time 'exp' for RetryMode.EXPONENTIAL
# check that fixed is less than 'exp'
assert sleep_time_fixed < backoff * (2 ** 1)

# topic sender
sender = await client.get_topic_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# queue receiver
receiver = await client.get_queue_receiver('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# subscription receiver
receiver = await client.get_subscription_receiver('fake_topic', 'fake_sub')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)
43 changes: 42 additions & 1 deletion sdk/servicebus/azure-servicebus/tests/test_sb_client.py
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@
from azure.common import AzureHttpError, AzureConflictHttpError
from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
from azure.mgmt.servicebus.models import AccessRights
from azure.servicebus import ServiceBusClient, ServiceBusSender, ServiceBusReceiver
from azure.servicebus import ServiceBusClient, ServiceBusSender, ServiceBusReceiver, RetryMode
from azure.servicebus._base_handler import ServiceBusSharedKeyCredential
from azure.servicebus._common.message import ServiceBusMessage, ServiceBusReceivedMessage
from azure.servicebus.exceptions import (
@@ -419,3 +419,44 @@ def test_client_azure_named_key_credential(self,
with client:
with client.get_queue_sender(servicebus_queue.name) as sender:
sender.send_messages(ServiceBusMessage("foo"))

def test_backoff_fixed_retry(self):
client = ServiceBusClient(
'fake.host.com',
'fake_eh',
retry_mode=RetryMode.FIXED
)
# queue sender
sender = client.get_queue_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
# exp = 0.8 * (2 ** 1) = 1.6
# time.sleep() in _backoff will take AT LEAST time 'exp' for RetryMode.EXPONENTIAL
# check that fixed is less than 'exp'
assert sleep_time_fixed < backoff * (2 ** 1)

# topic sender
sender = client.get_topic_sender('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
sender._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# queue receiver
receiver = client.get_queue_receiver('fake_name')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)

# subscription receiver
receiver = client.get_subscription_receiver('fake_topic', 'fake_sub')
backoff = client._config.retry_backoff_factor
start_time = time.time()
receiver._backoff(retried_times=1, last_exception=Exception('fake'), abs_timeout_time=None)
sleep_time_fixed = time.time() - start_time
assert sleep_time_fixed < backoff * (2 ** 1)