diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py index 040d00c947d8..dfc198f71fa8 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py @@ -3,7 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -__version__ = "5.0.0b2" +__version__ = "5.0.0b3" from uamqp import constants # type: ignore from azure.eventhub.common import EventData, EventDataBatch, EventPosition from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py index 282a8c574088..ef32cb7a591c 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py @@ -15,9 +15,9 @@ class ConsumerProducerMixin(object): def __init__(self): - self.client = None + self._client = None self._handler = None - self.name = None + self._name = None def __enter__(self): return self @@ -26,15 +26,15 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.close(exc_val) def _check_closed(self): - if self.error: - raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name)) + if self._error: + raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name)) def _create_handler(self): pass def _redirect(self, redirect): - self.redirected = redirect - self.running = False + self._redirected = redirect + self._running = False self._close_connection() def _open(self): @@ -45,36 +45,36 @@ def _open(self): """ # pylint: disable=protected-access - if not self.running: + if not self._running: if self._handler: self._handler.close() - if self.redirected: + if self._redirected: alt_creds = { - "username": self.client._auth_config.get("iot_username"), - "password": self.client._auth_config.get("iot_password")} + "username": self._client._auth_config.get("iot_username"), + "password": self._client._auth_config.get("iot_password")} else: alt_creds = {} self._create_handler() - self._handler.open(connection=self.client._conn_manager.get_connection( # pylint: disable=protected-access - self.client.address.hostname, - self.client.get_auth(**alt_creds) + self._handler.open(connection=self._client._conn_manager.get_connection( # pylint: disable=protected-access + self._client._address.hostname, + self._client._get_auth(**alt_creds) )) while not self._handler.client_ready(): time.sleep(0.05) self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access - self.running = True + self._running = True def _close_handler(self): self._handler.close() # close the link (sharing connection) or connection (not sharing) - self.running = False + self._running = False def _close_connection(self): self._close_handler() - self.client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access + self._client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access def _handle_exception(self, exception): - if not self.running and isinstance(exception, compat.TimeoutException): + if not self._running and isinstance(exception, compat.TimeoutException): exception = errors.AuthenticationException("Authorization timeout.") return _handle_exception(exception, self) @@ -89,19 +89,18 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs): last_exception = kwargs.pop('last_exception', None) operation_need_param = kwargs.pop('operation_need_param', True) - while retried_times <= self.client.config.max_retries: + while retried_times <= self._client._config.max_retries: # pylint: disable=protected-access try: if operation_need_param: return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs) - else: - return operation() + return operation() except Exception as exception: # pylint:disable=broad-except last_exception = self._handle_exception(exception) - self.client._try_delay(retried_times=retried_times, last_exception=last_exception, - timeout_time=timeout_time, entity_name=self.name) + self._client._try_delay(retried_times=retried_times, last_exception=last_exception, + timeout_time=timeout_time, entity_name=self._name) retried_times += 1 - log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception) + log.info("%r has exhausted retry. Exception still occurs (%r)", self._name, last_exception) raise last_exception def close(self, exception=None): @@ -124,16 +123,16 @@ def close(self, exception=None): :caption: Close down the handler. """ - self.running = False - if self.error: # type: ignore + self._running = False + if self._error: # type: ignore return if isinstance(exception, errors.LinkRedirect): - self.redirected = exception + self._redirected = exception elif isinstance(exception, EventHubError): - self.error = exception + self._error = exception elif exception: - self.error = EventHubError(str(exception)) + self._error = EventHubError(str(exception)) else: - self.error = EventHubError("{} handler is closed.".format(self.name)) + self._error = EventHubError("{} handler is closed.".format(self._name)) if self._handler: self._handler.close() # this will close link if sharing connection. Otherwise close connection diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index 64873f843dc4..33c944d41be4 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -16,9 +16,9 @@ class ConsumerProducerMixin(object): def __init__(self): - self.client = None + self._client = None self._handler = None - self.name = None + self._name = None async def __aenter__(self): return self @@ -27,15 +27,15 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close(exc_val) def _check_closed(self): - if self.error: - raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name)) + if self._error: + raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name)) def _create_handler(self): pass async def _redirect(self, redirect): - self.redirected = redirect - self.running = False + self._redirected = redirect + self._running = False await self._close_connection() async def _open(self): @@ -46,36 +46,36 @@ async def _open(self): """ # pylint: disable=protected-access - if not self.running: + if not self._running: if self._handler: await self._handler.close_async() - if self.redirected: + if self._redirected: alt_creds = { - "username": self.client._auth_config.get("iot_username"), - "password": self.client._auth_config.get("iot_password")} + "username": self._client._auth_config.get("iot_username"), + "password": self._client._auth_config.get("iot_password")} else: alt_creds = {} self._create_handler() - await self._handler.open_async(connection=await self.client._conn_manager.get_connection( - self.client.address.hostname, - self.client.get_auth(**alt_creds) + await self._handler.open_async(connection=await self._client._conn_manager.get_connection( + self._client._address.hostname, + self._client._get_auth(**alt_creds) )) while not await self._handler.client_ready_async(): await asyncio.sleep(0.05) self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access - self.running = True + self._running = True async def _close_handler(self): await self._handler.close_async() # close the link (sharing connection) or connection (not sharing) - self.running = False + self._running = False async def _close_connection(self): await self._close_handler() - await self.client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access + await self._client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access async def _handle_exception(self, exception): - if not self.running and isinstance(exception, compat.TimeoutException): + if not self._running and isinstance(exception, compat.TimeoutException): exception = errors.AuthenticationException("Authorization timeout.") return await _handle_exception(exception, self) @@ -90,19 +90,18 @@ async def _do_retryable_operation(self, operation, timeout=None, **kwargs): last_exception = kwargs.pop('last_exception', None) operation_need_param = kwargs.pop('operation_need_param', True) - while retried_times <= self.client.config.max_retries: + while retried_times <= self._client._config.max_retries: try: if operation_need_param: return await operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs) - else: - return await operation() + return await operation() except Exception as exception: # pylint:disable=broad-except last_exception = await self._handle_exception(exception) - await self.client._try_delay(retried_times=retried_times, last_exception=last_exception, - timeout_time=timeout_time, entity_name=self.name) + await self._client._try_delay(retried_times=retried_times, last_exception=last_exception, + timeout_time=timeout_time, entity_name=self._name) retried_times += 1 - log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception) + log.info("%r has exhausted retry. Exception still occurs (%r)", self._name, last_exception) raise last_exception async def close(self, exception=None): @@ -125,18 +124,18 @@ async def close(self, exception=None): :caption: Close down the handler. """ - self.running = False - if self.error: #type: ignore + self._running = False + if self._error: #type: ignore return if isinstance(exception, errors.LinkRedirect): - self.redirected = exception + self._redirected = exception elif isinstance(exception, EventHubError): - self.error = exception + self._error = exception elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self.error = ConnectError(str(exception), exception) + self._error = ConnectError(str(exception), exception) elif exception: - self.error = EventHubError(str(exception)) + self._error = EventHubError(str(exception)) else: - self.error = EventHubError("This receive handler is now closed.") + self._error = EventHubError("This receive handler is now closed.") if self._handler: await self._handler.close_async() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index 23aefaf2aa3e..67f6ab52dd30 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -68,36 +68,36 @@ def _create_auth(self, username=None, password=None): :param password: The shared access key. :type password: str """ - http_proxy = self.config.http_proxy - transport_type = self.config.transport_type - auth_timeout = self.config.auth_timeout + http_proxy = self._config.http_proxy + transport_type = self._config.transport_type + auth_timeout = self._config.auth_timeout - if isinstance(self.credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return + if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return username = username or self._auth_config['username'] password = password or self._auth_config['password'] if "@sas.root" in username: return authentication.SASLPlain( - self.host, username, password, http_proxy=http_proxy, transport_type=transport_type) + self._host, username, password, http_proxy=http_proxy, transport_type=transport_type) return authentication.SASTokenAsync.from_shared_access_key( - self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy, + self._auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy, transport_type=transport_type) - elif isinstance(self.credential, EventHubSASTokenCredential): - token = self.credential.get_sas_token() + elif isinstance(self._credential, EventHubSASTokenCredential): + token = self._credential.get_sas_token() try: expiry = int(parse_sas_token(token)['se']) except (KeyError, TypeError, IndexError): raise ValueError("Supplied SAS token has no valid expiry value.") return authentication.SASTokenAsync( - self.auth_uri, self.auth_uri, token, + self._auth_uri, self._auth_uri, token, expires_at=expiry, timeout=auth_timeout, http_proxy=http_proxy, transport_type=transport_type) else: - get_jwt_token = functools.partial(self.credential.get_token, 'https://eventhubs.azure.net//.default') - return authentication.JWTTokenAsync(self.auth_uri, self.auth_uri, + get_jwt_token = functools.partial(self._credential.get_token, 'https://eventhubs.azure.net//.default') + return authentication.JWTTokenAsync(self._auth_uri, self._auth_uri, get_jwt_token, http_proxy=http_proxy, transport_type=transport_type) @@ -105,9 +105,9 @@ async def _close_connection(self): await self._conn_manager.reset_connection_if_broken() async def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_name=None): - entity_name = entity_name or self.container_id - backoff = self.config.backoff_factor * 2 ** retried_times - if backoff <= self.config.backoff_max and ( + entity_name = entity_name or self._container_id + backoff = self._config.backoff_factor * 2 ** retried_times + if backoff <= self._config.backoff_max and ( timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return await asyncio.sleep(backoff) log.info("%r has an exception (%r). Retrying...", format(entity_name), last_exception) @@ -123,11 +123,11 @@ async def _management_request(self, mgmt_msg, op_type): } retried_times = 0 - while retried_times <= self.config.max_retries: + while retried_times <= self._config.max_retries: mgmt_auth = self._create_auth(**alt_creds) - mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.config.network_tracing) + mgmt_client = AMQPClientAsync(self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing) try: - conn = await self._conn_manager.get_connection(self.host, mgmt_auth) + conn = await self._conn_manager.get_connection(self._host, mgmt_auth) await mgmt_client.open_async(connection=conn) response = await mgmt_client.mgmt_request_async( mgmt_msg, @@ -265,12 +265,12 @@ def create_consumer( """ owner_level = kwargs.get("owner_level") operation = kwargs.get("operation") - prefetch = kwargs.get("prefetch") or self.config.prefetch + prefetch = kwargs.get("prefetch") or self._config.prefetch loop = kwargs.get("loop") - path = self.address.path + operation if operation else self.address.path + path = self._address.path + operation if operation else self._address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( - self.address.hostname, path, consumer_group, partition_id) + self._address.hostname, path, consumer_group, partition_id) handler = EventHubConsumer( self, source_url, event_position=event_position, owner_level=owner_level, prefetch=prefetch, loop=loop) @@ -309,10 +309,10 @@ def create_producer( """ - target = "amqps://{}{}".format(self.address.hostname, self.address.path) + target = "amqps://{}{}".format(self._address.hostname, self._address.path) if operation: target = target + operation - send_timeout = self.config.send_timeout if send_timeout is None else send_timeout + send_timeout = self._config.send_timeout if send_timeout is None else send_timeout handler = EventHubProducer( self, target, partition=partition_id, send_timeout=send_timeout, loop=loop) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 147550c4d819..d3651a1c9d8f 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -32,9 +32,9 @@ class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instan sometimes referred to as "Non-Epoch Consumers." """ - timeout = 0 - _epoch = b'com.microsoft:epoch' - _timeout = b'com.microsoft:timeout' + _timeout = 0 + _epoch_symbol = b'com.microsoft:epoch' + _timeout_symbol = b'com.microsoft:timeout' def __init__( # pylint: disable=super-init-not-called self, client, source, **kwargs): @@ -64,28 +64,28 @@ def __init__( # pylint: disable=super-init-not-called loop = kwargs.get("loop", None) super(EventHubConsumer, self).__init__() - self.loop = loop or asyncio.get_event_loop() - self.running = False - self.client = client - self.source = source - self.offset = event_position - self.messages_iter = None - self.prefetch = prefetch - self.owner_level = owner_level - self.keep_alive = keep_alive - self.auto_reconnect = auto_reconnect - self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler) - self.reconnect_backoff = 1 - self.redirected = None - self.error = None + self._loop = loop or asyncio.get_event_loop() + self._running = False + self._client = client + self._source = source + self._offset = event_position + self._messages_iter = None + self._prefetch = prefetch + self._owner_level = owner_level + self._keep_alive = keep_alive + self._auto_reconnect = auto_reconnect + self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access + self._reconnect_backoff = 1 + self._redirected = None + self._error = None self._link_properties = {} - partition = self.source.split('/')[-1] - self.partition = partition - self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition) + partition = self._source.split('/')[-1] + self._partition = partition + self._name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition) if owner_level: - self._link_properties[types.AMQPSymbol(self._epoch)] = types.AMQPLong(int(owner_level)) - link_property_timeout_ms = (self.client.config.receive_timeout or self.timeout) * 1000 - self._link_properties[types.AMQPSymbol(self._timeout)] = types.AMQPLong(int(link_property_timeout_ms)) + self._link_properties[types.AMQPSymbol(self._epoch_symbol)] = types.AMQPLong(int(owner_level)) + link_property_timeout_ms = (self._client._config.receive_timeout or self._timeout) * 1000 # pylint:disable=protected-access + self._link_properties[types.AMQPSymbol(self._timeout_symbol)] = types.AMQPLong(int(link_property_timeout_ms)) self._handler = None def __aiter__(self): @@ -93,48 +93,48 @@ def __aiter__(self): async def __anext__(self): retried_times = 0 - while retried_times < self.client.config.max_retries: + while retried_times < self._client._config.max_retries: # pylint:disable=protected-access try: await self._open() - if not self.messages_iter: - self.messages_iter = self._handler.receive_messages_iter_async() - message = await self.messages_iter.__anext__() + if not self._messages_iter: + self._messages_iter = self._handler.receive_messages_iter_async() + message = await self._messages_iter.__anext__() event_data = EventData._from_message(message) # pylint:disable=protected-access - self.offset = EventPosition(event_data.offset, inclusive=False) + self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 return event_data except Exception as exception: # pylint:disable=broad-except last_exception = await self._handle_exception(exception) - await self.client._try_delay(retried_times=retried_times, last_exception=last_exception, - entity_name=self.name) + await self._client._try_delay(retried_times=retried_times, last_exception=last_exception, # pylint:disable=protected-access + entity_name=self._name) retried_times += 1 def _create_handler(self): alt_creds = { - "username": self.client._auth_config.get("iot_username") if self.redirected else None, # pylint:disable=protected-access - "password": self.client._auth_config.get("iot_password") if self.redirected else None # pylint:disable=protected-access + "username": self._client._auth_config.get("iot_username") if self._redirected else None, # pylint:disable=protected-access + "password": self._client._auth_config.get("iot_password") if self._redirected else None # pylint:disable=protected-access } - source = Source(self.source) - if self.offset is not None: - source.set_filter(self.offset._selector()) # pylint:disable=protected-access + source = Source(self._source) + if self._offset is not None: + source.set_filter(self._offset._selector()) # pylint:disable=protected-access self._handler = ReceiveClientAsync( source, - auth=self.client.get_auth(**alt_creds), - debug=self.client.config.network_tracing, - prefetch=self.prefetch, + auth=self._client._get_auth(**alt_creds), # pylint:disable=protected-access + debug=self._client._config.network_tracing, # pylint:disable=protected-access + prefetch=self._prefetch, link_properties=self._link_properties, - timeout=self.timeout, - error_policy=self.retry_policy, - keep_alive_interval=self.keep_alive, - client_name=self.name, - properties=self.client._create_properties( # pylint:disable=protected-access - self.client.config.user_agent), - loop=self.loop) - self.messages_iter = None + timeout=self._timeout, + error_policy=self._retry_policy, + keep_alive_interval=self._keep_alive, + client_name=self._name, + properties=self._client._create_properties( # pylint:disable=protected-access + self._client._config.user_agent), # pylint:disable=protected-access + loop=self._loop) + self._messages_iter = None async def _redirect(self, redirect): - self.messages_iter = None + self._messages_iter = None await super(EventHubConsumer, self)._redirect(redirect) async def _open(self): @@ -145,11 +145,11 @@ async def _open(self): """ # pylint: disable=protected-access - self.redirected = self.redirected or self.client._iothub_redirect_info + self._redirected = self._redirected or self._client._iothub_redirect_info - if not self.running and self.redirected: - self.client._process_redirect_uri(self.redirected) - self.source = self.redirected.address + if not self._running and self._redirected: + self._client._process_redirect_uri(self._redirected) + self._source = self._redirected.address await super(EventHubConsumer, self)._open() async def _open_with_retry(self): @@ -163,7 +163,7 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): remaining_time = timeout_time - time.time() if remaining_time <= 0.0: if last_exception: - log.info("%r receive operation timed out. (%r)", self.name, last_exception) + log.info("%r receive operation timed out. (%r)", self._name, last_exception) raise last_exception return data_batch @@ -173,7 +173,7 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): timeout=remaining_time_ms) for message in message_batch: event_data = EventData._from_message(message) # pylint:disable=protected-access - self.offset = EventPosition(event_data.offset) + self._offset = EventPosition(event_data.offset) data_batch.append(event_data) return data_batch @@ -223,8 +223,8 @@ async def receive(self, *, max_batch_size=None, timeout=None): """ self._check_closed() - timeout = timeout or self.client.config.receive_timeout - max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch) + timeout = timeout or self._client._config.receive_timeout # pylint:disable=protected-access + max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) @@ -248,18 +248,18 @@ async def close(self, exception=None): :caption: Close down the handler. """ - self.running = False - if self.error: + self._running = False + if self._error: return if isinstance(exception, errors.LinkRedirect): - self.redirected = exception + self._redirected = exception elif isinstance(exception, EventHubError): - self.error = exception + self._error = exception elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self.error = ConnectError(str(exception), exception) + self._error = ConnectError(str(exception), exception) elif exception: - self.error = EventHubError(str(exception)) + self._error = EventHubError(str(exception)) else: - self.error = EventHubError("This receive handler is now closed.") + self._error = EventHubError("This receive handler is now closed.") if self._handler: await self._handler.close_async() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py index 58ecee91ad1b..ae1cd8084f3d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py @@ -3,7 +3,6 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- import asyncio -import time import logging from uamqp import errors, compat # type: ignore @@ -40,9 +39,9 @@ async def _handle_exception(exception, closable): # pylint:disable=too-many-bra if isinstance(exception, asyncio.CancelledError): raise exception try: - name = closable.name + name = closable._name # pylint: disable=protected-access except AttributeError: - name = closable.container_id + name = closable._container_id # pylint: disable=protected-access if isinstance(exception, KeyboardInterrupt): # pylint:disable=no-else-raise log.info("%r stops due to keyboard interrupt", name) await closable.close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index f9fb32420e81..999bdc09c787 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -27,7 +27,7 @@ class EventHubProducer(ConsumerProducerMixin): # pylint: disable=too-many-insta to a partition. """ - _timeout = b'com.microsoft:timeout' + _timeout_symbol = b'com.microsoft:timeout' def __init__( # pylint: disable=super-init-not-called self, client, target, **kwargs): @@ -60,42 +60,42 @@ def __init__( # pylint: disable=super-init-not-called loop = kwargs.get("loop", None) super(EventHubProducer, self).__init__() - self.loop = loop or asyncio.get_event_loop() + self._loop = loop or asyncio.get_event_loop() self._max_message_size_on_link = None - self.running = False - self.client = client - self.target = target - self.partition = partition - self.keep_alive = keep_alive - self.auto_reconnect = auto_reconnect - self.timeout = send_timeout - self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler) - self.reconnect_backoff = 1 - self.name = "EHProducer-{}".format(uuid.uuid4()) - self.unsent_events = None - self.redirected = None - self.error = None + self._running = False + self._client = client + self._target = target + self._partition = partition + self._keep_alive = keep_alive + self._auto_reconnect = auto_reconnect + self._timeout = send_timeout + self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access + self._reconnect_backoff = 1 + self._name = "EHProducer-{}".format(uuid.uuid4()) + self._unsent_events = None + self._redirected = None + self._error = None if partition: - self.target += "/Partitions/" + partition - self.name += "-partition{}".format(partition) + self._target += "/Partitions/" + partition + self._name += "-partition{}".format(partition) self._handler = None self._outcome = None self._condition = None - self._link_properties = {types.AMQPSymbol(self._timeout): types.AMQPLong(int(self.timeout * 1000))} + self._link_properties = {types.AMQPSymbol(self._timeout_symbol): types.AMQPLong(int(self._timeout * 1000))} def _create_handler(self): self._handler = SendClientAsync( - self.target, - auth=self.client.get_auth(), - debug=self.client.config.network_tracing, - msg_timeout=self.timeout, - error_policy=self.retry_policy, - keep_alive_interval=self.keep_alive, - client_name=self.name, + self._target, + auth=self._client._get_auth(), # pylint:disable=protected-access + debug=self._client._config.network_tracing, # pylint:disable=protected-access + msg_timeout=self._timeout, + error_policy=self._retry_policy, + keep_alive_interval=self._keep_alive, + client_name=self._name, link_properties=self._link_properties, - properties=self.client._create_properties( # pylint: disable=protected-access - self.client.config.user_agent), - loop=self.loop) + properties=self._client._create_properties( # pylint: disable=protected-access + self._client._config.user_agent), # pylint:disable=protected-access + loop=self._loop) async def _open(self): """ @@ -104,16 +104,16 @@ async def _open(self): context will be used to create a new handler before opening it. """ - if not self.running and self.redirected: - self.client._process_redirect_uri(self.redirected) # pylint: disable=protected-access - self.target = self.redirected.address + if not self._running and self._redirected: + self._client._process_redirect_uri(self._redirected) # pylint: disable=protected-access + self._target = self._redirected.address await super(EventHubProducer, self)._open() async def _open_with_retry(self): return await self._do_retryable_operation(self._open, operation_need_param=False) async def _send_event_data(self, timeout_time=None, last_exception=None): - if self.unsent_events: + if self._unsent_events: await self._open() remaining_time = timeout_time - time.time() if remaining_time <= 0.0: @@ -121,12 +121,12 @@ async def _send_event_data(self, timeout_time=None, last_exception=None): error = last_exception else: error = OperationTimeoutError("send operation timed out") - log.info("%r send operation timed out. (%r)", self.name, error) + log.info("%r send operation timed out. (%r)", self._name, error) raise error self._handler._msg_timeout = remaining_time # pylint: disable=protected-access - self._handler.queue_message(*self.unsent_events) + self._handler.queue_message(*self._unsent_events) await self._handler.wait_async() - self.unsent_events = self._handler.pending_messages + self._unsent_events = self._handler.pending_messages if self._outcome != constants.MessageSendResult.Ok: if self._outcome == constants.MessageSendResult.Timeout: self._condition = OperationTimeoutError("send operation timed out") @@ -228,7 +228,7 @@ async def send( event_data = _set_partition_key(event_data, partition_key) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome - self.unsent_events = [wrapper_event_data.message] + self._unsent_events = [wrapper_event_data.message] await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor async def close(self, exception=None): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index 347b1263be2c..90a1ac86742f 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -68,38 +68,38 @@ def _create_auth(self, username=None, password=None): :param password: The shared access key. :type password: str """ - http_proxy = self.config.http_proxy - transport_type = self.config.transport_type - auth_timeout = self.config.auth_timeout + http_proxy = self._config.http_proxy + transport_type = self._config.transport_type + auth_timeout = self._config.auth_timeout # TODO: the following code can be refactored to create auth from classes directly instead of using if-else - if isinstance(self.credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return + if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return username = username or self._auth_config['username'] password = password or self._auth_config['password'] if "@sas.root" in username: return authentication.SASLPlain( - self.host, username, password, http_proxy=http_proxy, transport_type=transport_type) + self._host, username, password, http_proxy=http_proxy, transport_type=transport_type) return authentication.SASTokenAuth.from_shared_access_key( - self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy, + self._auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy, transport_type=transport_type) - elif isinstance(self.credential, EventHubSASTokenCredential): - token = self.credential.get_sas_token() + elif isinstance(self._credential, EventHubSASTokenCredential): + token = self._credential.get_sas_token() try: expiry = int(parse_sas_token(token)['se']) except (KeyError, TypeError, IndexError): raise ValueError("Supplied SAS token has no valid expiry value.") return authentication.SASTokenAuth( - self.auth_uri, self.auth_uri, token, + self._auth_uri, self._auth_uri, token, expires_at=expiry, timeout=auth_timeout, http_proxy=http_proxy, transport_type=transport_type) else: # Azure credential - get_jwt_token = functools.partial(self.credential.get_token, + get_jwt_token = functools.partial(self._credential.get_token, 'https://eventhubs.azure.net//.default') - return authentication.JWTTokenAuth(self.auth_uri, self.auth_uri, + return authentication.JWTTokenAuth(self._auth_uri, self._auth_uri, get_jwt_token, http_proxy=http_proxy, transport_type=transport_type) @@ -107,9 +107,9 @@ def _close_connection(self): self._conn_manager.reset_connection_if_broken() def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_name=None): - entity_name = entity_name or self.container_id - backoff = self.config.backoff_factor * 2 ** retried_times - if backoff <= self.config.backoff_max and ( + entity_name = entity_name or self._container_id + backoff = self._config.backoff_factor * 2 ** retried_times + if backoff <= self._config.backoff_max and ( timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return time.sleep(backoff) log.info("%r has an exception (%r). Retrying...", format(entity_name), last_exception) @@ -125,11 +125,11 @@ def _management_request(self, mgmt_msg, op_type): } retried_times = 0 - while retried_times <= self.config.max_retries: + while retried_times <= self._config.max_retries: mgmt_auth = self._create_auth(**alt_creds) - mgmt_client = uamqp.AMQPClient(self.mgmt_target) + mgmt_client = uamqp.AMQPClient(self._mgmt_target) try: - conn = self._conn_manager.get_connection(self.host, mgmt_auth) #pylint:disable=assignment-from-none + conn = self._conn_manager.get_connection(self._host, mgmt_auth) #pylint:disable=assignment-from-none mgmt_client.open(connection=conn) response = mgmt_client.mgmt_request( mgmt_msg, @@ -262,11 +262,11 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs """ owner_level = kwargs.get("owner_level") operation = kwargs.get("operation") - prefetch = kwargs.get("prefetch") or self.config.prefetch + prefetch = kwargs.get("prefetch") or self._config.prefetch - path = self.address.path + operation if operation else self.address.path + path = self._address.path + operation if operation else self._address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( - self.address.hostname, path, consumer_group, partition_id) + self._address.hostname, path, consumer_group, partition_id) handler = EventHubConsumer( self, source_url, event_position=event_position, owner_level=owner_level, prefetch=prefetch) @@ -299,10 +299,10 @@ def create_producer(self, partition_id=None, operation=None, send_timeout=None): """ - target = "amqps://{}{}".format(self.address.hostname, self.address.path) + target = "amqps://{}{}".format(self._address.hostname, self._address.path) if operation: target = target + operation - send_timeout = self.config.send_timeout if send_timeout is None else send_timeout + send_timeout = self._config.send_timeout if send_timeout is None else send_timeout handler = EventHubProducer( self, target, partition=partition_id, send_timeout=send_timeout) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 8b6712ce207e..7d4c8cd2712e 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -133,31 +133,32 @@ def __init__(self, host, event_hub_path, credential, **kwargs): queued. Default value is 60 seconds. If set to 0, there will be no timeout. :type send_timeout: float """ - self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] - self.address = _Address() - self.address.hostname = host - self.address.path = "/" + event_hub_path if event_hub_path else "" + self.eh_name = event_hub_path + self._host = host + self._container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] + self._address = _Address() + self._address.hostname = host + self._address.path = "/" + event_hub_path if event_hub_path else "" self._auth_config = {} # type:Dict[str,str] - self.credential = credential + self._credential = credential if isinstance(credential, EventHubSharedKeyCredential): - self.username = credential.policy - self.password = credential.key - self._auth_config['username'] = self.username - self._auth_config['password'] = self.password + self._username = credential.policy + self._password = credential.key + self._auth_config['username'] = self._username + self._auth_config['password'] = self._password - self.host = host - self.eh_name = event_hub_path - self.keep_alive = kwargs.get("keep_alive", 30) - self.auto_reconnect = kwargs.get("auto_reconnect", True) - self.mgmt_target = "amqps://{}/{}".format(self.host, self.eh_name) - self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) - self.get_auth = functools.partial(self._create_auth) - self.config = _Configuration(**kwargs) - self.debug = self.config.network_tracing + self._keep_alive = kwargs.get("keep_alive", 30) + self._auto_reconnect = kwargs.get("auto_reconnect", True) + self._mgmt_target = "amqps://{}/{}".format(self._host, self.eh_name) + self._auth_uri = "sb://{}{}".format(self._address.hostname, self._address.path) + self._get_auth = functools.partial(self._create_auth) + self._config = _Configuration(**kwargs) + self._debug = self._config.network_tracing self._is_iothub = False self._iothub_redirect_info = None + self._redirect_consumer = None - log.info("%r: Created the Event Hub client", self.container_id) + log.info("%r: Created the Event Hub client", self._container_id) @classmethod def _from_iothub_connection_string(cls, conn_str, **kwargs): @@ -176,8 +177,8 @@ def _from_iothub_connection_string(cls, conn_str, **kwargs): 'iot_password': key, 'username': username, 'password': password} - client._is_iothub = True - client._redirect_consumer = client.create_consumer(consumer_group='$default', + client._is_iothub = True # pylint: disable=protected-access + client._redirect_consumer = client.create_consumer(consumer_group='$default', # pylint: disable=protected-access, no-member partition_id='0', event_position=EventPosition('-1'), operation='/messages/events') @@ -216,11 +217,11 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use def _process_redirect_uri(self, redirect): redirect_uri = redirect.address.decode('utf-8') auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") - self.address = urlparse(auth_uri) - self.host = self.address.hostname - self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) - self.eh_name = self.address.path.lstrip('/') - self.mgmt_target = redirect_uri + self._address = urlparse(auth_uri) + self._host = self._address.hostname + self.eh_name = self._address.path.lstrip('/') + self._auth_uri = "sb://{}{}".format(self._address.hostname, self._address.path) + self._mgmt_target = redirect_uri if self._is_iothub: self._iothub_redirect_info = redirect diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index e10f52e61b59..0e89ba9bc55f 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -34,9 +34,9 @@ class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instan sometimes referred to as "Non-Epoch Consumers." """ - timeout = 0 - _epoch = b'com.microsoft:epoch' - _timeout = b'com.microsoft:timeout' + _timeout = 0 + _epoch_symbol = b'com.microsoft:epoch' + _timeout_symbol = b'com.microsoft:timeout' def __init__(self, client, source, **kwargs): """ @@ -61,27 +61,27 @@ def __init__(self, client, source, **kwargs): auto_reconnect = kwargs.get("auto_reconnect", True) super(EventHubConsumer, self).__init__() - self.running = False - self.client = client - self.source = source - self.offset = event_position - self.messages_iter = None - self.prefetch = prefetch - self.owner_level = owner_level - self.keep_alive = keep_alive - self.auto_reconnect = auto_reconnect - self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler) - self.reconnect_backoff = 1 + self._running = False + self._client = client + self._source = source + self._offset = event_position + self._messages_iter = None + self._prefetch = prefetch + self._owner_level = owner_level + self._keep_alive = keep_alive + self._auto_reconnect = auto_reconnect + self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access + self._reconnect_backoff = 1 self._link_properties = {} - self.redirected = None - self.error = None - partition = self.source.split('/')[-1] - self.partition = partition - self.name = "EHConsumer-{}-partition{}".format(uuid.uuid4(), partition) + self._redirected = None + self._error = None + partition = self._source.split('/')[-1] + self._partition = partition + self._name = "EHConsumer-{}-partition{}".format(uuid.uuid4(), partition) if owner_level: - self._link_properties[types.AMQPSymbol(self._epoch)] = types.AMQPLong(int(owner_level)) - link_property_timeout_ms = (self.client.config.receive_timeout or self.timeout) * 1000 - self._link_properties[types.AMQPSymbol(self._timeout)] = types.AMQPLong(int(link_property_timeout_ms)) + self._link_properties[types.AMQPSymbol(self._epoch_symbol)] = types.AMQPLong(int(owner_level)) + link_property_timeout_ms = (self._client._config.receive_timeout or self._timeout) * 1000 # pylint:disable=protected-access + self._link_properties[types.AMQPSymbol(self._timeout_symbol)] = types.AMQPLong(int(link_property_timeout_ms)) self._handler = None def __iter__(self): @@ -89,47 +89,47 @@ def __iter__(self): def __next__(self): retried_times = 0 - while retried_times < self.client.config.max_retries: + while retried_times < self._client._config.max_retries: # pylint:disable=protected-access try: self._open() - if not self.messages_iter: - self.messages_iter = self._handler.receive_messages_iter() - message = next(self.messages_iter) + if not self._messages_iter: + self._messages_iter = self._handler.receive_messages_iter() + message = next(self._messages_iter) event_data = EventData._from_message(message) # pylint:disable=protected-access - self.offset = EventPosition(event_data.offset, inclusive=False) + self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 return event_data except Exception as exception: # pylint:disable=broad-except last_exception = self._handle_exception(exception) - self.client._try_delay(retried_times=retried_times, last_exception=last_exception, - entity_name=self.name) + self._client._try_delay(retried_times=retried_times, last_exception=last_exception, # pylint:disable=protected-access + entity_name=self._name) retried_times += 1 def _create_handler(self): alt_creds = { - "username": self.client._auth_config.get("iot_username") if self.redirected else None, # pylint:disable=protected-access - "password": self.client._auth_config.get("iot_password") if self.redirected else None # pylint:disable=protected-access + "username": self._client._auth_config.get("iot_username") if self._redirected else None, # pylint:disable=protected-access + "password": self._client._auth_config.get("iot_password") if self._redirected else None # pylint:disable=protected-access } - source = Source(self.source) - if self.offset is not None: - source.set_filter(self.offset._selector()) # pylint:disable=protected-access + source = Source(self._source) + if self._offset is not None: + source.set_filter(self._offset._selector()) # pylint:disable=protected-access self._handler = ReceiveClient( source, - auth=self.client.get_auth(**alt_creds), - debug=self.client.config.network_tracing, - prefetch=self.prefetch, + auth=self._client._get_auth(**alt_creds), # pylint:disable=protected-access + debug=self._client._config.network_tracing, # pylint:disable=protected-access + prefetch=self._prefetch, link_properties=self._link_properties, - timeout=self.timeout, - error_policy=self.retry_policy, - keep_alive_interval=self.keep_alive, - client_name=self.name, - properties=self.client._create_properties( # pylint:disable=protected-access - self.client.config.user_agent)) - self.messages_iter = None + timeout=self._timeout, + error_policy=self._retry_policy, + keep_alive_interval=self._keep_alive, + client_name=self._name, + properties=self._client._create_properties( # pylint:disable=protected-access + self._client._config.user_agent)) # pylint:disable=protected-access + self._messages_iter = None def _redirect(self, redirect): - self.messages_iter = None + self._messages_iter = None super(EventHubConsumer, self)._redirect(redirect) def _open(self): @@ -140,11 +140,11 @@ def _open(self): """ # pylint: disable=protected-access - self.redirected = self.redirected or self.client._iothub_redirect_info + self._redirected = self._redirected or self._client._iothub_redirect_info - if not self.running and self.redirected: - self.client._process_redirect_uri(self.redirected) - self.source = self.redirected.address + if not self._running and self._redirected: + self._client._process_redirect_uri(self._redirected) + self._source = self._redirected.address super(EventHubConsumer, self)._open() def _open_with_retry(self): @@ -158,7 +158,7 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): remaining_time = timeout_time - time.time() if remaining_time <= 0.0: if last_exception: - log.info("%r receive operation timed out. (%r)", self.name, last_exception) + log.info("%r receive operation timed out. (%r)", self._name, last_exception) raise last_exception return data_batch remaining_time_ms = 1000 * remaining_time @@ -167,7 +167,7 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): timeout=remaining_time_ms) for message in message_batch: event_data = EventData._from_message(message) # pylint:disable=protected-access - self.offset = EventPosition(event_data.offset) + self._offset = EventPosition(event_data.offset) data_batch.append(event_data) return data_batch @@ -217,8 +217,8 @@ def receive(self, max_batch_size=None, timeout=None): """ self._check_closed() - timeout = timeout or self.client.config.receive_timeout - max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch) + timeout = timeout or self._client._config.receive_timeout # pylint:disable=protected-access + max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) @@ -242,9 +242,9 @@ def close(self, exception=None): :caption: Close down the handler. """ - if self.messages_iter: - self.messages_iter.close() - self.messages_iter = None + if self._messages_iter: + self._messages_iter.close() + self._messages_iter = None super(EventHubConsumer, self).close(exception) next = __next__ # for python2.7 diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py index 6db54e5977de..129cf14a3842 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py @@ -2,7 +2,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -import time import logging import six @@ -159,9 +158,9 @@ def _create_eventhub_exception(exception): def _handle_exception(exception, closable): # pylint:disable=too-many-branches, too-many-statements try: # closable is a producer/consumer object - name = closable.name + name = closable._name # pylint: disable=protected-access except AttributeError: # closable is an client object - name = closable.container_id + name = closable._container_id # pylint: disable=protected-access if isinstance(exception, KeyboardInterrupt): # pylint:disable=no-else-raise log.info("%r stops due to keyboard interrupt", name) closable.close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/event_processor.py index 85020257df46..71741c56dffa 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/event_processor.py @@ -79,8 +79,8 @@ def __init__(self, eventhub_client: EventHubClient, consumer_group_name: str, self._partition_processor_factory = partition_processor_factory self._partition_manager = partition_manager self._initial_event_position = kwargs.get("initial_event_position", "-1") - self._max_batch_size = eventhub_client.config.max_batch_size - self._receive_timeout = eventhub_client.config.receive_timeout + self._max_batch_size = eventhub_client._config.max_batch_size + self._receive_timeout = eventhub_client._config.receive_timeout self._tasks = [] # type: List[asyncio.Task] self._id = str(uuid.uuid4()) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index c019a30ee7b8..8008fac7ecd0 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -40,7 +40,7 @@ class EventHubProducer(ConsumerProducerMixin): # pylint:disable=too-many-instan to a partition. """ - _timeout = b'com.microsoft:timeout' + _timeout_symbol = b'com.microsoft:timeout' def __init__(self, client, target, **kwargs): """ @@ -71,38 +71,38 @@ def __init__(self, client, target, **kwargs): super(EventHubProducer, self).__init__() self._max_message_size_on_link = None - self.running = False - self.client = client - self.target = target - self.partition = partition - self.timeout = send_timeout - self.redirected = None - self.error = None - self.keep_alive = keep_alive - self.auto_reconnect = auto_reconnect - self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler) - self.reconnect_backoff = 1 - self.name = "EHProducer-{}".format(uuid.uuid4()) - self.unsent_events = None + self._running = False + self._client = client + self._target = target + self._partition = partition + self._timeout = send_timeout + self._redirected = None + self._error = None + self._keep_alive = keep_alive + self._auto_reconnect = auto_reconnect + self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint: disable=protected-access + self._reconnect_backoff = 1 + self._name = "EHProducer-{}".format(uuid.uuid4()) + self._unsent_events = None if partition: - self.target += "/Partitions/" + partition - self.name += "-partition{}".format(partition) + self._target += "/Partitions/" + partition + self._name += "-partition{}".format(partition) self._handler = None self._outcome = None self._condition = None - self._link_properties = {types.AMQPSymbol(self._timeout): types.AMQPLong(int(self.timeout * 1000))} + self._link_properties = {types.AMQPSymbol(self._timeout_symbol): types.AMQPLong(int(self._timeout * 1000))} def _create_handler(self): self._handler = SendClient( - self.target, - auth=self.client.get_auth(), - debug=self.client.config.network_tracing, - msg_timeout=self.timeout, - error_policy=self.retry_policy, - keep_alive_interval=self.keep_alive, - client_name=self.name, + self._target, + auth=self._client._get_auth(), # pylint:disable=protected-access + debug=self._client._config.network_tracing, # pylint:disable=protected-access + msg_timeout=self._timeout, + error_policy=self._retry_policy, + keep_alive_interval=self._keep_alive, + client_name=self._name, link_properties=self._link_properties, - properties=self.client._create_properties(self.client.config.user_agent)) # pylint: disable=protected-access + properties=self._client._create_properties(self._client._config.user_agent)) # pylint: disable=protected-access def _open(self): """ @@ -111,16 +111,16 @@ def _open(self): context will be used to create a new handler before opening it. """ - if not self.running and self.redirected: - self.client._process_redirect_uri(self.redirected) # pylint: disable=protected-access - self.target = self.redirected.address + if not self._running and self._redirected: + self._client._process_redirect_uri(self._redirected) # pylint: disable=protected-access + self._target = self._redirected.address super(EventHubProducer, self)._open() def _open_with_retry(self): return self._do_retryable_operation(self._open, operation_need_param=False) def _send_event_data(self, timeout_time=None, last_exception=None): - if self.unsent_events: + if self._unsent_events: self._open() remaining_time = timeout_time - time.time() if remaining_time <= 0.0: @@ -128,12 +128,12 @@ def _send_event_data(self, timeout_time=None, last_exception=None): error = last_exception else: error = OperationTimeoutError("send operation timed out") - log.info("%r send operation timed out. (%r)", self.name, error) + log.info("%r send operation timed out. (%r)", self._name, error) raise error self._handler._msg_timeout = remaining_time # pylint: disable=protected-access - self._handler.queue_message(*self.unsent_events) + self._handler.queue_message(*self._unsent_events) self._handler.wait() - self.unsent_events = self._handler.pending_messages + self._unsent_events = self._handler.pending_messages if self._outcome != constants.MessageSendResult.Ok: if self._outcome == constants.MessageSendResult.Timeout: self._condition = OperationTimeoutError("send operation timed out") @@ -233,7 +233,7 @@ def send(self, event_data, partition_key=None, timeout=None): event_data = _set_partition_key(event_data, partition_key) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome - self.unsent_events = [wrapper_event_data.message] + self._unsent_events = [wrapper_event_data.message] self._send_event_data_with_retry(timeout=timeout) def close(self, exception=None): # pylint:disable=useless-super-delegation diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py index 9226a5d62a93..4ac63eef5d7f 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py @@ -16,7 +16,7 @@ async def pump(receiver, sleep=None): if sleep: await asyncio.sleep(sleep) async with receiver: - batch = await receiver.receive(timeout=3) + batch = await receiver.receive(timeout=10) messages += len(batch) return messages @@ -67,7 +67,7 @@ async def test_iothub_receive_after_mgmt_ops_async(iot_connection_str, device_id assert partitions == ["0", "1", "2", "3"] receiver = client.create_consumer(consumer_group="$default", partition_id=partitions[0], event_position=EventPosition("-1"), operation='/messages/events') async with receiver: - received = await receiver.receive(timeout=5) + received = await receiver.receive(timeout=10) assert len(received) == 0 @@ -77,7 +77,7 @@ async def test_iothub_mgmt_ops_after_receive_async(iot_connection_str, device_id client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') async with receiver: - received = await receiver.receive(timeout=5) + received = await receiver.receive(timeout=10) assert len(received) == 0 partitions = await client.get_partition_ids() diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py index 900612684001..50ababacf738 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py @@ -72,7 +72,7 @@ async def pump(_pid, receiver, _args, _dl): total, batch[-1].sequence_number, batch[-1].offset)) - print("{}: Total received {}".format(receiver.partition, total)) + print("{}: Total received {}".format(receiver._partition, total)) except Exception as e: print("Partition {} receiver failed: {}".format(_pid, e)) raise diff --git a/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py index ac7e211bd736..595c822b9cb7 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py @@ -14,7 +14,7 @@ def test_iothub_receive_sync(iot_connection_str, device_id): client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') try: - received = receiver.receive(timeout=5) + received = receiver.receive(timeout=10) assert len(received) == 0 finally: receiver.close() @@ -48,7 +48,7 @@ def test_iothub_receive_after_mgmt_ops_sync(iot_connection_str, device_id): assert partitions == ["0", "1", "2", "3"] receiver = client.create_consumer(consumer_group="$default", partition_id=partitions[0], event_position=EventPosition("-1"), operation='/messages/events') with receiver: - received = receiver.receive(timeout=5) + received = receiver.receive(timeout=10) assert len(received) == 0 @@ -57,7 +57,7 @@ def test_iothub_mgmt_ops_after_receive_sync(iot_connection_str, device_id): client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') with receiver: - received = receiver.receive(timeout=5) + received = receiver.receive(timeout=10) assert len(received) == 0 partitions = client.get_partition_ids() diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py index 47559b778af3..5a6e42a827e3 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py @@ -62,17 +62,17 @@ def pump(receiver, duration): iteration += 1 if size == 0: print("{}: No events received, queue size {}, delivered {}".format( - receiver.partition, + receiver._partition, receiver.queue_size, total)) elif iteration >= 5: iteration = 0 print("{}: total received {}, last sn={}, last offset={}".format( - receiver.partition, + receiver._partition, total, batch[-1].sequence_number, batch[-1].offset)) - print("{}: Total received {}".format(receiver.partition, total)) + print("{}: Total received {}".format(receiver._partition, total)) except Exception as e: print("EventHubConsumer failed: {}".format(e)) raise