Query pipeline changes and Offset/Limit/Distinct support #6770

Merged
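The user-facing result of this PR is that SQL queries using OFFSET/LIMIT and DISTINCT now run through the new query pipeline. A minimal usage sketch against the track 2 preview surface (the client/database/container method names and the enable_cross_partition_query keyword are assumed from the preview SDK and may differ slightly at this commit):

    from azure.cosmos import CosmosClient

    client = CosmosClient("https://myaccount.documents.azure.com:443/", credential="<account-key>")
    container = client.get_database_client("mydb").get_container_client("mycontainer")

    # OFFSET/LIMIT paging, finished off client-side by the new offset endpoint component
    page = list(container.query_items(
        query="SELECT * FROM c ORDER BY c._ts OFFSET 10 LIMIT 5",
        enable_cross_partition_query=True,
    ))

    # DISTINCT, de-duplicated client-side by the new distinct endpoint components
    cities = list(container.query_items(
        query="SELECT DISTINCT VALUE c.city FROM c",
        enable_cross_partition_query=True,
    ))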
Changes from 8 commits

Commits (44 total)
2e27f01
initial commit for query pipeline changes
Aug 12, 2019
3d90acb
initial commit for offset and limit
Aug 13, 2019
bab7e2c
modified aggregate tests to check for top as well after bugfix
Aug 13, 2019
1bc1213
Added support for distinct
Aug 15, 2019
34729b7
modified aggregate tests to run in mono repo
Aug 15, 2019
2a25ad8
fixed failing tests and bugs
Aug 16, 2019
a88ff22
updated tests
Aug 16, 2019
57c250d
fixed hashing problem for distinct
Aug 26, 2019
2b0d90f
fixed bug in distinct queries
Aug 27, 2019
d72b8ff
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-pytho…
Aug 28, 2019
875e55b
replaced single quotes with double quotes
Aug 28, 2019
fb40ea9
re-introduced hashing via sha1
Aug 29, 2019
0db6371
fixed bug in distinct for py3
Aug 29, 2019
593e863
Merge remote-tracking branch 'github-azure-sdk-master/master' into sr…
Aug 29, 2019
9e33c16
dummy commit
Aug 29, 2019
f0e356a
dummy commit
Aug 29, 2019
73af86a
[Cosmos] Core pipeline integration (#6961)
annatisch Aug 29, 2019
3bed8a0
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Sep 4, 2019
b25b64c
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Sep 4, 2019
9b05a6b
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Sep 5, 2019
dc7d9e8
[Cosmos] Applying track 2 SDK guidelines (#7021)
annatisch Sep 9, 2019
dc0836e
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Sep 9, 2019
577c122
Added support for Urllib3 Connection retries
Sep 9, 2019
7bc395b
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Sep 10, 2019
b5b4f8b
[Cosmos] Bumped dependency (#7147)
annatisch Sep 10, 2019
700f5db
Misc fixes for Cosmos SDK (#7157)
Sep 10, 2019
efd178d
resolved merge conflicts
Sep 10, 2019
4cd7f37
Merge branch 'feature/cosmos-preview4' into srnara/queryPipeline
annatisch Sep 26, 2019
962391c
[Cosmos] Reconfigure retry policy (#7544)
annatisch Oct 4, 2019
6b70f06
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Oct 4, 2019
dc9ca57
[Cosmos] Docs updates (#7626)
annatisch Oct 4, 2019
96ad56a
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Oct 7, 2019
ad54c4f
add sdk tools repo (#7656)
danieljurek Oct 7, 2019
c0d0967
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Oct 7, 2019
81d96ce
[Cosmos] More docs cleanup (#7661)
annatisch Oct 8, 2019
dcbbacf
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Oct 8, 2019
dcd9276
dummy commit
Oct 9, 2019
12474a1
reverted dummy commit
Oct 9, 2019
1a8230f
Merge branch 'feature/cosmos-preview4' into srnara/queryPipeline
Oct 9, 2019
e258bc8
Merge remote-tracking branch 'azure-sdk-main/master' into srnara/quer…
Oct 9, 2019
02da658
fixed failing test
Oct 10, 2019
59fa49b
fixed failing tests
Oct 10, 2019
dcc1b5c
updated comment
Oct 10, 2019
83664b1
added **kwargs to _GetQueryPlanThroughGateway
Oct 11, 2019
12 changes: 12 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
@@ -141,6 +141,18 @@ def GetHeaders(cosmos_client_connection,
if options.get('offerThroughput'):
headers[http_constants.HttpHeaders.OfferThroughput] = options['offerThroughput']

if options.get('contentType'):
headers[http_constants.HttpHeaders.ContentType] = options['contentType']

if options.get('isQueryPlanRequest'):
headers[http_constants.HttpHeaders.IsQueryPlanRequest] = options['isQueryPlanRequest']

if options.get('supportedQueryFeatures'):
headers[http_constants.HttpHeaders.SupportedQueryFeatures] = options['supportedQueryFeatures']

if options.get('queryVersion'):
headers[http_constants.HttpHeaders.QueryVersion] = options['queryVersion']

if 'partitionKey' in options:
# if partitionKey value is Undefined, serialize it as [{}] to be consistent with other SDKs.
if options.get('partitionKey') is partition_key._Undefined:
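The four new options above are set by _GetQueryPlanThroughGateway (added further down) and map one-to-one onto request headers for the query-plan call. A small sketch of that mapping, assuming illustrative x-ms-* header names — the SDK takes the real names from http_constants.HttpHeaders:

    def build_query_plan_headers(options):
        # Mirrors the four new GetHeaders branches above; header names are assumptions.
        headers = {}
        if options.get('contentType'):
            headers['Content-Type'] = options['contentType']
        if options.get('isQueryPlanRequest'):
            headers['x-ms-cosmos-is-query-plan-request'] = options['isQueryPlanRequest']
        if options.get('supportedQueryFeatures'):
            headers['x-ms-cosmos-supported-query-features'] = options['supportedQueryFeatures']
        if options.get('queryVersion'):
            headers['x-ms-cosmos-query-version'] = options['queryVersion']
        return headers

    print(build_query_plan_headers({'isQueryPlanRequest': True, 'queryVersion': '1.0'}))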
65 changes: 48 additions & 17 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
@@ -294,7 +294,7 @@ def fetch_fn(options):
lambda _, b: b,
query,
options), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'dbs')

def ReadContainers(self, database_link, options=None):
"""Reads all collections in a database.
@@ -341,7 +341,7 @@ def fetch_fn(options):
lambda _, body: body,
query,
options), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'colls')

def CreateContainer(self, database_link, collection, options=None):
"""Creates a collection in a database.
@@ -550,7 +550,7 @@ def fetch_fn(options):
lambda _, b: b,
query,
options), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'users')

def DeleteDatabase(self, database_link, options=None):
"""Deletes a database.
@@ -710,7 +710,7 @@ def fetch_fn(options):
lambda _, b: b,
query,
options), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'permissions')

def ReplaceUser(self, user_link, user, options=None):
"""Replaces a user and return it.
@@ -875,7 +875,7 @@ def fetch_fn(options):
query,
options,
response_hook=response_hook), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn, database_or_Container_link)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'docs', database_or_Container_link)

def QueryItemsChangeFeed(self, collection_link, options=None, response_hook=None):
"""Queries documents change feed in a collection.
@@ -944,7 +944,7 @@ def fetch_fn(options):
options,
partition_key_range_id,
response_hook=response_hook), self.last_response_headers
return query_iterable.QueryIterable(self, None, options, fetch_fn, collection_link)
return query_iterable.QueryIterable(self, None, options, fetch_fn, resource_key, collection_link)

def _ReadPartitionKeyRanges(self, collection_link, feed_options=None):
"""Reads Partition Key Ranges.
@@ -992,7 +992,7 @@ def fetch_fn(options):
lambda _, b: b,
query,
options), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'pkranges')

def CreateItem(self, database_or_Container_link, document, options=None):
"""Creates a document in a collection.
@@ -1173,7 +1173,7 @@ def fetch_fn(options):
lambda _, b: b,
query,
options), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'triggers')

def CreateTrigger(self, collection_link, trigger, options=None):
"""Creates a trigger in a collection.
@@ -1308,7 +1308,7 @@ def fetch_fn(options):
lambda _, b: b,
query,
options), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'udfs')

def CreateUserDefinedFunction(self, collection_link, udf, options=None):
"""Creates a user defined function in a collection.
@@ -1443,7 +1443,7 @@ def fetch_fn(options):
lambda _, b: b,
query,
options), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'sprocs')

def CreateStoredProcedure(self, collection_link, sproc, options=None):
"""Creates a stored procedure in a collection.
@@ -1576,7 +1576,7 @@ def fetch_fn(options):
lambda _, b: b,
query,
options), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'conflicts')

def ReadConflict(self, conflict_link, options=None):
"""Reads a conflict.
@@ -1899,7 +1899,7 @@ def fetch_fn(options):
lambda _, b: b,
query,
options), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'attachments')


def ReadMedia(self, media_link):
@@ -2366,7 +2366,7 @@ def fetch_fn(options):
lambda _, b: b,
query,
options), self.last_response_headers
return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, 'offers')

def GetDatabaseAccount(self, url_connection=None):
"""Gets database account info.
@@ -2744,7 +2744,8 @@ def __QueryFeed(self,
query,
options=None,
partition_key_range_id=None,
response_hook=None):
response_hook=None,
is_query_plan=False):
"""Query for more than one Azure Cosmos resources.

:param str path:
@@ -2783,7 +2784,7 @@ def __GetBodiesFromQueryResult(result):
# Copy to make sure that default_headers won't be changed.
if query is None:
# Query operations will use ReadEndpoint even though it uses GET(for feed requests)
request = _request_object.RequestObject(type, documents._OperationType.ReadFeed)
request = _request_object.RequestObject(type, documents._OperationType.QueryPlan if is_query_plan else documents._OperationType.ReadFeed)
headers = base.GetHeaders(self,
initial_headers,
'get',
@@ -2801,7 +2802,8 @@ def __GetBodiesFromQueryResult(result):
else:
query = self.__CheckAndUnifyQueryFormat(query)

initial_headers[http_constants.HttpHeaders.IsQuery] = 'true'
if not is_query_plan:
initial_headers[http_constants.HttpHeaders.IsQuery] = 'true'
if (self._query_compatibility_mode == CosmosClientConnection._QueryCompatibilityMode.Default or
self._query_compatibility_mode == CosmosClientConnection._QueryCompatibilityMode.Query):
initial_headers[http_constants.HttpHeaders.ContentType] = runtime_constants.MediaTypes.QueryJson
@@ -2811,7 +2813,7 @@ def __GetBodiesFromQueryResult(result):
raise SystemError('Unexpected query compatibility mode.')

# Query operations will use ReadEndpoint even though it uses POST(for regular query operations)
request = _request_object.RequestObject(type, documents._OperationType.SqlQuery)
request = _request_object.RequestObject(type, documents._OperationType.QueryPlan if is_query_plan else documents._OperationType.SqlQuery)
headers = base.GetHeaders(self,
initial_headers,
'post',
@@ -2831,6 +2833,35 @@ def __GetBodiesFromQueryResult(result):

return __GetBodiesFromQueryResult(result)

def _GetQueryPlanThroughGateway(self, query, resource_link):
supported_query_features = (documents._QueryFeature.Aggregate + "," +
documents._QueryFeature.CompositeAggregate + "," +
documents._QueryFeature.Distinct + "," +
documents._QueryFeature.MultipleOrderBy + "," +
documents._QueryFeature.OffsetAndLimit + "," +
documents._QueryFeature.OrderBy + "," +
documents._QueryFeature.Top)

options = {
'contentType': runtime_constants.MediaTypes.Json,
'isQueryPlanRequest': True,
'supportedQueryFeatures': supported_query_features,
'queryVersion': http_constants.Versions.QueryVersion
}

resource_link = base.TrimBeginningAndEndingSlashes(resource_link)
path = base.GetPathFromLink(resource_link, 'docs')
resource_id = base.GetResourceIdOrFullNameFromLink(resource_link)

return self.__QueryFeed(path,
'docs',
resource_id,
lambda r: r,
None,
query,
options,
is_query_plan=True)
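For context, a hedged sketch of how this helper would be invoked; the caller, the container link and the shape of the returned body are assumptions for illustration, not taken from this diff:

    def fetch_query_plan(client_connection, query, container_link):
        # client_connection: an already-initialised CosmosClientConnection.
        # Returns the gateway's partitioned-query-execution-info body, whose
        # query info (offset, limit, distinct type, rewritten query - field
        # names assumed here) tells the execution context which endpoint
        # components to stack on top of the base context.
        return client_connection._GetQueryPlanThroughGateway(query, container_link)

    # plan = fetch_query_plan(conn, "SELECT * FROM c OFFSET 5 LIMIT 10", "dbs/mydb/colls/mycoll")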

def __CheckAndUnifyQueryFormat(self, query_body):
"""Checks and unifies the format of the query body.

@@ -54,7 +54,8 @@ def __init__(self, *args):
def needsRetry(self, error_code):
if error_code in DefaultRetryPolicy.CONNECTION_ERROR_CODES:
if (len(self.args) > 0):
if (self.args[4]['method'] == 'GET') or (http_constants.HttpHeaders.IsQuery in self.args[4]['headers']):
if ((self.args[4]['method'] == 'GET') or (http_constants.HttpHeaders.IsQuery in self.args[4]['headers'])\
or (http_constants.HttpHeaders.IsQueryPlanRequest in self.args[4]['headers'])):
return True
return False
return True
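The extra clause above makes connection errors on query-plan requests retryable even though they are POSTs; the implicit assumption (stated here, not in the diff) is that a query-plan request is read-only and therefore safe to replay. A standalone sketch of the decision, with illustrative header names in place of http_constants.HttpHeaders:

    def needs_retry_on_connection_error(method, headers):
        # GETs, query POSTs and query-plan POSTs are all safe to replay.
        return (method == 'GET'
                or 'x-ms-documentdb-isquery' in headers
                or 'x-ms-cosmos-is-query-plan-request' in headers)

    assert needs_retry_on_connection_error('POST', {'x-ms-cosmos-is-query-plan-request': 'True'})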
@@ -167,97 +167,5 @@ def __init__(self, client, options, fetch_function):
def _fetch_next_block(self):
while super(_DefaultQueryExecutionContext, self)._has_more_pages() and len(self._buffer) == 0:
return self._fetch_items_helper_with_retries(self._fetch_function)

class _MultiCollectionQueryExecutionContext(_QueryExecutionContextBase):
"""
This class is used if it is client side partitioning
"""
def __init__(self, client, options, database_link, query, partition_key):
"""
Constructor
:param CosmosClient client:
:param dict options:
The request options for the request.
:param str database_link: database self link or ID based link
:param (str or dict) query:
Partition_key (str): partition key for the query

"""
super(_MultiCollectionQueryExecutionContext, self).__init__(client, options)

self._current_collection_index = 0
self._collection_links = []
self._collection_links_length = 0

self._query = query
self._client = client

partition_resolver = client.GetPartitionResolver(database_link)

if(partition_resolver is None):
raise ValueError(client.PartitionResolverErrorMessage)
else:
self._collection_links = partition_resolver.ResolveForRead(partition_key)

self._collection_links_length = len(self._collection_links)

if self._collection_links is None:
raise ValueError("_collection_links is None.")

if self._collection_links_length <= 0:
raise ValueError("_collection_links_length is not greater than 0.")

# Creating the QueryFeed for the first collection
path = _base.GetPathFromLink(self._collection_links[self._current_collection_index], 'docs')
collection_id = _base.GetResourceIdOrFullNameFromLink(self._collection_links[self._current_collection_index])

self._current_collection_index += 1

def fetch_fn(options):
return client.QueryFeed(path,
collection_id,
query,
options)

self._fetch_function = fetch_fn

def _has_more_pages(self):
return not self._has_started or self._continuation or (self._collection_links and self._current_collection_index < self._collection_links_length)

def _fetch_next_block(self):
"""Fetches the next block of query results.

This iterates fetches the next block of results from the current collection link.
Once the current collection results were exhausted. It moves to the next collection link.

:return:
List of fetched items.
:rtype: list
"""
# Fetch next block of results by executing the query against the current document collection
fetched_items = self._fetch_items_helper_with_retries(self._fetch_function)

# If there are multiple document collections to query for(in case of partitioning), keep looping through each one of them,
# creating separate feed queries for each collection and fetching the items
while not fetched_items:
if self._collection_links and self._current_collection_index < self._collection_links_length:
path = _base.GetPathFromLink(self._collection_links[self._current_collection_index], 'docs')
collection_id = _base.GetResourceIdOrFullNameFromLink(self._collection_links[self._current_collection_index])

self._continuation = None
self._has_started = False

def fetch_fn(options):
return self._client.QueryFeed(path,
collection_id,
self._query,
options)

self._fetch_function = fetch_fn

fetched_items = self._fetch_items_helper_with_retries(self._fetch_function)
self._current_collection_index += 1
else:
break

return fetched_items
@@ -22,6 +22,7 @@
"""Internal class for query execution endpoint component implementation in the Azure Cosmos database service.
"""
import numbers
import copy

from azure.cosmos._execution_context.aggregators import _AverageAggregator, _CountAggregator, _MaxAggregator, \
_MinAggregator, _SumAggregator
@@ -68,6 +69,72 @@ def next(self):
return res
raise StopIteration


class _QueryExecutionDistinctOrderedEndpointComponent(_QueryExecutionEndpointComponent):
"""Represents an endpoint in handling distinct query.

It returns only those values not already returned.
"""
def __init__(self, execution_context):
super(_QueryExecutionDistinctOrderedEndpointComponent, self).__init__(execution_context)
self.last_result = None

def next(self):
res = next(self._execution_context)
while self.last_result == res:
res = next(self._execution_context)
self.last_result = res
return res
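An ordered distinct query returns its results sorted, so duplicates always arrive back-to-back and remembering only the previous result is enough. A standalone sketch of the same skip-equal-neighbours idea (illustrative, not the SDK class itself):

    def distinct_ordered(results):
        # results: an iterator yielding already-sorted values.
        last = object()  # sentinel that never equals a real result
        for value in results:
            if value != last:
                last = value
                yield value

    assert list(distinct_ordered(iter([1, 1, 2, 2, 2, 3]))) == [1, 2, 3]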


class _QueryExecutionDistinctUnorderedEndpointComponent(_QueryExecutionEndpointComponent):
"""Represents an endpoint in handling distinct query.

It returns only those values not already returned.
"""
def __init__(self, execution_context):
super(_QueryExecutionDistinctUnorderedEndpointComponent, self).__init__(execution_context)
self.last_result = set()

def make_hash(self, value):
if isinstance(value, (set, tuple, list)):
return tuple([self.make_hash(v) for v in value])
elif not isinstance(value, dict):
return hash(value)
new_value = copy.deepcopy(value)
for k, v in new_value.items():
new_value[k] = self.make_hash(v)

return hash(tuple(frozenset(sorted(new_value.items()))))

def next(self):
res = next(self._execution_context)
hashed_result = self.make_hash(res)
while hashed_result in self.last_result:
res = next(self._execution_context)
self.last_result.add(self.make_hash(res))
return res
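Unordered distinct has to remember every value it has already emitted, but query results are dicts and lists, which are not hashable, and two JSON documents with the same content can list their keys in different orders. make_hash therefore reduces a result to a single hash recursively, sorting dict items so key order does not matter. A standalone illustration of the same scheme:

    import copy

    def make_hash(value):
        # Recursively reduce lists/tuples/sets and dicts to one hashable value.
        if isinstance(value, (set, tuple, list)):
            return tuple(make_hash(v) for v in value)
        if not isinstance(value, dict):
            return hash(value)
        new_value = copy.deepcopy(value)
        for k, v in new_value.items():
            new_value[k] = make_hash(v)
        return hash(tuple(frozenset(sorted(new_value.items()))))

    # Same content, different key order -> same hash (within one interpreter run).
    assert make_hash({"a": 1, "b": [1, 2]}) == make_hash({"b": [1, 2], "a": 1})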


class _QueryExecutionOffsetEndpointComponent(_QueryExecutionEndpointComponent):
"""Represents an endpoint in handling offset query.

It returns results offset by as many results as offset arg specified.
"""
def __init__(self, execution_context, offset_count):
super(_QueryExecutionOffsetEndpointComponent, self).__init__(execution_context)
self._offset_count = offset_count

def next(self):
while self._offset_count > 0:
res = next(self._execution_context)
if res is not None:
self._offset_count -= 1
else:
raise StopIteration
return next(self._execution_context)
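The offset component simply consumes and discards the first offset_count results before passing anything through. The same effect over a plain iterator, as a short sketch:

    import itertools

    def apply_offset(results, offset_count):
        # Skip the first offset_count items of the stream, yield the rest.
        return itertools.islice(results, offset_count, None)

    assert list(apply_offset(iter(range(10)), 3)) == [3, 4, 5, 6, 7, 8, 9]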


class _QueryExecutionAggregateEndpointComponent(_QueryExecutionEndpointComponent):
"""Represents an endpoint in handling aggregate query.
