Skip to content

Commit

Permalink
Merge pull request Azure#55 from annatisch/eh_scenarios
Browse files Browse the repository at this point in the history
New send settings + auth timeout
  • Loading branch information
annatisch authored Aug 23, 2018
2 parents 97137ac + c8db793 commit 1433553
Show file tree
Hide file tree
Showing 16 changed files with 354 additions and 147 deletions.
12 changes: 12 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@
Release History
===============

1.0.0 (2018-08-22)
++++++++++++++++++

- API stable.
- Renamed internal `_async` module to `async_ops` for docs generation.
- Added optional `auth_timeout` parameter to `EventHubClient` and `EventHubClientAsync` to configure how long to allow for token
negotiation to complete. Default is 60 seconds.
- Added optional `send_timeout` parameter to `EventHubClient.add_sender` and `EventHubClientAsync.add_async_sender` to determine the
timeout for Events to be successfully sent. Default value is 60 seconds.
- Reformatted logging for performance.


0.2.0 (2018-08-06)
++++++++++++++++++

Expand Down
5 changes: 5 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ Python 2.7 support
The uAMQP library currently only supports Python 3.4 and above. Python 2.7 support is planned for a future release.


Documentation
+++++++++++++
Reference documentation is available at `docs.microsoft.com/python/api/azure-eventhub <https://docs.microsoft.com/python/api/azure-eventhub>`__.


Examples
+++++++++

Expand Down
4 changes: 2 additions & 2 deletions azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "0.2.0"
__version__ = "1.0.0"

from azure.eventhub.common import EventData, EventHubError, Offset
from azure.eventhub.client import EventHubClient
from azure.eventhub.sender import Sender
from azure.eventhub.receiver import Receiver

try:
from azure.eventhub._async import (
from azure.eventhub.async_ops import (
EventHubClientAsync,
AsyncSender,
AsyncReceiver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _create_auth(self, username=None, password=None): # pylint: disable=no-self
return authentication.SASLPlain(
self.address.hostname, username, password, http_proxy=self.http_proxy)
return authentication.SASTokenAsync.from_shared_access_key(
self.auth_uri, username, password, timeout=60, http_proxy=self.http_proxy)
self.auth_uri, username, password, timeout=self.auth_timeout, http_proxy=self.http_proxy)

async def _close_clients_async(self):
"""
Expand All @@ -77,8 +77,9 @@ async def _start_client_async(self, client):
try:
await client.open_async()
except Exception as exp: # pylint: disable=broad-except
log.info("Encountered error while starting handler: {}".format(exp))
log.info("Encountered error while starting handler: %r", exp)
await client.close_async(exception=exp)
log.info("Finished closing failed handler")

async def _handle_redirect(self, redirects):
if len(redirects) != len(self.clients):
Expand All @@ -104,17 +105,17 @@ async def run_async(self):
:rtype: list[~azure.eventhub.common.EventHubError]
"""
log.info("{}: Starting {} clients".format(self.container_id, len(self.clients)))
log.info("%r: Starting %r clients", self.container_id, len(self.clients))
tasks = [self._start_client_async(c) for c in self.clients]
try:
await asyncio.gather(*tasks)
redirects = [c.redirected for c in self.clients if c.redirected]
failed = [c.error for c in self.clients if c.error]
if failed and len(failed) == len(self.clients):
log.warning("{}: All clients failed to start.".format(self.container_id))
log.warning("%r: All clients failed to start.", self.container_id)
raise failed[0]
elif failed:
log.warning("{}: {} clients failed to start.".format(self.container_id, len(failed)))
log.warning("%r: %r clients failed to start.", self.container_id, len(failed))
elif redirects:
await self._handle_redirect(redirects)
except EventHubError:
Expand All @@ -129,7 +130,7 @@ async def stop_async(self):
"""
Stop the EventHubClient and all its Sender/Receiver clients.
"""
log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients)))
log.info("%r: Stopping %r clients", self.container_id, len(self.clients))
self.stopped = True
await self._close_clients_async()

Expand Down Expand Up @@ -182,7 +183,7 @@ def add_async_receiver(
:operation: An optional operation to be appended to the hostname in the source URL.
The value must start with `/` character.
:type operation: str
:rtype: ~azure.eventhub._async.receiver_async.ReceiverAsync
:rtype: ~azure.eventhub.async_ops.receiver_async.ReceiverAsync
"""
path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
Expand Down Expand Up @@ -213,7 +214,7 @@ def add_async_epoch_receiver(
:operation: An optional operation to be appended to the hostname in the source URL.
The value must start with `/` character.
:type operation: str
:rtype: ~azure.eventhub._async.receiver_async.ReceiverAsync
:rtype: ~azure.eventhub.async_ops.receiver_async.ReceiverAsync
"""
path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
Expand All @@ -224,7 +225,9 @@ def add_async_epoch_receiver(
self.clients.append(handler)
return handler

def add_async_sender(self, partition=None, operation=None, keep_alive=30, auto_reconnect=True, loop=None):
def add_async_sender(
self, partition=None, operation=None, send_timeout=60,
keep_alive=30, auto_reconnect=True, loop=None):
"""
Add an async sender to the client to send ~azure.eventhub.common.EventData object
to an EventHub.
Expand All @@ -236,13 +239,23 @@ def add_async_sender(self, partition=None, operation=None, keep_alive=30, auto_r
:operation: An optional operation to be appended to the hostname in the target URL.
The value must start with `/` character.
:type operation: str
:rtype: ~azure.eventhub._async.sender_async.SenderAsync
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
:type send_timeout: int
:param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
periods of inactivity. The default value is 30 seconds. If set to `None`, the connection will not
be pinged.
:type keep_alive: int
:param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
Default value is `True`.
:type auto_reconnect: bool
:rtype: ~azure.eventhub.async_ops.sender_async.SenderAsync
"""
target = "amqps://{}{}".format(self.address.hostname, self.address.path)
if operation:
target = target + operation
handler = AsyncSender(
self, target, partition=partition, keep_alive=keep_alive,
self, target, partition=partition, send_timeout=send_timeout, keep_alive=keep_alive,
auto_reconnect=auto_reconnect, loop=loop)
self.clients.append(handler)
return handler
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__( # pylint: disable=super-init-not-called
Instantiate an async receiver.
:param client: The parent EventHubClientAsync.
:type client: ~azure.eventhub._async.EventHubClientAsync
:type client: ~azure.eventhub.async_ops.EventHubClientAsync
:param source: The source EventHub from which to receive events.
:type source: ~uamqp.address.Source
:param prefetch: The number of events to prefetch from the service
Expand Down Expand Up @@ -78,7 +78,7 @@ async def open_async(self):
context will be used to create a new handler before opening it.
:param connection: The underlying client shared connection.
:type: connection: ~uamqp._async.connection_async.ConnectionAsync
:type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync
"""
# pylint: disable=protected-access
if self.redirected:
Expand Down Expand Up @@ -128,9 +128,33 @@ async def reconnect_async(self):
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()
try:
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Receiver reconnect failed: {}".format(e))
await self.close_async(exception=error)
raise error

async def has_started(self):
"""
Expand Down Expand Up @@ -224,7 +248,7 @@ async def receive(self, max_batch_size=None, timeout=None):
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred ({}). Shutting down.".format(e))
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Receive failed: {}".format(e))
await self.close_async(exception=error)
raise error
Loading

0 comments on commit 1433553

Please sign in to comment.