Skip to content

Commit

Permalink
[Cosmos] Resiliency and Documentation Improvements (#36514)
Browse files Browse the repository at this point in the history
* 403.3 loop fix, regional routing fix, improvement on service request errors

functional code, missing tests now

* Update ErrorCodesAndRetries.md

* Update TimeoutAndRetriesConfig.md

* Update http_constants.py

* Update CHANGELOG.md

* test improvements for 403 retry

* fix emulator tests

* Update test_globaldb.py

* Update test_globaldb.py

* add ServiceRequestError test and doc update

* addressing comments

* Update test_globaldb.py

* Update test_globaldb.py

* Update test_globaldb.py

* Update test_globaldb.py

* move policy

* revert

* fixes

* Update test_globaldb.py

* Update test_globaldb.py

* Update test_globaldb.py

* Update test_globaldb.py

* Update test_globaldb.py

* Update CHANGELOG.md

* 503 retries

* align readme with changelog

* forceful db account refresh

* remove premature locational endpoint

* make GEM refresh every 5 mins as it should have

* Delete drz3-drill.txt

* Update CHANGELOG.md

* Update test_location_cache.py

* Update _global_endpoint_manager.py

* ensure only one initial database account call

* Delete dr-zdrill-005.txt

* Update test_location_cache.py

* Update test_location_cache.py

* Update test_location_cache.py

* overhaul location_cache tests
  • Loading branch information
simorenoh authored Oct 7, 2024
1 parent 886139f commit f8ab118
Show file tree
Hide file tree
Showing 18 changed files with 391 additions and 534 deletions.
9 changes: 5 additions & 4 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,23 @@
### 4.7.1 (Unreleased)

#### Features Added
* SDK will now retry all ServiceRequestErrors (failing outgoing requests) before failing. Default number of retries is 3. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).
* Added Retry Policy for Container Recreate in the Python SDK. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)
* Added option to disable write payload on writes. See [PR 37365](https://github.com/Azure/azure-sdk-for-python/pull/37365)
* Added get feed ranges API. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)
* Added feed range support in `query_items_change_feed`. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)

#### Breaking Changes

#### Bugs Fixed
* Consolidated Container Properties Cache to be in the Client to cache partition key definition and container rid to avoid unnecessary container reads. See [PR 35731](https://github.com/Azure/azure-sdk-for-python/pull/35731)
* Consolidated Container Properties Cache to be in the Client to cache partition key definition and container rid to avoid unnecessary container reads. See [PR 35731](https://github.com/Azure/azure-sdk-for-python/pull/35731).
* Fixed bug with client hangs when running into WriteForbidden exceptions. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).
* Added retry handling logic for DatabaseAccountNotFound exceptions. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).
* Fixed SDK regex validation that would not allow for item ids to be longer than 255 characters. See [PR 36569](https://github.com/Azure/azure-sdk-for-python/pull/36569).
* Fixed issue where 'NoneType' object has no attribute error was raised when a session retry happened during a query. See [PR 37578](https://github.com/Azure/azure-sdk-for-python/pull/37578).

#### Other Changes
* Getting offer thoughput when it has not been defined in a container will now give a 404/10004 instead of just a 404. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)
* Incomplete Partition Key Extractions in documents for Subpartitioning now gives 400/1001 instead of just a 400. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)

* SDK will now make database account calls every 5 minutes to refresh location cache. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).

### 4.7.0 (2024-05-15)

Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ New releases of this SDK won't support Python 2.x starting January 1st, 2022. Pl

* Azure subscription - [Create a free account][azure_sub]
* Azure [Cosmos DB account][cosmos_account] - SQL API
* [Python 3.6+][python]
* [Python 3.8+][python]

If you need a Cosmos DB SQL API account, you can create one with this [Azure CLI][azure_cli] command:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,4 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument
# is set to false
self.request.route_to_location_with_preferred_location_flag(self.failover_retry_count, False)

# Resolve the endpoint for the request and pin the resolution to the resolved endpoint
# This enables marking the endpoint unavailability on endpoint failover/unreachability
self.location_endpoint = self.global_endpoint_manager.resolve_service_endpoint(self.request)
self.request.route_to_location(self.location_endpoint)
return True
33 changes: 17 additions & 16 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from . import exceptions
from ._location_cache import LocationCache


# pylint: disable=protected-access


Expand Down Expand Up @@ -89,29 +90,29 @@ def force_refresh(self, database_account):
self.refresh_endpoint_list(database_account)

def refresh_endpoint_list(self, database_account, **kwargs):
with self.refresh_lock:
# if refresh is not needed or refresh is already taking place, return
if not self.refresh_needed:
return
try:
self._refresh_endpoint_list_private(database_account, **kwargs)
except Exception as e:
raise e
if self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms:
self.refresh_needed = True
if self.refresh_needed:
with self.refresh_lock:
# if refresh is not needed or refresh is already taking place, return
if not self.refresh_needed:
return
try:
self._refresh_endpoint_list_private(database_account, **kwargs)
except Exception as e:
raise e

def _refresh_endpoint_list_private(self, database_account=None, **kwargs):
if database_account:
self.location_cache.perform_on_database_account_read(database_account)
self.refresh_needed = False

if (
self.location_cache.should_refresh_endpoints()
and self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms
):
if not database_account:
self.last_refresh_time = self.location_cache.current_time_millis()
else:
if self.location_cache.should_refresh_endpoints() or self.refresh_needed:
self.refresh_needed = False
self.last_refresh_time = self.location_cache.current_time_millis()
database_account = self._GetDatabaseAccount(**kwargs)
self.location_cache.perform_on_database_account_read(database_account)
self.last_refresh_time = self.location_cache.current_time_millis()
self.refresh_needed = False

def _GetDatabaseAccount(self, **kwargs):
"""Gets the database account.
Expand Down
6 changes: 3 additions & 3 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,17 @@ def clear_stale_endpoint_unavailability_info(self):

self.location_unavailability_info_by_endpoint = new_location_unavailability_info

def is_endpoint_unavailable(self, endpoint, expected_available_operations):
def is_endpoint_unavailable(self, endpoint: str, expected_available_operation: str):
unavailability_info = (
self.location_unavailability_info_by_endpoint[endpoint]
if endpoint in self.location_unavailability_info_by_endpoint
else None
)

if (
expected_available_operations == EndpointOperationType.NoneType
expected_available_operation == EndpointOperationType.NoneType
or not unavailability_info
or expected_available_operations not in unavailability_info["operationType"]
or expected_available_operation not in unavailability_info["operationType"]
):
return False

Expand Down
16 changes: 13 additions & 3 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import time
from typing import Optional

from azure.core.exceptions import AzureError, ClientAuthenticationError
from azure.core.exceptions import AzureError, ClientAuthenticationError, ServiceRequestError
from azure.core.pipeline import PipelineRequest
from azure.core.pipeline.policies import RetryPolicy
from azure.core.pipeline.transport._base import HttpRequest
Expand Down Expand Up @@ -124,7 +124,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
except exceptions.CosmosHttpResponseError as e:
retry_policy = defaultRetry_policy
# Re-assign retry policy based on error code
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status == SubStatusCodes.WRITE_FORBIDDEN:
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status in\
[SubStatusCodes.DATABASE_ACCOUNT_NOT_FOUND, SubStatusCodes.WRITE_FORBIDDEN]:
retry_policy = endpointDiscovery_retry_policy
elif e.status_code == StatusCodes.TOO_MANY_REQUESTS:
retry_policy = resourceThrottle_retry_policy
Expand Down Expand Up @@ -161,7 +162,7 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):

retry_policy.container_rid = cached_container["_rid"]
request.headers[retry_policy._intended_headers] = retry_policy.container_rid
elif e.status_code in (StatusCodes.REQUEST_TIMEOUT, e.status_code == StatusCodes.SERVICE_UNAVAILABLE):
elif e.status_code in [StatusCodes.REQUEST_TIMEOUT, StatusCodes.SERVICE_UNAVAILABLE]:
retry_policy = timeout_failover_retry_policy

# If none of the retry policies applies or there is no retry needed, set the
Expand Down Expand Up @@ -259,6 +260,15 @@ def send(self, request):
timeout_error.response = response
timeout_error.history = retry_settings['history']
raise
except ServiceRequestError as err:
# the request ran into a socket timeout or failed to establish a new connection
# since request wasn't sent, we retry up to however many connection retries are configured (default 3)
if retry_settings['connect'] > 0:
retry_active = self.increment(retry_settings, response=request, error=err)
if retry_active:
self.sleep(retry_settings, request.context.transport)
continue
raise err
except AzureError as err:
retry_error = err
if self._is_method_retryable(retry_settings, request.http_request):
Expand Down
40 changes: 25 additions & 15 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def _request_body_from_data(data):
if data is None or isinstance(data, str) or _is_readable_stream(data):
return data
if isinstance(data, (dict, list, tuple)):

json_dumped = json.dumps(data, separators=(",", ":"))

return json_dumped
Expand All @@ -70,9 +69,8 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
"""Makes one http request using the requests module.
:param _GlobalEndpointManager global_endpoint_manager:
:param dict request_params:
contains the resourceType, operationType, endpointOverride,
useWriteEndpoint, useAlternateWriteEndpoint information
:param ~azure.cosmos._request_object.RequestObject request_params:
contains information for the request, like the resource_type, operation_type, and endpoint_override
:param documents.ConnectionPolicy connection_policy:
:param azure.core.PipelineClient pipeline_client:
Pipeline client to process the request
Expand All @@ -90,7 +88,8 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
# Every request tries to perform a refresh
client_timeout = kwargs.get('timeout')
start_time = time.time()
global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
if request_params.resource_type != http_constants.ResourceType.DatabaseAccount:
global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
if client_timeout is not None:
kwargs['timeout'] = client_timeout - (time.time() - start_time)
if kwargs['timeout'] <= 0:
Expand All @@ -100,8 +99,8 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
base_url = request_params.endpoint_override
else:
base_url = global_endpoint_manager.resolve_service_endpoint(request_params)
if base_url != pipeline_client._base_url:
request.url = request.url.replace(pipeline_client._base_url, base_url)
if not request.url.startswith(base_url):
request.url = _replace_url_prefix(request.url, base_url)

parse_result = urlparse(request.url)

Expand Down Expand Up @@ -167,20 +166,31 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
return result, headers


def _replace_url_prefix(original_url, new_prefix):
parts = original_url.split('/', 3)

if not new_prefix.endswith('/'):
new_prefix += '/'

new_url = new_prefix + parts[3] if len(parts) > 3 else new_prefix

return new_url


def _PipelineRunFunction(pipeline_client, request, **kwargs):
# pylint: disable=protected-access

return pipeline_client._pipeline.run(request, **kwargs)

def SynchronizedRequest(
client,
request_params,
global_endpoint_manager,
connection_policy,
pipeline_client,
request,
request_data,
**kwargs
client,
request_params,
global_endpoint_manager,
connection_policy,
pipeline_client,
request,
request_data,
**kwargs
):
"""Performs one synchronized http request according to the parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@
from .. import exceptions
from .. import http_constants
from . import _retry_utility_async
from .._synchronized_request import _request_body_from_data
from .._synchronized_request import _request_body_from_data, _replace_url_prefix


async def _Request(global_endpoint_manager, request_params, connection_policy, pipeline_client, request, **kwargs):
"""Makes one http request using the requests module.
:param _GlobalEndpointManager global_endpoint_manager:
:param dict request_params:
contains the resourceType, operationType, endpointOverride,
useWriteEndpoint, useAlternateWriteEndpoint information
:param ~azure.cosmos._request_object.RequestObject request_params:
contains information for the request, like the resource_type, operation_type, and endpoint_override
:param documents.ConnectionPolicy connection_policy:
:param azure.core.PipelineClient pipeline_client:
Pipeline client to process the request
Expand All @@ -58,7 +57,8 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p
# Every request tries to perform a refresh
client_timeout = kwargs.get('timeout')
start_time = time.time()
await global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
if request_params.resource_type != http_constants.ResourceType.DatabaseAccount:
await global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
if client_timeout is not None:
kwargs['timeout'] = client_timeout - (time.time() - start_time)
if kwargs['timeout'] <= 0:
Expand All @@ -68,8 +68,8 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p
base_url = request_params.endpoint_override
else:
base_url = global_endpoint_manager.resolve_service_endpoint(request_params)
if base_url != pipeline_client._base_url:
request.url = request.url.replace(pipeline_client._base_url, base_url)
if not request.url.startswith(base_url):
request.url = _replace_url_prefix(request.url, base_url)

parse_result = urlparse(request.url)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .. import exceptions
from .._location_cache import LocationCache


# pylint: disable=protected-access

class _GlobalEndpointManager(object):
Expand Down Expand Up @@ -83,6 +84,8 @@ async def force_refresh(self, database_account):
await self.refresh_endpoint_list(database_account)

async def refresh_endpoint_list(self, database_account, **kwargs):
if self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms:
self.refresh_needed = True
if self.refresh_needed:
async with self.refresh_lock:
# if refresh is not needed or refresh is already taking place, return
Expand All @@ -94,18 +97,16 @@ async def refresh_endpoint_list(self, database_account, **kwargs):
raise e

async def _refresh_endpoint_list_private(self, database_account=None, **kwargs):
self.refresh_needed = False
if database_account:
self.location_cache.perform_on_database_account_read(database_account)
self.refresh_needed = False
self.last_refresh_time = self.location_cache.current_time_millis()
else:
if (
self.location_cache.should_refresh_endpoints()
and
self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms
):
if self.location_cache.should_refresh_endpoints() or self.refresh_needed:
self.refresh_needed = False
self.last_refresh_time = self.location_cache.current_time_millis()
database_account = await self._GetDatabaseAccount(**kwargs)
self.location_cache.perform_on_database_account_read(database_account)
self.last_refresh_time = self.location_cache.current_time_millis()

async def _GetDatabaseAccount(self, **kwargs):
"""Gets the database account.
Expand Down
16 changes: 13 additions & 3 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import asyncio
from typing import Optional

from azure.core.exceptions import AzureError, ClientAuthenticationError
from azure.core.exceptions import AzureError, ClientAuthenticationError, ServiceRequestError
from azure.core.pipeline.policies import AsyncRetryPolicy
from azure.core.pipeline.transport._base import HttpRequest

Expand Down Expand Up @@ -123,7 +123,8 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg
return result
except exceptions.CosmosHttpResponseError as e:
retry_policy = None
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status == SubStatusCodes.WRITE_FORBIDDEN:
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status in \
[SubStatusCodes.DATABASE_ACCOUNT_NOT_FOUND, SubStatusCodes.WRITE_FORBIDDEN]:
retry_policy = endpointDiscovery_retry_policy
elif e.status_code == StatusCodes.TOO_MANY_REQUESTS:
retry_policy = resourceThrottle_retry_policy
Expand Down Expand Up @@ -160,7 +161,7 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg

retry_policy.container_rid = cached_container["_rid"]
request.headers[retry_policy._intended_headers] = retry_policy.container_rid
elif e.status_code in (StatusCodes.REQUEST_TIMEOUT, e.status_code == StatusCodes.SERVICE_UNAVAILABLE):
elif e.status_code in [StatusCodes.REQUEST_TIMEOUT, StatusCodes.SERVICE_UNAVAILABLE]:
retry_policy = timeout_failover_retry_policy
else:
retry_policy = defaultRetry_policy
Expand Down Expand Up @@ -245,6 +246,15 @@ async def send(self, request):
timeout_error.response = response
timeout_error.history = retry_settings['history']
raise
except ServiceRequestError as err:
# the request ran into a socket timeout or failed to establish a new connection
# since request wasn't sent, we retry up to however many connection retries are configured (default 3)
if retry_settings['connect'] > 0:
retry_active = self.increment(retry_settings, response=request, error=err)
if retry_active:
await self.sleep(retry_settings, request.context.transport)
continue
raise err
except AzureError as err:
retry_error = err
if self._is_method_retryable(retry_settings, request.http_request):
Expand Down
1 change: 0 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ def _build_connection_policy(kwargs: Dict[str, Any]) -> ConnectionPolicy:
"'connection_retry_policy' has been deprecated and will be removed from the SDK in a future release.",
DeprecationWarning
)
connection_retry = policy.ConnectionRetryConfiguration
if not connection_retry:
connection_retry = ConnectionRetryPolicy(
retry_total=total_retries,
Expand Down
Loading

0 comments on commit f8ab118

Please sign in to comment.