Skip to content

Commit

Permalink
Update accessibility of class (#7091)
Browse files Browse the repository at this point in the history
* Fix pylint

* Update accessibility of of class

* Small fix in livetest

* Wait longer in iothub livetest

* Small updates in livetest
  • Loading branch information
yunhaoling authored Sep 6, 2019
1 parent 998eeed commit e13ddee
Show file tree
Hide file tree
Showing 17 changed files with 332 additions and 335 deletions.
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "5.0.0b2"
__version__ = "5.0.0b3"
from uamqp import constants # type: ignore
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
self._client = None
self._handler = None
self.name = None
self._name = None

def __enter__(self):
return self
Expand All @@ -26,15 +26,15 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))
if self._error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))

def _create_handler(self):
pass

def _redirect(self, redirect):
self.redirected = redirect
self.running = False
self._redirected = redirect
self._running = False
self._close_connection()

def _open(self):
Expand All @@ -45,36 +45,36 @@ def _open(self):
"""
# pylint: disable=protected-access
if not self.running:
if not self._running:
if self._handler:
self._handler.close()
if self.redirected:
if self._redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
"username": self._client._auth_config.get("iot_username"),
"password": self._client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
self._handler.open(connection=self.client._conn_manager.get_connection( # pylint: disable=protected-access
self.client.address.hostname,
self.client.get_auth(**alt_creds)
self._handler.open(connection=self._client._conn_manager.get_connection( # pylint: disable=protected-access
self._client._address.hostname,
self._client._get_auth(**alt_creds)
))
while not self._handler.client_ready():
time.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True
self._running = True

def _close_handler(self):
self._handler.close() # close the link (sharing connection) or connection (not sharing)
self.running = False
self._running = False

def _close_connection(self):
self._close_handler()
self.client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access
self._client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
if not self._running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, self)

Expand All @@ -89,19 +89,18 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs):
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self.client.config.max_retries:
while retried_times <= self._client._config.max_retries: # pylint: disable=protected-access
try:
if operation_need_param:
return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
else:
return operation()
return operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self.name)
self._client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self._name)
retried_times += 1

log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception)
log.info("%r has exhausted retry. Exception still occurs (%r)", self._name, last_exception)
raise last_exception

def close(self, exception=None):
Expand All @@ -124,16 +123,16 @@ def close(self, exception=None):
:caption: Close down the handler.
"""
self.running = False
if self.error: # type: ignore
self._running = False
if self._error: # type: ignore
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
self._redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
self._error = exception
elif exception:
self.error = EventHubError(str(exception))
self._error = EventHubError(str(exception))
else:
self.error = EventHubError("{} handler is closed.".format(self.name))
self._error = EventHubError("{} handler is closed.".format(self._name))
if self._handler:
self._handler.close() # this will close link if sharing connection. Otherwise close connection
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
class ConsumerProducerMixin(object):

def __init__(self):
self.client = None
self._client = None
self._handler = None
self.name = None
self._name = None

async def __aenter__(self):
return self
Expand All @@ -27,15 +27,15 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))
if self._error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))

def _create_handler(self):
pass

async def _redirect(self, redirect):
self.redirected = redirect
self.running = False
self._redirected = redirect
self._running = False
await self._close_connection()

async def _open(self):
Expand All @@ -46,36 +46,36 @@ async def _open(self):
"""
# pylint: disable=protected-access
if not self.running:
if not self._running:
if self._handler:
await self._handler.close_async()
if self.redirected:
if self._redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
"username": self._client._auth_config.get("iot_username"),
"password": self._client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
await self._handler.open_async(connection=await self.client._conn_manager.get_connection(
self.client.address.hostname,
self.client.get_auth(**alt_creds)
await self._handler.open_async(connection=await self._client._conn_manager.get_connection(
self._client._address.hostname,
self._client._get_auth(**alt_creds)
))
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True
self._running = True

async def _close_handler(self):
await self._handler.close_async() # close the link (sharing connection) or connection (not sharing)
self.running = False
self._running = False

async def _close_connection(self):
await self._close_handler()
await self.client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access
await self._client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access

async def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
if not self._running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return await _handle_exception(exception, self)

Expand All @@ -90,19 +90,18 @@ async def _do_retryable_operation(self, operation, timeout=None, **kwargs):
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self.client.config.max_retries:
while retried_times <= self._client._config.max_retries:
try:
if operation_need_param:
return await operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
else:
return await operation()
return await operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception)
await self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self.name)
await self._client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self._name)
retried_times += 1

log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception)
log.info("%r has exhausted retry. Exception still occurs (%r)", self._name, last_exception)
raise last_exception

async def close(self, exception=None):
Expand All @@ -125,18 +124,18 @@ async def close(self, exception=None):
:caption: Close down the handler.
"""
self.running = False
if self.error: #type: ignore
self._running = False
if self._error: #type: ignore
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
self._redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
self._error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self.error = ConnectError(str(exception), exception)
self._error = ConnectError(str(exception), exception)
elif exception:
self.error = EventHubError(str(exception))
self._error = EventHubError(str(exception))
else:
self.error = EventHubError("This receive handler is now closed.")
self._error = EventHubError("This receive handler is now closed.")
if self._handler:
await self._handler.close_async()
44 changes: 22 additions & 22 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,46 +68,46 @@ def _create_auth(self, username=None, password=None):
:param password: The shared access key.
:type password: str
"""
http_proxy = self.config.http_proxy
transport_type = self.config.transport_type
auth_timeout = self.config.auth_timeout
http_proxy = self._config.http_proxy
transport_type = self._config.transport_type
auth_timeout = self._config.auth_timeout

if isinstance(self.credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return
if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return
username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
return authentication.SASLPlain(
self.host, username, password, http_proxy=http_proxy, transport_type=transport_type)
self._host, username, password, http_proxy=http_proxy, transport_type=transport_type)
return authentication.SASTokenAsync.from_shared_access_key(
self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy,
self._auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy,
transport_type=transport_type)

elif isinstance(self.credential, EventHubSASTokenCredential):
token = self.credential.get_sas_token()
elif isinstance(self._credential, EventHubSASTokenCredential):
token = self._credential.get_sas_token()
try:
expiry = int(parse_sas_token(token)['se'])
except (KeyError, TypeError, IndexError):
raise ValueError("Supplied SAS token has no valid expiry value.")
return authentication.SASTokenAsync(
self.auth_uri, self.auth_uri, token,
self._auth_uri, self._auth_uri, token,
expires_at=expiry,
timeout=auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type)

else:
get_jwt_token = functools.partial(self.credential.get_token, 'https://eventhubs.azure.net//.default')
return authentication.JWTTokenAsync(self.auth_uri, self.auth_uri,
get_jwt_token = functools.partial(self._credential.get_token, 'https://eventhubs.azure.net//.default')
return authentication.JWTTokenAsync(self._auth_uri, self._auth_uri,
get_jwt_token, http_proxy=http_proxy,
transport_type=transport_type)

async def _close_connection(self):
await self._conn_manager.reset_connection_if_broken()

async def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_name=None):
entity_name = entity_name or self.container_id
backoff = self.config.backoff_factor * 2 ** retried_times
if backoff <= self.config.backoff_max and (
entity_name = entity_name or self._container_id
backoff = self._config.backoff_factor * 2 ** retried_times
if backoff <= self._config.backoff_max and (
timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return
await asyncio.sleep(backoff)
log.info("%r has an exception (%r). Retrying...", format(entity_name), last_exception)
Expand All @@ -123,11 +123,11 @@ async def _management_request(self, mgmt_msg, op_type):
}

retried_times = 0
while retried_times <= self.config.max_retries:
while retried_times <= self._config.max_retries:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.config.network_tracing)
mgmt_client = AMQPClientAsync(self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing)
try:
conn = await self._conn_manager.get_connection(self.host, mgmt_auth)
conn = await self._conn_manager.get_connection(self._host, mgmt_auth)
await mgmt_client.open_async(connection=conn)
response = await mgmt_client.mgmt_request_async(
mgmt_msg,
Expand Down Expand Up @@ -265,12 +265,12 @@ def create_consumer(
"""
owner_level = kwargs.get("owner_level")
operation = kwargs.get("operation")
prefetch = kwargs.get("prefetch") or self.config.prefetch
prefetch = kwargs.get("prefetch") or self._config.prefetch
loop = kwargs.get("loop")

path = self.address.path + operation if operation else self.address.path
path = self._address.path + operation if operation else self._address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition_id)
self._address.hostname, path, consumer_group, partition_id)
handler = EventHubConsumer(
self, source_url, event_position=event_position, owner_level=owner_level,
prefetch=prefetch, loop=loop)
Expand Down Expand Up @@ -309,10 +309,10 @@ def create_producer(
"""

target = "amqps://{}{}".format(self.address.hostname, self.address.path)
target = "amqps://{}{}".format(self._address.hostname, self._address.path)
if operation:
target = target + operation
send_timeout = self.config.send_timeout if send_timeout is None else send_timeout
send_timeout = self._config.send_timeout if send_timeout is None else send_timeout

handler = EventHubProducer(
self, target, partition=partition_id, send_timeout=send_timeout, loop=loop)
Expand Down
Loading

0 comments on commit e13ddee

Please sign in to comment.