Skip to content

Commit

Permalink
Make private Cosmos modules private [WIP] (#6329)
Browse files Browse the repository at this point in the history
* make consistent_hash_ring private

* make default_retry_policy private

* make endpoint_discovery_retry_policy private

* make hash_partition_resolver private

* make location_cache private

* make murmur_hash private

* make range private

* make range_partition_resolver private

* make vector_session_token private

* make resource_throttle_retry_policy private

* make retry_utility private

* make utils private

* make routing private

* make execution_context private

* make cosmos_client_connection private

* make retry_options private

* make query_iterable private

* make constants private

* make synchronized_request private

* make session_retry_policy private

* make partition private

* make global_endpoint_manager private

* make runtime_constants private

* make session private

* make request_object private

* make base private
  • Loading branch information
bryevdv authored Jul 31, 2019
1 parent 37c46c6 commit c6ebc93
Show file tree
Hide file tree
Showing 61 changed files with 344 additions and 347 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
from . import documents
from . import partition_key
from . import http_constants
from . import constants
from . import runtime_constants
from . import _constants as constants
from . import _runtime_constants

import six
from six.moves.urllib.parse import quote as urllib_quote
Expand Down Expand Up @@ -178,10 +178,10 @@ def GetHeaders(cosmos_client_connection,

if verb == 'post' or verb == 'put':
if not headers.get(http_constants.HttpHeaders.ContentType):
headers[http_constants.HttpHeaders.ContentType] = runtime_constants.MediaTypes.Json
headers[http_constants.HttpHeaders.ContentType] = _runtime_constants.MediaTypes.Json

if not headers.get(http_constants.HttpHeaders.Accept):
headers[http_constants.HttpHeaders.Accept] = runtime_constants.MediaTypes.Json
headers[http_constants.HttpHeaders.Accept] = _runtime_constants.MediaTypes.Json

if partition_key_range_id is not None:
headers[http_constants.HttpHeaders.PartitionKeyRangeID] = partition_key_range_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import six
from six.moves import xrange

from . import partition
from . import _partition


class _ConsistentHashRing(object):
class ConsistentHashRing(object):
"""The ConsistentHashRing class implements a consistent hash ring using the
hash generator specified.
"""
Expand Down Expand Up @@ -79,13 +79,13 @@ def _ConstructPartitions(self, collection_links, partitions_per_node):
using the hashing algorithm and then finally sorting the partitions based on the hash value.
"""
collections_node_count = len(collection_links)
partitions = [partition._Partition() for _ in xrange(0, partitions_per_node * collections_node_count)]
partitions = [_partition.Partition() for _ in xrange(0, partitions_per_node * collections_node_count)]

index = 0
for collection_node in collection_links:
hash_value = self.hash_generator.ComputeHash(self._GetBytes(collection_node))
for _ in xrange(0, partitions_per_node):
partitions[index] = partition._Partition(hash_value, collection_node)
partitions[index] = _partition.Partition(hash_value, collection_node)
index += 1
hash_value = self.hash_generator.ComputeHash(hash_value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@

import six
from typing import cast
from . import base
from . import _base as base
from . import documents
from . import constants
from . import _constants as constants
from . import http_constants
from . import query_iterable
from . import runtime_constants
from . import request_object
from . import synchronized_request
from . import global_endpoint_manager
from .routing import routing_map_provider as routing_map_provider
from . import session
from . import utils
from . import _query_iterable as query_iterable
from . import _runtime_constants as runtime_constants
from . import _request_object
from . import _synchronized_request as synchronized_request
from . import _global_endpoint_manager as global_endpoint_manager
from ._routing import routing_map_provider as routing_map_provider
from . import _session
from . import _utils
from .partition_key import _Undefined, _Empty


Expand Down Expand Up @@ -110,7 +110,7 @@ def __init__(self,
http_constants.HttpHeaders.Version:
http_constants.Versions.CurrentVersion,
http_constants.HttpHeaders.UserAgent:
utils._get_user_agent(),
_utils.get_user_agent(),
# For single partition query with aggregate functions we would try to accumulate the results on the SDK.
# We need to set continuation as not expected.
http_constants.HttpHeaders.IsContinuationExpected: False
Expand All @@ -127,7 +127,7 @@ def __init__(self,
'''create a session - this is maintained only if the default consistency level
on the client is set to session, or if the user explicitly sets it as a property
via setter'''
self.session = session.Session(self.url_connection)
self.session = _session.Session(self.url_connection)
else:
self.session = None

Expand All @@ -150,7 +150,7 @@ def __init__(self,
self._query_compatibility_mode = CosmosClientConnection._QueryCompatibilityMode.Default

# Routing map provider
self._routing_map_provider = routing_map_provider._SmartRoutingMapProvider(self)
self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)

database_account = self._global_endpoint_manager._GetDatabaseAccount()
self._global_endpoint_manager.force_refresh(database_account)
Expand Down Expand Up @@ -1932,7 +1932,7 @@ def ReadMedia(self, media_link):
{})

# ReadMedia will always use WriteEndpoint since it's not replicated in readable Geo regions
request = request_object._RequestObject('media', documents._OperationType.Read)
request = _request_object.RequestObject('media', documents._OperationType.Read)
result, self.last_response_headers = self.__Get(path,
request,
headers)
Expand Down Expand Up @@ -1981,7 +1981,7 @@ def UpdateMedia(self, media_link, readable_stream, options=None):
options)

# UpdateMedia will use WriteEndpoint since it uses PUT operation
request = request_object._RequestObject('media', documents._OperationType.Update)
request = _request_object.RequestObject('media', documents._OperationType.Update)
result, self.last_response_headers = self.__Put(path,
request,
readable_stream,
Expand Down Expand Up @@ -2200,7 +2200,7 @@ def ExecuteStoredProcedure(self, sproc_link, params, options=None):
options)

# ExecuteStoredProcedure will use WriteEndpoint since it uses POST operation
request = request_object._RequestObject('sprocs', documents._OperationType.ExecuteJavaScript)
request = _request_object.RequestObject('sprocs', documents._OperationType.ExecuteJavaScript)
result, self.last_response_headers = self.__Post(path,
request,
params,
Expand Down Expand Up @@ -2389,7 +2389,7 @@ def GetDatabaseAccount(self, url_connection=None):
'', # type
{})

request = request_object._RequestObject('databaseaccount', documents._OperationType.Read, url_connection)
request = _request_object.RequestObject('databaseaccount', documents._OperationType.Read, url_connection)
result, self.last_response_headers = self.__Get('',
request,
headers)
Expand Down Expand Up @@ -2449,7 +2449,7 @@ def Create(self, body, path, type, id, initial_headers, options=None):
options)
# Create will use WriteEndpoint since it uses POST operation

request = request_object._RequestObject(type, documents._OperationType.Create)
request = _request_object.RequestObject(type, documents._OperationType.Create)
result, self.last_response_headers = self.__Post(path,
request,
body,
Expand Down Expand Up @@ -2491,7 +2491,7 @@ def Upsert(self, body, path, type, id, initial_headers, options=None):
headers[http_constants.HttpHeaders.IsUpsert] = True

# Upsert will use WriteEndpoint since it uses POST operation
request = request_object._RequestObject(type, documents._OperationType.Upsert)
request = _request_object.RequestObject(type, documents._OperationType.Upsert)
result, self.last_response_headers = self.__Post(path,
request,
body,
Expand Down Expand Up @@ -2529,7 +2529,7 @@ def Replace(self, resource, path, type, id, initial_headers, options=None):
type,
options)
# Replace will use WriteEndpoint since it uses PUT operation
request = request_object._RequestObject(type, documents._OperationType.Replace)
request = _request_object.RequestObject(type, documents._OperationType.Replace)
result, self.last_response_headers = self.__Put(path,
request,
resource,
Expand Down Expand Up @@ -2567,7 +2567,7 @@ def Read(self, path, type, id, initial_headers, options=None):
type,
options)
# Read will use ReadEndpoint since it uses GET operation
request = request_object._RequestObject(type, documents._OperationType.Read)
request = _request_object.RequestObject(type, documents._OperationType.Read)
result, self.last_response_headers = self.__Get(path,
request,
headers)
Expand Down Expand Up @@ -2601,7 +2601,7 @@ def DeleteResource(self, path, type, id, initial_headers, options=None):
type,
options)
# Delete will use WriteEndpoint since it uses DELETE operation
request = request_object._RequestObject(type, documents._OperationType.Delete)
request = _request_object.RequestObject(type, documents._OperationType.Delete)
result, self.last_response_headers = self.__Delete(path,
request,
headers)
Expand Down Expand Up @@ -2783,7 +2783,7 @@ def __GetBodiesFromQueryResult(result):
# Copy to make sure that default_headers won't be changed.
if query is None:
# Query operations will use ReadEndpoint even though it uses GET(for feed requests)
request = request_object._RequestObject(type, documents._OperationType.ReadFeed)
request = _request_object.RequestObject(type, documents._OperationType.ReadFeed)
headers = base.GetHeaders(self,
initial_headers,
'get',
Expand Down Expand Up @@ -2811,7 +2811,7 @@ def __GetBodiesFromQueryResult(result):
raise SystemError('Unexpected query compatibility mode.')

# Query operations will use ReadEndpoint even though it uses POST(for regular query operations)
request = request_object._RequestObject(type, documents._OperationType.SqlQuery)
request = _request_object.RequestObject(type, documents._OperationType.SqlQuery)
headers = base.GetHeaders(self,
initial_headers,
'post',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"""
from . import http_constants

class _DefaultRetryPolicy(object):
class DefaultRetryPolicy(object):

error_codes = http_constants._ErrorCodes
CONNECTION_ERROR_CODES = [
Expand Down Expand Up @@ -52,7 +52,7 @@ def __init__(self, *args):
self.args = args

def needsRetry(self, error_code):
if error_code in _DefaultRetryPolicy.CONNECTION_ERROR_CODES:
if error_code in DefaultRetryPolicy.CONNECTION_ERROR_CODES:
if (len(self.args) > 0):
if (self.args[4]['method'] == 'GET') or (http_constants.HttpHeaders.IsQuery in self.args[4]['headers']):
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
logger.addHandler(log_handler)


class _EndpointDiscoveryRetryPolicy(object):
class EndpointDiscoveryRetryPolicy(object):
"""The endpoint discovery retry policy class used for geo-replicated database accounts
to handle the write forbidden exceptions due to writable/readable location changes
(say, after a failover).
Expand All @@ -44,9 +44,9 @@ class _EndpointDiscoveryRetryPolicy(object):

def __init__(self, connection_policy, global_endpoint_manager, *args):
self.global_endpoint_manager = global_endpoint_manager
self._max_retry_attempt_count = _EndpointDiscoveryRetryPolicy.Max_retry_attempt_count
self._max_retry_attempt_count = EndpointDiscoveryRetryPolicy.Max_retry_attempt_count
self.failover_retry_count = 0
self.retry_after_in_milliseconds = _EndpointDiscoveryRetryPolicy.Retry_after_in_milliseconds
self.retry_after_in_milliseconds = EndpointDiscoveryRetryPolicy.Retry_after_in_milliseconds
self.connection_policy = connection_policy
self.request = args[0] if args else None
#clear previous location-based routing directive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""Internal class for aggregation queries implementation in the Azure Cosmos database service.
"""
from abc import abstractmethod, ABCMeta
from azure.cosmos.execution_context.document_producer import _OrderByHelper
from azure.cosmos._execution_context.document_producer import _OrderByHelper


class _Aggregator(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
"""

from collections import deque
from .. import retry_utility
from .. import _retry_utility
from .. import http_constants
from .. import base
from .. import _base

class _QueryExecutionContextBase(object):
"""
Expand Down Expand Up @@ -140,7 +140,7 @@ def _fetch_items_helper_with_retries(self, fetch_function):
def callback():
return self._fetch_items_helper_no_retries(fetch_function)

return retry_utility._Execute(self._client, self._client._global_endpoint_manager, callback)
return _retry_utility.Execute(self._client, self._client._global_endpoint_manager, callback)


class _DefaultQueryExecutionContext(_QueryExecutionContextBase):
Expand Down Expand Up @@ -208,8 +208,8 @@ def __init__(self, client, options, database_link, query, partition_key):
raise ValueError("_collection_links_length is not greater than 0.")

# Creating the QueryFeed for the first collection
path = base.GetPathFromLink(self._collection_links[self._current_collection_index], 'docs')
collection_id = base.GetResourceIdOrFullNameFromLink(self._collection_links[self._current_collection_index])
path = _base.GetPathFromLink(self._collection_links[self._current_collection_index], 'docs')
collection_id = _base.GetResourceIdOrFullNameFromLink(self._collection_links[self._current_collection_index])

self._current_collection_index += 1

Expand Down Expand Up @@ -241,8 +241,8 @@ def _fetch_next_block(self):
# creating separate feed queries for each collection and fetching the items
while not fetched_items:
if self._collection_links and self._current_collection_index < self._collection_links_length:
path = base.GetPathFromLink(self._collection_links[self._current_collection_index], 'docs')
collection_id = base.GetResourceIdOrFullNameFromLink(self._collection_links[self._current_collection_index])
path = _base.GetPathFromLink(self._collection_links[self._current_collection_index], 'docs')
collection_id = _base.GetResourceIdOrFullNameFromLink(self._collection_links[self._current_collection_index])

self._continuation = None
self._has_started = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import six
import numbers
from collections import deque
from azure.cosmos import base
from azure.cosmos.execution_context.base_execution_context import _DefaultQueryExecutionContext
from azure.cosmos import _base
from azure.cosmos._execution_context.base_execution_context import _DefaultQueryExecutionContext
from six.moves import xrange

class _DocumentProducer(object):
Expand All @@ -51,8 +51,8 @@ def __init__(self, partition_key_target_range, client, collection_link, query, d
self._cur_item = None
# initiate execution context

path = base.GetPathFromLink(collection_link, 'docs')
collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
path = _base.GetPathFromLink(collection_link, 'docs')
collection_id = _base.GetResourceIdOrFullNameFromLink(collection_link)
def fetch_fn(options):
return self._client.QueryFeed(path,
collection_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"""
import numbers

from azure.cosmos.execution_context.aggregators import _AverageAggregator, _CountAggregator, _MaxAggregator, \
from azure.cosmos._execution_context.aggregators import _AverageAggregator, _CountAggregator, _MaxAggregator, \
_MinAggregator, _SumAggregator


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import json
from six.moves import xrange
from azure.cosmos.errors import HTTPFailure
from azure.cosmos.execution_context.base_execution_context import _QueryExecutionContextBase
from azure.cosmos.execution_context.base_execution_context import _DefaultQueryExecutionContext
from azure.cosmos.execution_context.query_execution_info import _PartitionedQueryExecutionInfo
from azure.cosmos.execution_context import endpoint_component
from azure.cosmos.execution_context import multi_execution_aggregator
from azure.cosmos._execution_context.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.base_execution_context import _DefaultQueryExecutionContext
from azure.cosmos._execution_context.query_execution_info import _PartitionedQueryExecutionInfo
from azure.cosmos._execution_context import endpoint_component
from azure.cosmos._execution_context import multi_execution_aggregator
from azure.cosmos.http_constants import StatusCodes, SubStatusCodes

class _ProxyQueryExecutionContext(_QueryExecutionContextBase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
"""

import heapq
from azure.cosmos.execution_context.base_execution_context import _QueryExecutionContextBase
from azure.cosmos.execution_context import document_producer
from azure.cosmos.routing import routing_range
from azure.cosmos._execution_context.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context import document_producer
from azure.cosmos._routing import routing_range

class _MultiExecutionContextAggregator(_QueryExecutionContextBase):
"""This class is capable of queries which requires rewriting based on
Expand Down Expand Up @@ -147,4 +147,4 @@ def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range
def _get_target_parition_key_range(self):

query_ranges = self._partitioned_query_ex_info.get_query_ranges()
return self._routing_provider.get_overlapping_ranges(self._resource_link, [routing_range._Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges])
return self._routing_provider.get_overlapping_ranges(self._resource_link, [routing_range.Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges])
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

from six.moves.urllib.parse import urlparse
import threading
from . import constants
from . import _constants as constants
from . import errors
from .location_cache import LocationCache
from ._location_cache import LocationCache

class _GlobalEndpointManager(object):
"""
Expand Down
Loading

0 comments on commit c6ebc93

Please sign in to comment.