Skip to content

Commit

Permalink
Query pipeline changes and Offset/Limit/Distinct support (#6770)
Browse files Browse the repository at this point in the history
* initial commit for query pipeline changes

* initial commit for offset and limit

* modified aggregate tests to check for top as well after bugfix

* Added support for distinct

* modified aggregate tests to run in mono repo

* fixed failing tests and bugs

* updated tests

* fixed hashing problem for distinct

* fixed bug in distinct queries

* replaced single quotes with double quotes

* re-introduced hashing via SHA-1

* fixed bug in distinct for py3

* dummy commit

* dummy commit

* [Cosmos] Core pipeline integration (#6961)

* Updated dependencies

* Added core pipeline

* Ignore test config

* Fixed indexes test

* Refactored request creation

* Fixed index test

* Added trace decorators

* Bumped version

* Updated policies

* Renamed request_options -> request_params

* [Cosmos] Applying track 2 SDK guidelines (#7021)

* Updated dependencies

* Added core pipeline

* Ignore test config

* Fixed indexes test

* Refactored request creation

* Fixed index test

* Added trace decorators

* Bumped version

* Updated policies

* Renamed request_options -> request_params

* Renamed clients

* Updated with azure-core errors

* Fixed test warnings

* Updated config

* PR fixes

* Fixed init user import

* Fixed init clients

* Started revising constructors

* Test conn str constructor

* Update iterables with core paging

* Added context managers

* Reverted storage changes

* Updated constructor

* Mypy and Pylint

* Renamed all listing operations

* Some mypy fixes

* Cleaned up method signatures

* Fix pylint

* Propagate kwargs

* Fix pylint

* Some mypy fixes

* Updated readme and release notes

* Fix for passing in extra headers

* Reverted credentials

* Review feedback

* Fix pylint

* Fixed samples

* Updated docstrings

* Fixed whitespace and imports

* Some mypy fixes

* Mypy fixes

* Removed continuation token support

* Pylint fix

* Docs tweaks

* Updated continuation token

* Updated response header

* Added support for Urllib3 Connection retries

Made Offer extend object instead of dict

* [Cosmos] Bumped dependency (#7147)

* Bumped dependency

* Update reqs

* Misc fixes for Cosmos SDK  (#7157)

* Made offer extend object instead of dict

* added support for urllib3 connection retry

* [Cosmos] Reconfigure retry policy (#7544)

* Reconfigure retry policy

* Review feedback

* Fix pylint

* Updated tests

* Support client-side timeout

* Updated timeout logic

* Renamed client error

* Updated tests

* Patch azure-core

Needed pending PR 7542

* Fixed status retry tests

* Using dev core

* [Cosmos] Docs updates (#7626)

* Updated sample refs

* Added release notes

* Remove old rst files

* Fixed kwarg formatting

* add sdk tools repo (#7656)

* [Cosmos] More docs cleanup (#7661)

* Removed old docs

* Bumped core version

* Removed submodule docs

* Fixed imports

* Pylint fix

* More docs updates

* Fixed docstring types

* Docstring formatting

* Updated snippet references

* Fixed exception docs

* More exception docstrings

* Review feedback

* dummy commit

* reverted dummy commit

* fixed failing test

fixed lint errors

* fixed failing tests

* updated comment

* added **kwargs to _GetQueryPlanThroughGateway
  • Loading branch information
Srinath Narayanan authored and annatisch committed Oct 11, 2019
1 parent 37cc208 commit fcc9d85
Show file tree
Hide file tree
Showing 16 changed files with 767 additions and 300 deletions.
12 changes: 12 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,18 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if options.get("offerThroughput"):
headers[http_constants.HttpHeaders.OfferThroughput] = options["offerThroughput"]

if options.get("contentType"):
headers[http_constants.HttpHeaders.ContentType] = options['contentType']

if options.get("isQueryPlanRequest"):
headers[http_constants.HttpHeaders.IsQueryPlanRequest] = options['isQueryPlanRequest']

if options.get("supportedQueryFeatures"):
headers[http_constants.HttpHeaders.SupportedQueryFeatures] = options['supportedQueryFeatures']

if options.get("queryVersion"):
headers[http_constants.HttpHeaders.QueryVersion] = options['queryVersion']

if "partitionKey" in options:
# if partitionKey value is Undefined, serialize it as [{}] to be consistent with other SDKs.
if options.get("partitionKey") is partition_key._Undefined:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2625,6 +2625,7 @@ def __QueryFeed(
options=None,
partition_key_range_id=None,
response_hook=None,
is_query_plan=False,
**kwargs
):
"""Query for more than one Azure Cosmos resources.
Expand All @@ -2639,6 +2640,9 @@ def __QueryFeed(
The request options for the request.
:param str partition_key_range_id:
Specifies partition key range id.
:param function response_hook:
:param bool is_query_plan:
Specififes if the call is to fetch query plan
:rtype:
list
Expand All @@ -2664,7 +2668,8 @@ def __GetBodiesFromQueryResult(result):
# Copy to make sure that default_headers won't be changed.
if query is None:
# Query operations will use ReadEndpoint even though it uses GET(for feed requests)
request_params = _request_object.RequestObject(typ, documents._OperationType.ReadFeed)
request_params = _request_object.RequestObject(typ,
documents._OperationType.QueryPlan if is_query_plan else documents._OperationType.ReadFeed)
headers = base.GetHeaders(self, initial_headers, "get", path, id_, typ, options, partition_key_range_id)
result, self.last_response_headers = self.__Get(path, request_params, headers, **kwargs)
if response_hook:
Expand All @@ -2674,6 +2679,9 @@ def __GetBodiesFromQueryResult(result):
query = self.__CheckAndUnifyQueryFormat(query)

initial_headers[http_constants.HttpHeaders.IsQuery] = "true"
if not is_query_plan:
initial_headers[http_constants.HttpHeaders.IsQuery] = "true"

if (
self._query_compatibility_mode == CosmosClientConnection._QueryCompatibilityMode.Default
or self._query_compatibility_mode == CosmosClientConnection._QueryCompatibilityMode.Query
Expand All @@ -2694,6 +2702,36 @@ def __GetBodiesFromQueryResult(result):

return __GetBodiesFromQueryResult(result)

def _GetQueryPlanThroughGateway(self, query, resource_link, **kwargs):
    """Fetch the query execution plan for *query* from the gateway.

    Issues a query-plan request (not a regular feed read) against the
    document collection identified by *resource_link*, advertising every
    query feature this SDK can execute client-side.

    :param (str or dict) query: the query whose plan is requested.
    :param str resource_link: self link or ID based link of the collection.
    :return: the raw query plan returned by the gateway.
    """
    # Advertise every query feature the client-side pipeline supports.
    supported_query_features = ",".join([
        documents._QueryFeature.Aggregate,
        documents._QueryFeature.CompositeAggregate,
        documents._QueryFeature.Distinct,
        documents._QueryFeature.MultipleOrderBy,
        documents._QueryFeature.OffsetAndLimit,
        documents._QueryFeature.OrderBy,
        documents._QueryFeature.Top,
    ])

    options = {
        "contentType": runtime_constants.MediaTypes.Json,
        "isQueryPlanRequest": True,
        "supportedQueryFeatures": supported_query_features,
        "queryVersion": http_constants.Versions.QueryVersion,
    }

    trimmed_link = base.TrimBeginningAndEndingSlashes(resource_link)
    path = base.GetPathFromLink(trimmed_link, "docs")
    resource_id = base.GetResourceIdOrFullNameFromLink(trimmed_link)

    # lambda r: r — return the query plan body unmodified.
    return self.__QueryFeed(
        path,
        "docs",
        resource_id,
        lambda r: r,
        None,
        query,
        options,
        is_query_plan=True,
        **kwargs)

def __CheckAndUnifyQueryFormat(self, query_body):
"""Checks and unifies the format of the query body.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def __init__(self, *args):
def needsRetry(self, error_code):
    """Return True when a request that failed with *error_code* is safe to retry.

    Connection-level errors are retried only for idempotent requests:
    GETs, query requests, and query-plan requests. All other error codes
    are always considered retryable by this policy.

    :param int error_code: the connection error code of the failed request.
    :rtype: bool
    """
    # NOTE: the page scrape merged the diff's old and new condition lines;
    # only the new three-way condition (including IsQueryPlanRequest) is kept.
    if error_code in DefaultRetryPolicy.CONNECTION_ERROR_CODES:
        if self.args:
            # args[3] appears to be the outgoing request (has .method/.headers).
            if (self.args[3].method == "GET") or (http_constants.HttpHeaders.IsQuery in self.args[3].headers) \
                    or (http_constants.HttpHeaders.IsQueryPlanRequest in self.args[3].headers):
                return True
            return False
        return True
    return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from collections import deque
from .. import _retry_utility
from .. import http_constants
from .. import _base

# pylint: disable=protected-access

Expand Down Expand Up @@ -171,100 +170,3 @@ def __init__(self, client, options, fetch_function):
def _fetch_next_block(self):
    """Fetch the next block of results from the underlying feed.

    :return: the fetched items (presumably a list — TODO confirm against
        _fetch_items_helper_with_retries), or None when no pages remain or
        the buffer already holds data.
    """
    # The original used ``while`` here, but the body returns unconditionally
    # on the first iteration, so the loop never repeats — it is really an if.
    if super(_DefaultQueryExecutionContext, self)._has_more_pages() and not self._buffer:
        return self._fetch_items_helper_with_retries(self._fetch_function)
    return None


class _MultiCollectionQueryExecutionContext(_QueryExecutionContextBase):
    """Execution context used with client-side partitioning.

    Runs the same query against every collection link produced by the
    database's partition resolver, draining one collection completely
    before moving on to the next.
    """

    def __init__(self, client, options, database_link, query, partition_key):
        """
        Constructor

        :param CosmosClient client:
        :param dict options:
            The request options for the request.
        :param str database_link: database self link or ID based link
        :param (str or dict) query:
        :param str partition_key: partition key for the query
        :raises ValueError: if the database has no partition resolver or the
            resolver yields no collection links.
        """
        super(_MultiCollectionQueryExecutionContext, self).__init__(client, options)

        self._current_collection_index = 0
        self._collection_links = []
        self._collection_links_length = 0

        self._query = query
        self._client = client

        partition_resolver = client.GetPartitionResolver(database_link)

        if partition_resolver is None:
            raise ValueError(client.PartitionResolverErrorMessage)

        self._collection_links = partition_resolver.ResolveForRead(partition_key)

        self._collection_links_length = len(self._collection_links)

        # NOTE(review): this check is unreachable — if the resolver returned
        # None, len(None) above would already have raised TypeError.
        if self._collection_links is None:
            raise ValueError("_collection_links is None.")

        if self._collection_links_length <= 0:
            raise ValueError("_collection_links_length is not greater than 0.")

        # Creating the QueryFeed for the first collection
        path = _base.GetPathFromLink(self._collection_links[self._current_collection_index], "docs")
        collection_id = _base.GetResourceIdOrFullNameFromLink(self._collection_links[self._current_collection_index])

        self._current_collection_index += 1

        def fetch_fn(options):
            return client.QueryFeed(path, collection_id, query, options)

        self._fetch_function = fetch_fn

    def _has_more_pages(self):
        # More pages exist while the current feed has not started, has a
        # pending continuation token, or unvisited collection links remain.
        return (
            not self._has_started
            or self._continuation
            or (self._collection_links and self._current_collection_index < self._collection_links_length)
        )

    def _fetch_next_block(self):
        """Fetches the next block of query results.

        This fetches the next block of results from the current collection
        link. Once the current collection's results are exhausted, it moves
        to the next collection link.

        :return:
            List of fetched items.
        :rtype: list
        """
        # Fetch next block of results by executing the query against the current document collection
        fetched_items = self._fetch_items_helper_with_retries(self._fetch_function)

        # If there are multiple document collections to query for(in case of partitioning),
        # keep looping through each one of them, creating separate feed queries for each
        # collection and fetching the items
        while not fetched_items:
            if self._collection_links and self._current_collection_index < self._collection_links_length:
                path = _base.GetPathFromLink(self._collection_links[self._current_collection_index], "docs")
                collection_id = _base.GetResourceIdOrFullNameFromLink(
                    self._collection_links[self._current_collection_index]
                )

                # Reset paging state so the next collection starts fresh.
                self._continuation = None
                self._has_started = False

                def fetch_fn(options):
                    return self._client.QueryFeed(path, collection_id, self._query, options)

                self._fetch_function = fetch_fn

                fetched_items = self._fetch_items_helper_with_retries(self._fetch_function)
                self._current_collection_index += 1
            else:
                break

        return fetched_items
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
"""Internal class for query execution endpoint component implementation in the Azure Cosmos database service.
"""
import numbers
import copy
import hashlib
import json
import six

from azure.cosmos._execution_context.aggregators import (
_AverageAggregator,
Expand Down Expand Up @@ -75,6 +79,86 @@ def next(self):
raise StopIteration


class _QueryExecutionDistinctOrderedEndpointComponent(_QueryExecutionEndpointComponent):
    """Endpoint component that de-duplicates an already-ordered result stream.

    Because the underlying results arrive sorted, duplicates are adjacent,
    so remembering only the most recently returned value is sufficient.
    """

    def __init__(self, execution_context):
        super(_QueryExecutionDistinctOrderedEndpointComponent, self).__init__(execution_context)
        # Most recent value handed out; None until the first result.
        self.last_result = None

    def next(self):
        # Skip over the run of values equal to the one returned last.
        candidate = next(self._execution_context)
        while candidate == self.last_result:
            candidate = next(self._execution_context)
        self.last_result = candidate
        return candidate


class _QueryExecutionDistinctUnorderedEndpointComponent(_QueryExecutionEndpointComponent):
    """Endpoint component that de-duplicates an unordered result stream.

    Duplicates may appear anywhere in the stream, so a SHA-1 digest of every
    value already emitted is kept for the lifetime of the query and used to
    filter repeats.
    """

    def __init__(self, execution_context):
        super(_QueryExecutionDistinctUnorderedEndpointComponent, self).__init__(execution_context)
        # Set of hex digests of all results emitted so far.
        self.last_result = set()

    def make_hash(self, value):
        """Recursively convert *value* into a deterministic, hashable form.

        Numbers are normalized to float so that e.g. 1 and 1.0 are treated
        as duplicates; dicts are reduced to a frozen item tuple so that key
        order does not affect the result.
        """
        if isinstance(value, (set, tuple, list)):
            return tuple([self.make_hash(v) for v in value])
        if not isinstance(value, dict):
            if isinstance(value, numbers.Number):
                return float(value)
            return value
        new_value = copy.deepcopy(value)
        for k, v in new_value.items():
            new_value[k] = self.make_hash(v)

        return tuple(frozenset(sorted(new_value.items())))

    def _hash_result(self, res):
        # Stable SHA-1 hex digest of the canonicalized JSON form of *res*.
        # (Extracted helper: the original duplicated this code twice in next().)
        json_repr = json.dumps(self.make_hash(res))
        if six.PY3:
            json_repr = json_repr.encode("utf-8")
        return hashlib.sha1(json_repr).hexdigest()

    def next(self):
        """Return the next result whose digest has not been seen before."""
        res = next(self._execution_context)
        hashed_result = self._hash_result(res)

        # Keep pulling results until one hashes to an unseen digest.
        while hashed_result in self.last_result:
            res = next(self._execution_context)
            hashed_result = self._hash_result(res)

        self.last_result.add(hashed_result)
        return res


class _QueryExecutionOffsetEndpointComponent(_QueryExecutionEndpointComponent):
    """Endpoint component implementing the OFFSET clause.

    Discards the first ``offset_count`` results from the underlying
    execution context before passing any result through.
    """

    def __init__(self, execution_context, offset_count):
        super(_QueryExecutionOffsetEndpointComponent, self).__init__(execution_context)
        # Number of leading results still to be skipped; decremented as we go.
        self._offset_count = offset_count

    def next(self):
        # Drain (and count down) the results that fall inside the offset.
        while self._offset_count > 0:
            skipped = next(self._execution_context)
            if skipped is None:
                # A None result is treated as exhaustion of the stream.
                raise StopIteration
            self._offset_count -= 1
        return next(self._execution_context)


class _QueryExecutionAggregateEndpointComponent(_QueryExecutionEndpointComponent):
"""Represents an endpoint in handling aggregate query.
Expand Down
Loading

0 comments on commit fcc9d85

Please sign in to comment.