Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cosmos] Add max_integrated_cache_staleness param option to item methods #22946

Merged
merged 47 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
51664b6
consistency level gets set to default found in database account
simorenoh Jan 27, 2022
edb78a4
async client default change
simorenoh Jan 27, 2022
fd98dae
updated docs based on finding and updated samples to reflect best pra…
simorenoh Jan 27, 2022
df02ee4
Update CHANGELOG.md
simorenoh Jan 27, 2022
dcac0b2
Update README.md
simorenoh Jan 27, 2022
68d9a97
Update README.md
simorenoh Jan 27, 2022
8b006a6
Update README.md
simorenoh Jan 27, 2022
36cf3be
Update CHANGELOG.md
simorenoh Jan 27, 2022
6f7151e
formatting
simorenoh Jan 27, 2022
69df86d
formatting
simorenoh Jan 27, 2022
5b4f406
updated consistency for first request to Eventual (lowest latency)
simorenoh Jan 27, 2022
c19f655
pylint
simorenoh Jan 27, 2022
ce12951
from_connection_string methods
simorenoh Jan 27, 2022
84987ce
from_connection_string2
simorenoh Jan 27, 2022
2d56ebd
Update sdk/cosmos/azure-cosmos/README.md
simorenoh Jan 28, 2022
f018c89
Apply suggestions from code review
simorenoh Jan 28, 2022
2732fa1
Update README.md
simorenoh Jan 31, 2022
7116654
removed forceful header usage, changed setup to only check for Sessio…
simorenoh Feb 2, 2022
c4d688b
need to set header if Session consistency for updating session if ne…
simorenoh Feb 2, 2022
c126d33
Apply suggestions from code review
simorenoh Feb 2, 2022
00fcfd2
added test for session token
simorenoh Feb 2, 2022
9a80b00
Merge branch 'integrated-cache' of https://github.com/simorenoh/azure…
simorenoh Feb 2, 2022
7c098ea
Update CHANGELOG.md
simorenoh Feb 2, 2022
ae64f96
Update _cosmos_client_connection_async.py
simorenoh Feb 3, 2022
b121337
added max_integrated_cache_staleness to item methods in containers
simorenoh Feb 7, 2022
fb92392
Merge branch 'main' into integrated-cache
simorenoh Feb 7, 2022
01b494a
added validation and provisional comments
simorenoh Feb 7, 2022
dea6d19
pylint
simorenoh Feb 8, 2022
03ad4ab
only applied to read-only operations
simorenoh Feb 8, 2022
3378dce
Update container.py
simorenoh Feb 8, 2022
a79e9dd
Update CHANGELOG.md
simorenoh Feb 8, 2022
ed2af41
Apply suggestions from code review
simorenoh Feb 15, 2022
b48e68f
Update _base.py
simorenoh Feb 15, 2022
1d65490
Merge branch 'integrated-cache' of https://github.com/simorenoh/azure…
simorenoh Feb 15, 2022
6476a74
updated param comments to mention integrated cache configuration
simorenoh Feb 15, 2022
88788ac
Merge branch 'main' into integrated-cache
simorenoh Feb 17, 2022
2d55627
moved to kwargs
simorenoh Feb 22, 2022
7fc9572
added tests to verify functionality
simorenoh Feb 23, 2022
b0df13f
Update test_integrated_cache.py
simorenoh Feb 23, 2022
257196f
Update test_integrated_cache.py
simorenoh Feb 23, 2022
1dc7554
updates to test to ensure it works with setup
simorenoh Feb 24, 2022
ae2923d
added headers test and new way to track client headers before sending…
simorenoh Feb 25, 2022
cadf7bc
Update test_integrated_cache.py
simorenoh Mar 4, 2022
01774cf
Create test_axq.py
simorenoh Mar 8, 2022
6363ce7
Added mocking tests for max integrated cache staleness. Fixed issue w…
kushagraThapar Mar 9, 2022
aa518be
Merge branch 'main' into integrated-cache
kushagraThapar Mar 9, 2022
0f24bed
upgrade version for release
simorenoh Mar 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
### 4.3.0b3 (Unreleased)

#### Features Added
- Added new **provisional** `max_integrated_cache_staleness_in_ms` parameter to read item and query items APIs in order
to make use of the **preview** CosmosDB integrated cache functionality.
Please see [Azure Cosmos DB integrated cache](https://docs.microsoft.com/azure/cosmos-db/integrated-cache) for more details.
- Added support for split-proof queries for the async client

### Bugs fixed
- Default consistency level for the sync and async clients is no longer "Session" and will instead be set to the
- Default consistency level for the sync and async clients is no longer `Session` and will instead be set to the
consistency level of the user's cosmos account setting on initialization if not passed during client initialization.
This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency will need to pass it explicitly if their account consistency is different than `Session`.
Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency
will need to pass it explicitly if their account consistency is different than `Session`.
Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
- Fixed invalid request body being sent when passing in `serverScript` body parameter to replace operations for trigger, sproc and udf resources.

### 4.3.0b2 (2022-01-25)
Expand Down
28 changes: 19 additions & 9 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
'query_version': 'queryVersion'
}


def _get_match_headers(kwargs):
# type: (Dict[str, Any]) -> Tuple(Optional[str], Optional[str])
if_match = kwargs.pop('if_match', None)
Expand Down Expand Up @@ -112,14 +113,14 @@ def build_options(kwargs):


def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
cosmos_client_connection,
default_headers,
verb,
path,
resource_id,
resource_type,
options,
partition_key_range_id=None,
cosmos_client_connection,
default_headers,
verb,
path,
resource_id,
resource_type,
options,
partition_key_range_id=None,
):
"""Gets HTTP request headers.

Expand Down Expand Up @@ -292,6 +293,9 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if options.get("populateQuotaInfo"):
headers[http_constants.HttpHeaders.PopulateQuotaInfo] = options["populateQuotaInfo"]

if options.get("maxIntegratedCacheStaleness"):
headers[http_constants.HttpHeaders.DedicatedGatewayCacheStaleness] = options["maxIntegratedCacheStaleness"]

return headers


Expand Down Expand Up @@ -638,7 +642,7 @@ def ParsePaths(paths):
newIndex += 1

# This will extract the token excluding the quote chars
token = path[currentIndex + 1 : newIndex]
token = path[currentIndex + 1: newIndex]
tokens.append(token)
currentIndex = newIndex + 1
else:
Expand All @@ -657,3 +661,9 @@ def ParsePaths(paths):
tokens.append(token)

return tokens


def validate_cache_staleness_value(max_integrated_cache_staleness):
int(max_integrated_cache_staleness) # Will throw error if data type cant be converted to int
if max_integrated_cache_staleness <= 0:
raise ValueError("Parameter 'max_integrated_cache_staleness_in_ms' can only be a positive integer.")
simorenoh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ def __init__(
http_constants.HttpHeaders.IsContinuationExpected: False,
}

# Keeps the latest response headers from server.
# Keeps the latest request headers from the client.
self.last_request_headers = None
simorenoh marked this conversation as resolved.
Show resolved Hide resolved

# Keeps the latest response headers from the server.
self.last_response_headers = None

self._useMultipleWriteLocations = False
Expand Down Expand Up @@ -1874,6 +1877,7 @@ def ExecuteStoredProcedure(self, sproc_link, params, options=None, **kwargs):

# ExecuteStoredProcedure will use WriteEndpoint since it uses POST operation
request_params = _request_object.RequestObject("sprocs", documents._OperationType.ExecuteJavaScript)
self.last_request_headers = headers.copy()
result, self.last_response_headers = self.__Post(path, request_params, params, headers, **kwargs)
return result

Expand Down Expand Up @@ -2043,6 +2047,7 @@ def GetDatabaseAccount(self, url_connection=None, **kwargs):
headers = base.GetHeaders(self, initial_headers, "get", "", "", "", {}) # path # id # type

request_params = _request_object.RequestObject("databaseaccount", documents._OperationType.Read, url_connection)
self.last_request_headers = headers.copy()
result, self.last_response_headers = self.__Get("", request_params, headers, **kwargs)
database_account = documents.DatabaseAccount()
database_account.DatabasesLink = "/dbs/"
Expand Down Expand Up @@ -2097,6 +2102,7 @@ def Create(self, body, path, typ, id, initial_headers, options=None, **kwargs):
# Create will use WriteEndpoint since it uses POST operation

request_params = _request_object.RequestObject(typ, documents._OperationType.Create)
self.last_request_headers = headers.copy()
result, self.last_response_headers = self.__Post(path, request_params, body, headers, **kwargs)

# update session for write request
Expand Down Expand Up @@ -2130,6 +2136,7 @@ def Upsert(self, body, path, typ, id, initial_headers, options=None, **kwargs):

# Upsert will use WriteEndpoint since it uses POST operation
request_params = _request_object.RequestObject(typ, documents._OperationType.Upsert)
self.last_request_headers = headers.copy()
result, self.last_response_headers = self.__Post(path, request_params, body, headers, **kwargs)
# update session for write request
self._UpdateSessionIfRequired(headers, result, self.last_response_headers)
Expand Down Expand Up @@ -2160,6 +2167,7 @@ def Replace(self, resource, path, typ, id, initial_headers, options=None,
headers = base.GetHeaders(self, initial_headers, "put", path, id, typ, options)
# Replace will use WriteEndpoint since it uses PUT operation
request_params = _request_object.RequestObject(typ, documents._OperationType.Replace)
self.last_request_headers = headers.copy()
result, self.last_response_headers = self.__Put(path, request_params, resource, headers, **kwargs)

# update session for request mutates data on server side
Expand Down Expand Up @@ -2189,6 +2197,7 @@ def Read(self, path, typ, id, initial_headers, options=None, **kwargs): # pylin
headers = base.GetHeaders(self, initial_headers, "get", path, id, typ, options)
# Read will use ReadEndpoint since it uses GET operation
request_params = _request_object.RequestObject(typ, documents._OperationType.Read)
self.last_request_headers = headers.copy()
result, self.last_response_headers = self.__Get(path, request_params, headers, **kwargs)
return result

Expand Down Expand Up @@ -2216,6 +2225,7 @@ def DeleteResource(self, path, typ, id, initial_headers, options=None,
headers = base.GetHeaders(self, initial_headers, "delete", path, id, typ, options)
# Delete will use WriteEndpoint since it uses DELETE operation
request_params = _request_object.RequestObject(typ, documents._OperationType.Delete)
self.last_request_headers = headers.copy()
result, self.last_response_headers = self.__Delete(path, request_params, headers, **kwargs)

# update session for request mutates data on server side
Expand Down Expand Up @@ -2413,6 +2423,7 @@ def __GetBodiesFromQueryResult(result):
request_params = _request_object.RequestObject(
typ, documents._OperationType.QueryPlan if is_query_plan else documents._OperationType.ReadFeed)
headers = base.GetHeaders(self, initial_headers, "get", path, id_, typ, options, partition_key_range_id)
self.last_request_headers = headers.copy()
result, self.last_response_headers = self.__Get(path, request_params, headers, **kwargs)
if response_hook:
response_hook(self.last_response_headers, result)
Expand All @@ -2436,8 +2447,9 @@ def __GetBodiesFromQueryResult(result):

# Query operations will use ReadEndpoint even though it uses POST(for regular query operations)
request_params = _request_object.RequestObject(typ, documents._OperationType.SqlQuery)
req_headers = base.GetHeaders(self, initial_headers, "post", path, id_, typ, options, partition_key_range_id)
result, self.last_response_headers = self.__Post(path, request_params, query, req_headers, **kwargs)
headers = base.GetHeaders(self, initial_headers, "post", path, id_, typ, options, partition_key_range_id)
self.last_request_headers = headers.copy()
result, self.last_response_headers = self.__Post(path, request_params, query, headers, **kwargs)

if response_hook:
response_hook(self.last_response_headers, result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ def __init__(
if consistency_level is not None:
self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level

# Keeps the latest response headers from server.
# Keeps the latest request headers from the client.
self.last_request_headers = None

# Keeps the latest response headers from the server.
self.last_response_headers = None

self._useMultipleWriteLocations = False
Expand Down Expand Up @@ -326,6 +329,7 @@ async def GetDatabaseAccount(self, url_connection=None, **kwargs):
headers = base.GetHeaders(self, initial_headers, "get", "", "", "", {}) # path # id # type

request_params = _request_object.RequestObject("databaseaccount", documents._OperationType.Read, url_connection)
self.last_request_headers = headers.copy()
result, self.last_response_headers = await self.__Get("", request_params, headers, **kwargs)
database_account = documents.DatabaseAccount()
database_account.DatabasesLink = "/dbs/"
Expand Down Expand Up @@ -575,6 +579,7 @@ async def ExecuteStoredProcedure(self, sproc_link, params, options=None, **kwarg

# ExecuteStoredProcedure will use WriteEndpoint since it uses POST operation
request_params = _request_object.RequestObject("sprocs", documents._OperationType.ExecuteJavaScript)
self.last_request_headers = headers.copy()
result, self.last_response_headers = await self.__Post(path, request_params, params, headers, **kwargs)
return result

Expand Down Expand Up @@ -603,6 +608,7 @@ async def Create(self, body, path, typ, id, initial_headers, options=None, **kwa
# Create will use WriteEndpoint since it uses POST operation

request_params = _request_object.RequestObject(typ, documents._OperationType.Create)
self.last_request_headers = headers.copy()
result, self.last_response_headers = await self.__Post(path, request_params, body, headers, **kwargs)

# update session for write request
Expand Down Expand Up @@ -717,6 +723,7 @@ async def Upsert(self, body, path, typ, id, initial_headers, options=None, **kwa

# Upsert will use WriteEndpoint since it uses POST operation
request_params = _request_object.RequestObject(typ, documents._OperationType.Upsert)
self.last_request_headers = headers.copy()
result, self.last_response_headers = await self.__Post(path, request_params, body, headers, **kwargs)
# update session for write request
self._UpdateSessionIfRequired(headers, result, self.last_response_headers)
Expand Down Expand Up @@ -957,6 +964,7 @@ async def Read(self, path, typ, id, initial_headers, options=None, **kwargs): #
headers = base.GetHeaders(self, initial_headers, "get", path, id, typ, options)
# Read will use ReadEndpoint since it uses GET operation
request_params = _request_object.RequestObject(typ, documents._OperationType.Read)
self.last_request_headers = headers.copy()
result, self.last_response_headers = await self.__Get(path, request_params, headers, **kwargs)
return result

Expand Down Expand Up @@ -1219,6 +1227,7 @@ async def Replace(self, resource, path, typ, id, initial_headers, options=None,
headers = base.GetHeaders(self, initial_headers, "put", path, id, typ, options)
# Replace will use WriteEndpoint since it uses PUT operation
request_params = _request_object.RequestObject(typ, documents._OperationType.Replace)
self.last_request_headers = headers.copy()
result, self.last_response_headers = await self.__Put(path, request_params, resource, headers, **kwargs)

# update session for request mutates data on server side
Expand Down Expand Up @@ -1463,6 +1472,7 @@ async def DeleteResource(self, path, typ, id, initial_headers, options=None, **k
headers = base.GetHeaders(self, initial_headers, "delete", path, id, typ, options)
# Delete will use WriteEndpoint since it uses DELETE operation
request_params = _request_object.RequestObject(typ, documents._OperationType.Delete)
self.last_request_headers = headers.copy()
result, self.last_response_headers = await self.__Delete(path, request_params, headers, **kwargs)

# update session for request mutates data on server side
Expand Down Expand Up @@ -2253,6 +2263,7 @@ def __GetBodiesFromQueryResult(result):
request_params = _request_object.RequestObject(typ,
documents._OperationType.QueryPlan if is_query_plan else documents._OperationType.ReadFeed)
headers = base.GetHeaders(self, initial_headers, "get", path, id_, typ, options, partition_key_range_id)
self.last_request_headers = headers.copy()
result, self.last_response_headers = await self.__Get(path, request_params, headers, **kwargs)
if response_hook:
response_hook(self.last_response_headers, result)
Expand All @@ -2276,8 +2287,9 @@ def __GetBodiesFromQueryResult(result):

# Query operations will use ReadEndpoint even though it uses POST(for regular query operations)
request_params = _request_object.RequestObject(typ, documents._OperationType.SqlQuery)
req_headers = base.GetHeaders(self, initial_headers, "post", path, id_, typ, options, partition_key_range_id)
result, self.last_response_headers = await self.__Post(path, request_params, query, req_headers, **kwargs)
headers = base.GetHeaders(self, initial_headers, "post", path, id_, typ, options, partition_key_range_id)
self.last_request_headers = headers.copy()
result, self.last_response_headers = await self.__Post(path, request_params, query, headers, **kwargs)

if response_hook:
response_hook(self.last_response_headers, result)
Expand Down
Loading