Skip to content

Commit

Permalink
Merge pull request Azure#44 from annatisch/eh_scenarios
Browse files Browse the repository at this point in the history
Stability improvements
  • Loading branch information
annatisch authored Jul 30, 2018
2 parents 91b630c + f02c954 commit dd38016
Show file tree
Hide file tree
Showing 31 changed files with 825 additions and 257 deletions.
13 changes: 13 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@
Release History
===============

0.2.0rc2 (2018-07-29)
+++++++++++++++++++++

- **Breaking change** `EventData.offset` will now return an object of type `~uamqp.common.Offset` rather than str.
The original string value can be retrieved from `~uamqp.common.Offset.value`.
- Each sender/receiver will now run in its own independent connection.
- Updated uAMQP dependency to 0.2.0
- Fixed issue with IoTHub clients not being able to retrieve partition information.
- Added support for HTTP proxy settings to both EventHubClient and EPH.
- Added error handling policy to automatically reconnect on retryable error.
- Added keep-alive thread for maintaining an unused connection.


0.2.0rc1 (2018-07-06)
+++++++++++++++++++++

Expand Down
2 changes: 1 addition & 1 deletion azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "0.2.0rc1"
__version__ = "0.2.0rc2"

from azure.eventhub.common import EventData, EventHubError, Offset
from azure.eventhub.client import EventHubClient
Expand Down
72 changes: 26 additions & 46 deletions azure/eventhub/_async/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
import asyncio
import time
import datetime
try:
from urllib import urlparse, unquote_plus, urlencode, quote_plus
except ImportError:
from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus

from uamqp import authentication, constants, types, errors
from uamqp import (
Message,
Source,
ConnectionAsync,
AMQPClientAsync,
SendClientAsync,
Expand All @@ -37,7 +40,7 @@ class EventHubClientAsync(EventHubClient):
sending events to and receiving events from the Azure Event Hubs service.
"""

def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self-use
def _create_auth(self, username=None, password=None): # pylint: disable=no-self-use
"""
Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate
the session.
Expand All @@ -49,32 +52,13 @@ def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self
:param password: The shared access key.
:type password: str
"""
username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
return authentication.SASLPlain(self.address.hostname, username, password)
return authentication.SASTokenAsync.from_shared_access_key(auth_uri, username, password)

def _create_connection_async(self):
"""
Create a new ~uamqp._async.connection_async.ConnectionAsync instance that will be shared between all
AsyncSender/AsyncReceiver clients.
"""
if not self.connection:
log.info("{}: Creating connection with address={}".format(
self.container_id, self.address.geturl()))
self.connection = ConnectionAsync(
self.address.hostname,
self.auth,
container_id=self.container_id,
properties=self._create_properties(),
debug=self.debug)

async def _close_connection_async(self):
"""
Close and destroy the connection async.
"""
if self.connection:
await self.connection.destroy_async()
self.connection = None
return authentication.SASLPlain(
self.address.hostname, username, password, http_proxy=self.http_proxy)
return authentication.SASTokenAsync.from_shared_access_key(
self.auth_uri, username, password, timeout=60, http_proxy=self.http_proxy)

async def _close_clients_async(self):
"""
Expand All @@ -85,17 +69,13 @@ async def _close_clients_async(self):
async def _wait_for_client(self, client):
try:
while client.get_handler_state().value == 2:
await self.connection.work_async()
await client._handler._connection.work_async() # pylint: disable=protected-access
except Exception as exp: # pylint: disable=broad-except
await client.close_async(exception=exp)

async def _start_client_async(self, client):
try:
await client.open_async(self.connection)
started = await client.has_started()
while not started:
await self.connection.work_async()
started = await client.has_started()
await client.open_async()
except Exception as exp: # pylint: disable=broad-except
await client.close_async(exception=exp)

Expand All @@ -108,9 +88,8 @@ async def _handle_redirect(self, redirects):
redirects = [c.redirected for c in self.clients if c.redirected]
if not all(r.hostname == redirects[0].hostname for r in redirects):
raise EventHubError("Multiple clients attempting to redirect to different hosts.")
self.auth = self._create_auth(redirects[0].address.decode('utf-8'), **self._auth_config)
await self.connection.redirect_async(redirects[0], self.auth)
await asyncio.gather(*[c.open_async(self.connection) for c in self.clients])
self._process_redirect_uri(redirects[0])
await asyncio.gather(*[c.open_async() for c in self.clients])

async def run_async(self):
"""
Expand All @@ -125,7 +104,6 @@ async def run_async(self):
:rtype: list[~azure.eventhub.common.EventHubError]
"""
log.info("{}: Starting {} clients".format(self.container_id, len(self.clients)))
self._create_connection_async()
tasks = [self._start_client_async(c) for c in self.clients]
try:
await asyncio.gather(*tasks)
Expand Down Expand Up @@ -153,18 +131,21 @@ async def stop_async(self):
log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients)))
self.stopped = True
await self._close_clients_async()
await self._close_connection_async()

async def get_eventhub_info_async(self):
"""
Get details on the specified EventHub async.
:rtype: dict
"""
eh_name = self.address.path.lstrip('/')
target = "amqps://{}/{}".format(self.address.hostname, eh_name)
async with AMQPClientAsync(target, auth=self.auth, debug=self.debug) as mgmt_client:
mgmt_msg = Message(application_properties={'name': eh_name})
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password":self._auth_config.get("iot_password")}
try:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug)
await mgmt_client.open_async()
mgmt_msg = Message(application_properties={'name': self.eh_name})
response = await mgmt_client.mgmt_request_async(
mgmt_msg,
constants.READ_OPERATION,
Expand All @@ -180,6 +161,8 @@ async def get_eventhub_info_async(self):
output['partition_count'] = eh_info[b'partition_count']
output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']]
return output
finally:
await mgmt_client.close_async()

def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=300, operation=None, loop=None):
"""
Expand All @@ -201,10 +184,7 @@ def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=30
path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition)
source = Source(source_url)
if offset is not None:
source.set_filter(offset.selector())
handler = AsyncReceiver(self, source, prefetch=prefetch, loop=loop)
handler = AsyncReceiver(self, source_url, offset=offset, prefetch=prefetch, loop=loop)
self.clients.append(handler)
return handler

Expand Down
93 changes: 74 additions & 19 deletions azure/eventhub/_async/receiver_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@
import asyncio

from uamqp import errors, types
from uamqp import ReceiveClientAsync
from uamqp import ReceiveClientAsync, Source

from azure.eventhub import EventHubError, EventData
from azure.eventhub.receiver import Receiver
from azure.eventhub.common import _error_handler


class AsyncReceiver(Receiver):
"""
Implements the async API of a Receiver.
"""

def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called
def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called
"""
Instantiate an async receiver.
Expand All @@ -33,25 +34,32 @@ def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pyli
:param loop: An event loop.
"""
self.loop = loop or asyncio.get_event_loop()
self.client = client
self.source = source
self.offset = offset
self.prefetch = prefetch
self.epoch = epoch
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.redirected = None
self.error = None
self.debug = client.debug
self.offset = None
self.prefetch = prefetch
self.properties = None
self.epoch = epoch
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
if epoch:
self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))}
self._handler = ReceiveClientAsync(
source,
auth=client.auth,
debug=self.debug,
auth=self.client.get_auth(),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
loop=self.loop)

async def open_async(self, connection):
async def open_async(self):
"""
Open the Receiver using the supplied conneciton.
If the handler has previously been redirected, the redirect
Expand All @@ -60,16 +68,54 @@ async def open_async(self, connection):
:param connection: The underlying client shared connection.
:type: connection: ~uamqp._async.connection_async.ConnectionAsync
"""
# pylint: disable=protected-access
if self.redirected:
self.source = self.redirected.address
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
self._handler = ReceiveClientAsync(
self.redirected.address,
auth=None,
debug=self.debug,
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
loop=self.loop)
await self._handler.open_async(connection=connection)
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()

async def reconnect_async(self):
"""If the Receiver was disconnected from the service with
a retryable error - attempt to reconnect."""
# pylint: disable=protected-access
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
await self._handler.close_async()
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
self._handler = ReceiveClientAsync(
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()

async def has_started(self):
"""
Expand All @@ -88,7 +134,7 @@ async def has_started(self):
raise EventHubError("Authorization timeout.")
elif auth_in_progress:
return False
elif not await self._handler._client_ready():
elif not await self._handler._client_ready_async():
return False
else:
return True
Expand All @@ -109,6 +155,8 @@ async def close_async(self, exception=None):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self.error = EventHubError(str(exception), exception)
elif exception:
self.error = EventHubError(str(exception))
else:
Expand All @@ -129,21 +177,28 @@ async def receive(self, max_batch_size=None, timeout=None):
"""
if self.error:
raise self.error
data_batch = []
try:
timeout_ms = 1000 * timeout if timeout else 0
message_batch = await self._handler.receive_message_batch_async(
max_batch_size=max_batch_size,
timeout=timeout_ms)
data_batch = []
for message in message_batch:
event_data = EventData(message=message)
self.offset = event_data.offset
data_batch.append(event_data)
return data_batch
except errors.LinkDetach as detach:
error = EventHubError(str(detach))
await self.close_async(exception=error)
raise error
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry:
await self.reconnect_async()
return data_batch
else:
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError:
await self.reconnect_async()
return data_batch
except Exception as e:
error = EventHubError("Receive failed: {}".format(e))
await self.close_async(exception=error)
Expand Down
Loading

0 comments on commit dd38016

Please sign in to comment.