diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py index 95757edb63dd..d18d63a436f1 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py @@ -246,7 +246,7 @@ def send(self, request, **kwargs): # type: ignore allow_redirects=False, **kwargs) - except urllib3.exceptions.NewConnectionError as err: + except (urllib3.exceptions.NewConnectionError, urllib3.exceptions.ConnectTimeoutError) as err: error = ServiceRequestError(err, error=err) except requests.exceptions.ReadTimeout as err: error = ServiceResponseError(err, error=err) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py b/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py index 93920f280f41..91182d089e7b 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py @@ -19,6 +19,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +from ._retry_utility import ConnectionRetryPolicy from .container import ContainerProxy from .cosmos_client import CosmosClient from .database import DatabaseProxy @@ -56,5 +57,6 @@ "SSLConfiguration", "TriggerOperation", "TriggerType", + "ConnectionRetryPolicy", ) __version__ = VERSION diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index cb6cb7e7b0d3..de0eeb4ed394 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -26,12 +26,11 @@ """ from typing import Dict, Any, Optional import six -import requests -from requests.adapters import HTTPAdapter +from requests.packages.urllib3.util.retry import Retry # pylint: disable=import-error from azure.core.paging import ItemPaged # type: ignore from azure.core import PipelineClient # type: ignore -from azure.core.pipeline.transport import RequestsTransport from azure.core.pipeline.policies import ( # type: ignore + HTTPPolicy, ContentDecodePolicy, HeadersPolicy, UserAgentPolicy, @@ -51,6 +50,7 @@ from . import _synchronized_request as synchronized_request from . import _global_endpoint_manager as global_endpoint_manager from ._routing import routing_map_provider +from ._retry_utility import ConnectionRetryPolicy from . import _session from . import _utils from .partition_key import _Undefined, _Empty @@ -151,15 +151,24 @@ def __init__( self._useMultipleWriteLocations = False self._global_endpoint_manager = global_endpoint_manager._GlobalEndpointManager(self) - # creating a requests session used for connection pooling and re-used by all requests - requests_session = requests.Session() - - transport = None - if self.connection_policy.ConnectionRetryConfiguration is not None: - adapter = HTTPAdapter(max_retries=self.connection_policy.ConnectionRetryConfiguration) - requests_session.mount('http://', adapter) - requests_session.mount('https://', adapter) - transport = RequestsTransport(session=requests_session) + retry_policy = None + if isinstance(self.connection_policy.ConnectionRetryConfiguration, HTTPPolicy): + retry_policy = self.connection_policy.ConnectionRetryConfiguration + elif isinstance(self.connection_policy.ConnectionRetryConfiguration, int): + retry_policy = ConnectionRetryPolicy(total=self.connection_policy.ConnectionRetryConfiguration) + elif isinstance(self.connection_policy.ConnectionRetryConfiguration, Retry): + # Convert a urllib3 retry policy to a Pipeline policy + retry_policy = ConnectionRetryPolicy( + retry_total=self.connection_policy.ConnectionRetryConfiguration.total, + retry_connect=self.connection_policy.ConnectionRetryConfiguration.connect, + retry_read=self.connection_policy.ConnectionRetryConfiguration.read, + retry_status=self.connection_policy.ConnectionRetryConfiguration.status, + retry_backoff_max=self.connection_policy.ConnectionRetryConfiguration.BACKOFF_MAX, + retry_on_status_codes=list(self.connection_policy.ConnectionRetryConfiguration.status_forcelist), + retry_backoff_factor=self.connection_policy.ConnectionRetryConfiguration.backoff_factor + ) + else: + TypeError("Unsupported retry policy. Must be an azure.cosmos.ConnectionRetryPolicy, int, or urllib3.Retry") proxies = kwargs.pop('proxies', {}) if self.connection_policy.ProxyConfiguration and self.connection_policy.ProxyConfiguration.Host: @@ -173,11 +182,13 @@ def __init__( ProxyPolicy(proxies=proxies), UserAgentPolicy(base_user_agent=_utils.get_user_agent(), **kwargs), ContentDecodePolicy(), + retry_policy, CustomHookPolicy(**kwargs), DistributedTracingPolicy(), NetworkTraceLoggingPolicy(**kwargs), ] + transport = kwargs.pop("transport", None) self.pipeline_client = PipelineClient(url_connection, "empty-config", transport=transport, policies=policies) # Query compatibility mode. @@ -188,7 +199,7 @@ def __init__( # Routing map provider self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self) - database_account = self._global_endpoint_manager._GetDatabaseAccount() + database_account = self._global_endpoint_manager._GetDatabaseAccount(**kwargs) self._global_endpoint_manager.force_refresh(database_account) @property diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py index d4dc37ee7533..acfab1059022 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py @@ -87,17 +87,17 @@ def force_refresh(self, database_account): self.refresh_needed = True self.refresh_endpoint_list(database_account) - def refresh_endpoint_list(self, database_account): + def refresh_endpoint_list(self, database_account, **kwargs): with self.refresh_lock: # if refresh is not needed or refresh is already taking place, return if not self.refresh_needed: return try: - self._refresh_endpoint_list_private(database_account) + self._refresh_endpoint_list_private(database_account, **kwargs) except Exception as e: raise e - def _refresh_endpoint_list_private(self, database_account=None): + def _refresh_endpoint_list_private(self, database_account=None, **kwargs): if database_account: self.location_cache.perform_on_database_account_read(database_account) self.refresh_needed = False @@ -107,18 +107,18 @@ def _refresh_endpoint_list_private(self, database_account=None): and self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms ): if not database_account: - database_account = self._GetDatabaseAccount() + database_account = self._GetDatabaseAccount(**kwargs) self.location_cache.perform_on_database_account_read(database_account) self.last_refresh_time = self.location_cache.current_time_millis() self.refresh_needed = False - def _GetDatabaseAccount(self): + def _GetDatabaseAccount(self, **kwargs): """Gets the database account first by using the default endpoint, and if that doesn't returns use the endpoints for the preferred locations in the order they are specified to get the database account. """ try: - database_account = self._GetDatabaseAccountStub(self.DefaultEndpoint) + database_account = self._GetDatabaseAccountStub(self.DefaultEndpoint, **kwargs) return database_account # If for any reason(non-globaldb related), we are not able to get the database # account from the above call to GetDatabaseAccount, we would try to get this @@ -130,18 +130,18 @@ def _GetDatabaseAccount(self): for location_name in self.PreferredLocations: locational_endpoint = _GlobalEndpointManager.GetLocationalEndpoint(self.DefaultEndpoint, location_name) try: - database_account = self._GetDatabaseAccountStub(locational_endpoint) + database_account = self._GetDatabaseAccountStub(locational_endpoint, **kwargs) return database_account except errors.CosmosHttpResponseError: pass return None - def _GetDatabaseAccountStub(self, endpoint): + def _GetDatabaseAccountStub(self, endpoint, **kwargs): """Stub for getting database account from the client which can be used for mocking purposes as well. """ - return self.Client.GetDatabaseAccount(endpoint) + return self.Client.GetDatabaseAccount(endpoint, **kwargs) @staticmethod def GetLocationalEndpoint(default_endpoint, location_name): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py index df575cb27d36..20362d09aa32 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py @@ -24,6 +24,9 @@ import time +from azure.core.exceptions import AzureError, ClientAuthenticationError +from azure.core.pipeline.policies import RetryPolicy + from . import errors from . import _endpoint_discovery_retry_policy from . import _resource_throttle_retry_policy @@ -64,6 +67,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): ) while True: try: + client_timeout = kwargs.get('timeout') + start_time = time.time() if args: result = ExecuteFunction(function, global_endpoint_manager, *args, **kwargs) else: @@ -113,9 +118,92 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # Wait for retry_after_in_milliseconds time before the next retry time.sleep(retry_policy.retry_after_in_milliseconds / 1000.0) + if client_timeout: + kwargs['timeout'] = client_timeout - (time.time() - start_time) + if kwargs['timeout'] <= 0: + raise errors.CosmosClientTimeoutError() def ExecuteFunction(function, *args, **kwargs): """ Stub method so that it can be used for mocking purposes as well. """ return function(*args, **kwargs) + + +def _configure_timeout(request, absolute, per_request): + # type: (azure.core.pipeline.PipelineRequest, Optional[int], int) -> Optional[AzureError] + if absolute is not None: + if absolute <= 0: + raise errors.CosmosClientTimeoutError() + if per_request: + # Both socket timeout and client timeout have been provided - use the shortest value. + request.context.options['connection_timeout'] = min(per_request, absolute) + else: + # Only client timeout provided. + request.context.options['connection_timeout'] = absolute + elif per_request: + # Only socket timeout provided. + request.context.options['connection_timeout'] = per_request + + +class ConnectionRetryPolicy(RetryPolicy): + + def __init__(self, **kwargs): + clean_kwargs = {k: v for k, v in kwargs.items() if v is not None} + super(ConnectionRetryPolicy, self).__init__(**clean_kwargs) + + def send(self, request): + """Sends the PipelineRequest object to the next policy. Uses retry settings if necessary. + Also enforces an absolute client-side timeout that spans multiple retry attempts. + + :param request: The PipelineRequest object + :type request: ~azure.core.pipeline.PipelineRequest + :return: Returns the PipelineResponse or raises error if maximum retries exceeded. + :rtype: ~azure.core.pipeline.PipelineResponse + :raises: ~azure.core.exceptions.AzureError if maximum retries exceeded. + :raises: ~azure.cosmos.CosmosClientTimeoutError if specified timeout exceeded. + :raises: ~azure.core.exceptions.ClientAuthenticationError if authentication + """ + absolute_timeout = request.context.options.pop('timeout', None) + per_request_timeout = request.context.options.pop('connection_timeout', 0) + + retry_error = None + retry_active = True + response = None + retry_settings = self.configure_retries(request.context.options) + while retry_active: + try: + start_time = time.time() + _configure_timeout(request, absolute_timeout, per_request_timeout) + + response = self.next.send(request) + if self.is_retry(retry_settings, response): + retry_active = self.increment(retry_settings, response=response) + if retry_active: + self.sleep(retry_settings, request.context.transport, response=response) + continue + break + except ClientAuthenticationError: # pylint:disable=try-except-raise + # the authentication policy failed such that the client's request can't + # succeed--we'll never have a response to it, so propagate the exception + raise + except errors.CosmosClientTimeoutError as timeout_error: + timeout_error.inner_exception = retry_error + timeout_error.response = response + timeout_error.history = retry_settings['history'] + raise + except AzureError as err: + retry_error = err + if self._is_method_retryable(retry_settings, request.http_request): + retry_active = self.increment(retry_settings, response=request, error=err) + if retry_active: + self.sleep(retry_settings, request.context.transport) + continue + raise err + finally: + end_time = time.time() + if absolute_timeout: + absolute_timeout -= (end_time - start_time) + + self.update_context(response.context, retry_settings) + return response diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py index 7f1b900303ca..f697f72f66c8 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py @@ -23,6 +23,7 @@ """ import json +import time from six.moves.urllib.parse import urlparse import six @@ -96,7 +97,13 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin connection_timeout = kwargs.pop("connection_timeout", connection_timeout / 1000.0) # Every request tries to perform a refresh - global_endpoint_manager.refresh_endpoint_list(None) + client_timeout = kwargs.get('timeout') + start_time = time.time() + global_endpoint_manager.refresh_endpoint_list(None, **kwargs) + if client_timeout is not None: + kwargs['timeout'] = client_timeout - (time.time() - start_time) + if kwargs['timeout'] <= 0: + raise errors.CosmosClientTimeoutError() if request_params.endpoint_override: base_url = request_params.endpoint_override @@ -149,7 +156,7 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin return (response.stream_download(pipeline_client._pipeline), headers) data = response.body() - if not six.PY2: + if data and not six.PY2: # python 3 compatible: convert data from byte to unicode string data = data.decode("utf-8") diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py index 1d2cdcc55fc8..28536884dd4b 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py @@ -22,13 +22,14 @@ """Create, read, and delete databases in the Azure Cosmos DB SQL API service. """ -from typing import Any, Dict, Mapping, Optional, Union, cast, Iterable, List +from typing import Any, Dict, Mapping, Optional, Union, cast, Iterable, List # pylint: disable=unused-import import six from azure.core.tracing.decorator import distributed_trace # type: ignore from ._cosmos_client_connection import CosmosClientConnection from ._base import build_options +from ._retry_utility import ConnectionRetryPolicy from .database import DatabaseProxy from .documents import ConnectionPolicy, DatabaseAccount from .errors import CosmosResourceNotFoundError @@ -96,11 +97,25 @@ def _build_connection_policy(kwargs): # Retry config retry = kwargs.pop('retry_options', None) or policy.RetryOptions - retry._max_retry_attempt_count = kwargs.pop('retry_total', None) or retry._max_retry_attempt_count + total_retries = kwargs.pop('retry_total', None) + retry._max_retry_attempt_count = total_retries or retry._max_retry_attempt_count retry._fixed_retry_interval_in_milliseconds = kwargs.pop('retry_fixed_interval', None) or \ retry._fixed_retry_interval_in_milliseconds - retry._max_wait_time_in_seconds = kwargs.pop('retry_backoff_max', None) or retry._max_wait_time_in_seconds + max_backoff = kwargs.pop('retry_backoff_max', None) + retry._max_wait_time_in_seconds = max_backoff or retry._max_wait_time_in_seconds policy.RetryOptions = retry + connection_retry = kwargs.pop('connection_retry_policy', None) or policy.ConnectionRetryConfiguration + if not connection_retry: + connection_retry = ConnectionRetryPolicy( + retry_total=total_retries, + retry_connect=kwargs.pop('retry_connect', None), + retry_read=kwargs.pop('retry_read', None), + retry_status=kwargs.pop('retry_status', None), + retry_backoff_max=max_backoff, + retry_on_status_codes=kwargs.pop('retry_on_status_codes', []), + retry_backoff_factor=kwargs.pop('retry_backoff_factor', 0.8), + ) + policy.ConnectionRetryConfiguration = connection_retry return policy @@ -130,6 +145,11 @@ class CosmosClient(object): *retry_total* - Maximum retry attempts. *retry_backoff_max* - Maximum retry wait time in seconds. *retry_fixed_interval* - Fixed retry interval in milliseconds. + *retry_read* - Maximum number of socket read retry attempts. + *retry_connect* - Maximum number of connection error retry attempts. + *retry_status* - Maximum number of retry attempts on error status codes. + *retry_on_status_codes* - A list of specific status codes to retry on. + *retry_backoff_factor* - Factor to calculate wait time between retry attempts. *enable_endpoint_discovery* - Enable endpoint discovery for geo-replicated database accounts. Default is True. *preferred_locations* - The preferred locations for geo-replicated database accounts. When `enable_endpoint_discovery` is true and `preferred_locations` is non-empty, diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py b/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py index 02b80331b281..cfccc00f2ef4 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py @@ -372,8 +372,10 @@ class ConnectionPolicy(object): # pylint: disable=too-many-instance-attributes :ivar boolean UseMultipleWriteLocations: Flag to enable writes on any locations (regions) for geo-replicated database accounts in the azure Cosmos service. - :ivar (int or requests.packages.urllib3.util.retry) ConnectionRetryConfiguration: - Retry Configuration to be used for urllib3 connection retries. + :ivar ConnectionRetryConfiguration: + Retry Configuration to be used for connection retries. + :vartype ConnectionRetryConfiguration: + int or azure.cosmos.ConnectionRetryPolicy or requests.packages.urllib3.util.retry """ __defaultRequestTimeout = 60000 # milliseconds diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/errors.py b/sdk/cosmos/azure-cosmos/azure/cosmos/errors.py index 5fcb514959c1..698924ef3013 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/errors.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/errors.py @@ -63,3 +63,13 @@ class CosmosResourceExistsError(ResourceExistsError, CosmosHttpResponseError): class CosmosAccessConditionFailedError(CosmosHttpResponseError): """An error response with status code 412.""" + + +class CosmosClientTimeoutError(AzureError): + """An operation failed to complete within the specified timeout.""" + + def __init__(self, **kwargs): + message = "Client operation failed to complete within specified timeout." + self.response = None + self.history = None + super(CosmosClientTimeoutError, self).__init__(message, **kwargs) diff --git a/sdk/cosmos/azure-cosmos/dev_requirements.txt b/sdk/cosmos/azure-cosmos/dev_requirements.txt index 6ccb7f031ddd..f3473b3bd4d0 100644 --- a/sdk/cosmos/azure-cosmos/dev_requirements.txt +++ b/sdk/cosmos/azure-cosmos/dev_requirements.txt @@ -1 +1,2 @@ -e ../../../tools/azure-sdk-tools +-e ../../core/azure-core diff --git a/sdk/cosmos/azure-cosmos/test/crud_tests.py b/sdk/cosmos/azure-cosmos/test/crud_tests.py index 1d0212e9cf8a..85aa501cd50b 100644 --- a/sdk/cosmos/azure-cosmos/test/crud_tests.py +++ b/sdk/cosmos/azure-cosmos/test/crud_tests.py @@ -41,6 +41,8 @@ import urllib.parse as urllib import uuid import pytest +from azure.core.exceptions import AzureError, ServiceResponseError +from azure.core.pipeline.transport import RequestsTransport, RequestsTransportResponse from azure.cosmos import _consistent_hash_ring import azure.cosmos.documents as documents import azure.cosmos.errors as errors @@ -53,6 +55,7 @@ from azure.cosmos.partition_key import PartitionKey import conftest from azure.cosmos import _retry_utility +import requests from requests.packages.urllib3.util.retry import Retry from requests.exceptions import ConnectionError @@ -66,6 +69,26 @@ # To Run the test, replace the two member fields (masterKey and host) with values # associated with your Azure Cosmos account. + +class TimeoutTransport(RequestsTransport): + + def __init__(self, response): + self._response = response + super(TimeoutTransport, self).__init__() + + def send(self, *args, **kwargs): + if kwargs.pop("passthrough", False): + return super(TimeoutTransport, self).send(*args, **kwargs) + + time.sleep(5) + if isinstance(self._response, Exception): + raise self._response + output = requests.Response() + output.status_code = self._response + response = RequestsTransportResponse(None, output) + return response + + @pytest.mark.usefixtures("teardown") class CRUDTests(unittest.TestCase): """Python CRUD Tests. @@ -1977,7 +2000,7 @@ def __get_first(array): def test_client_request_timeout(self): connection_policy = documents.ConnectionPolicy() # making timeout 0 ms to make sure it will throw - connection_policy.RequestTimeout = 0 + connection_policy.RequestTimeout = 0.000000000001 with self.assertRaises(Exception): # client does a getDatabaseAccount on initialization, which will time out cosmos_client.CosmosClient(CRUDTests.host, CRUDTests.masterKey, "Session", connection_policy=connection_policy) @@ -1985,7 +2008,7 @@ def test_client_request_timeout(self): def test_client_request_timeout_when_connection_retry_configuration_specified(self): connection_policy = documents.ConnectionPolicy() # making timeout 0 ms to make sure it will throw - connection_policy.RequestTimeout = 0 + connection_policy.RequestTimeout = 0.000000000001 connection_policy.ConnectionRetryConfiguration = Retry( total=3, read=3, @@ -1993,33 +2016,95 @@ def test_client_request_timeout_when_connection_retry_configuration_specified(se backoff_factor=0.3, status_forcelist=(500, 502, 504) ) - with self.assertRaises(Exception): + with self.assertRaises(AzureError): # client does a getDatabaseAccount on initialization, which will time out cosmos_client.CosmosClient(CRUDTests.host, CRUDTests.masterKey, "Session", connection_policy=connection_policy) def test_client_connection_retry_configuration(self): - total_time_for_two_retries = self.initialize_client_with_connection_retry_config(2) - total_time_for_three_retries = self.initialize_client_with_connection_retry_config(3) + total_time_for_two_retries = self.initialize_client_with_connection_urllib_retry_config(2) + total_time_for_three_retries = self.initialize_client_with_connection_urllib_retry_config(3) self.assertGreater(total_time_for_three_retries, total_time_for_two_retries) - def initialize_client_with_connection_retry_config(self, retries): - from azure.core.exceptions import ServiceRequestError - connection_policy = documents.ConnectionPolicy() - connection_policy.ConnectionRetryConfiguration = Retry( - total=retries, - read=retries, - connect=retries, - backoff_factor=0.3, - status_forcelist=(500, 502, 504) - ) + total_time_for_two_retries = self.initialize_client_with_connection_core_retry_config(2) + total_time_for_three_retries = self.initialize_client_with_connection_core_retry_config(3) + self.assertGreater(total_time_for_three_retries, total_time_for_two_retries) + + def initialize_client_with_connection_urllib_retry_config(self, retries): + retry_policy = Retry( + total=retries, + read=retries, + connect=retries, + backoff_factor=0.3, + status_forcelist=(500, 502, 504) + ) start_time = time.time() try: - cosmos_client.CosmosClient("https://localhost:9999", CRUDTests.masterKey, "Session", connection_policy=connection_policy) + cosmos_client.CosmosClient( + "https://localhost:9999", + CRUDTests.masterKey, + "Session", + connection_retry_policy=retry_policy) self.fail() - except ServiceRequestError as e: + except AzureError as e: end_time = time.time() return end_time - start_time + def initialize_client_with_connection_core_retry_config(self, retries): + start_time = time.time() + try: + cosmos_client.CosmosClient( + "https://localhost:9999", + CRUDTests.masterKey, + "Session", + retry_total=retries, + retry_read=retries, + retry_connect=retries, + retry_status=retries) + self.fail() + except AzureError as e: + end_time = time.time() + return end_time - start_time + + def test_absolute_client_timeout(self): + with self.assertRaises(errors.CosmosClientTimeoutError): + cosmos_client.CosmosClient( + "https://localhost:9999", + CRUDTests.masterKey, + "Session", + retry_total=3, + timeout=1) + + error_response = ServiceResponseError("Read timeout") + timeout_transport = TimeoutTransport(error_response) + client = cosmos_client.CosmosClient( + self.host, self.masterKey, "Session", transport=timeout_transport, passthrough=True) + + with self.assertRaises(errors.CosmosClientTimeoutError): + client.create_database_if_not_exists("test", timeout=2) + + status_response = 500 # Users connection level retry + timeout_transport = TimeoutTransport(status_response) + client = cosmos_client.CosmosClient( + self.host, self.masterKey, "Session", transport=timeout_transport, passthrough=True) + with self.assertRaises(errors.CosmosClientTimeoutError): + client.create_database("test", timeout=2) + + databases = client.list_databases(timeout=2) + with self.assertRaises(errors.CosmosClientTimeoutError): + list(databases) + + status_response = 429 # Uses Cosmos custom retry + timeout_transport = TimeoutTransport(status_response) + client = cosmos_client.CosmosClient( + self.host, self.masterKey, "Session", transport=timeout_transport, passthrough=True) + with self.assertRaises(errors.CosmosClientTimeoutError): + client.create_database_if_not_exists("test", timeout=2) + + databases = client.list_databases(timeout=2) + with self.assertRaises(errors.CosmosClientTimeoutError): + list(databases) + + def test_query_iterable_functionality(self): def __create_resources(client): """Creates resources for this test.