diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt index 644215a8ef85..49d226f4bf52 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt @@ -1,3 +1,5 @@ +-e ../../../tools/azure-sdk-tools -e ../../core/azure-core +-e ../../storage/azure-storage-blob ../azure-eventhubs pytest-asyncio>=0.8.0; python_version >= '3.5' diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py index 41cba5b3027e..c83b2b6964c6 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py @@ -65,7 +65,7 @@ packages=find_packages(exclude=exclude_packages), python_requires=">=3.5.3", install_requires=[ - 'azure-storage-blob<12.0.0b4,>=12.0.0b2', + 'azure-storage-blob<=12.0.0b4,>=12.0.0b2', 'azure-eventhub<6.0.0,>=5.0.0b3', 'aiohttp<4.0,>=3.0', ], diff --git a/sdk/eventhub/azure-eventhubs/HISTORY.md b/sdk/eventhub/azure-eventhubs/HISTORY.md index c5af555047bc..f0577a7640d9 100644 --- a/sdk/eventhub/azure-eventhubs/HISTORY.md +++ b/sdk/eventhub/azure-eventhubs/HISTORY.md @@ -1,5 +1,11 @@ # Release History +## 5.0.0b4 (2019-XX-XX) + +**New features** + +- Support for tracing #7153 + ## 5.0.0b3 (2019-09-10) **New features** diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py index 62b2a6b811d8..888e86a98986 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py @@ -4,7 +4,7 @@ # -------------------------------------------------------------------------------------------- __path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore -__version__ = "5.0.0b3" +__version__ = "5.0.0b4" from uamqp import constants # type: ignore from azure.eventhub.common import EventData, EventDataBatch, EventPosition from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index efad6a3cb7db..015b11190212 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -102,6 +102,7 @@ async def __anext__(self): self._messages_iter = self._handler.receive_messages_iter_async() message = await self._messages_iter.__anext__() event_data = EventData._from_message(message) # pylint:disable=protected-access + event_data._trace_link_message() # pylint:disable=protected-access self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 return event_data @@ -179,6 +180,8 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): event_data = EventData._from_message(message) # pylint:disable=protected-access self._offset = EventPosition(event_data.offset) data_batch.append(event_data) + event_data._trace_link_message() # pylint:disable=protected-access + return data_batch async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index 18446249ff23..3183dc051ac8 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -3,11 +3,15 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # ----------------------------------------------------------------------------------- +from contextlib import contextmanager from typing import Dict, Type import uuid import asyncio import logging +from azure.core.tracing import SpanKind +from azure.core.settings import settings + from azure.eventhub import EventPosition, EventHubError from azure.eventhub.aio import EventHubClient from .partition_context import PartitionContext @@ -185,7 +189,23 @@ def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list): if partition_id not in self._tasks or self._tasks[partition_id].done(): self._tasks[partition_id] = get_running_loop().create_task(self._receive(ownership)) - async def _receive(self, ownership): + @contextmanager + def _context(self, events): + # Tracing + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is None: + yield + else: + child = span_impl_type(name="Azure.EventHubs.process") + self._eventhub_client._add_span_request_attributes(child) # pylint: disable=protected-access + child.kind = SpanKind.SERVER + + for event in events: + event._trace_link_message(child) # pylint: disable=protected-access + with child: + yield + + async def _receive(self, ownership): # pylint: disable=too-many-statements log.info("start ownership, %r", ownership) partition_processor = self._partition_processor_factory() partition_id = ownership["partition_id"] @@ -247,7 +267,9 @@ async def close(reason): while True: try: events = await partition_consumer.receive() - await partition_processor.process_events(events, partition_context) + with self._context(events): + await partition_processor.process_events(events, partition_context) + except asyncio.CancelledError: log.info( "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r" diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index ec4e39c87116..8ef299b0e6da 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -5,15 +5,18 @@ import uuid import asyncio import logging -from typing import Iterable, Union, Any +from typing import Iterable, Union import time from uamqp import types, constants, errors # type: ignore from uamqp import SendClientAsync # type: ignore +from azure.core.tracing import SpanKind +from azure.core.settings import settings + from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError -from ..producer import _error, _set_partition_key +from ..producer import _error, _set_partition_key, _set_trace_message from ._consumer_producer_mixin_async import ConsumerProducerMixin log = logging.getLogger(__name__) @@ -213,12 +216,19 @@ async def send( :caption: Sends an event data and blocks until acknowledgement is received or operation times out. """ + # Tracing code + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + child = None + if span_impl_type is not None: + child = span_impl_type(name="Azure.EventHubs.send") + child.kind = SpanKind.CLIENT # Should be PRODUCER self._check_closed() if isinstance(event_data, EventData): if partition_key: event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data + wrapper_event_data._trace_message(child) # pylint: disable=protected-access else: if isinstance(event_data, EventDataBatch): if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access @@ -227,10 +237,18 @@ async def send( else: if partition_key: event_data = _set_partition_key(event_data, partition_key) + event_data = _set_trace_message(event_data, child) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access + wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] - await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor + + if span_impl_type is not None: + with child: + self._client._add_span_request_attributes(child) # pylint: disable=protected-access + await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor + else: + await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor async def close(self, exception=None): # type: (Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index c6879730266c..62ea791a5894 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -214,6 +214,11 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use properties["user-agent"] = final_user_agent return properties + def _add_span_request_attributes(self, span): + span.add_attribute("component", "eventhubs") + span.add_attribute("message_bus.destination", self._address.path) + span.add_attribute("peer.address", self._address.hostname) + def _process_redirect_uri(self, redirect): redirect_uri = redirect.address.decode('utf-8') auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 5923d7f57972..3f2545829748 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -12,6 +12,9 @@ from uamqp import BatchMessage, Message, types, constants # type: ignore from uamqp.message import MessageHeader, MessageProperties # type: ignore + +from azure.core.settings import settings + from azure.eventhub.error import EventDataError log = logging.getLogger(__name__) @@ -114,6 +117,35 @@ def _set_partition_key(self, value): self.message.header = header self._annotations = annotations + def _trace_message(self, parent_span=None): + """Add tracing information to this message. + + Will open and close a "Azure.EventHubs.message" span, and + add the "DiagnosticId" as app properties of the message. + """ + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is not None: + current_span = parent_span or span_impl_type(span_impl_type.get_current_span()) + message_span = current_span.span(name="Azure.EventHubs.message") + message_span.start() + app_prop = dict(self.application_properties) + app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) + self.application_properties = app_prop + message_span.finish() + + def _trace_link_message(self, parent_span=None): + """Link the current message to current span. + + Will extract DiagnosticId if available. + """ + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is not None: + current_span = parent_span or span_impl_type(span_impl_type.get_current_span()) + if current_span and self.application_properties: + traceparent = self.application_properties.get(b"Diagnostic-Id", "").decode('ascii') + if traceparent: + current_span.link(traceparent) + @staticmethod def _from_message(message): event_data = EventData(body='') @@ -328,6 +360,8 @@ def try_add(self, event_data): if not event_data.partition_key: event_data._set_partition_key(self._partition_key) # pylint:disable=protected-access + event_data._trace_message() # pylint:disable=protected-access + event_data_size = event_data.message.get_message_encoded_size() # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 604d9c7d7b82..ff996a57747a 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -98,6 +98,7 @@ def __next__(self): self._messages_iter = self._handler.receive_messages_iter() message = next(self._messages_iter) event_data = EventData._from_message(message) # pylint:disable=protected-access + event_data._trace_link_message() # pylint:disable=protected-access self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 return event_data @@ -173,6 +174,8 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): event_data = EventData._from_message(message) # pylint:disable=protected-access self._offset = EventPosition(event_data.offset) data_batch.append(event_data) + event_data._trace_link_message() # pylint:disable=protected-access + return data_batch def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index cab9638f2acc..6e562bbdf051 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -12,6 +12,9 @@ from uamqp import types, constants, errors # type: ignore from uamqp import SendClient # type: ignore +from azure.core.tracing import SpanKind +from azure.core.settings import settings + from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError from ._consumer_producer_mixin import ConsumerProducerMixin @@ -32,6 +35,13 @@ def _set_partition_key(event_datas, partition_key): yield ed +def _set_trace_message(event_datas, parent_span=None): + ed_iter = iter(event_datas) + for ed in ed_iter: + ed._trace_message(parent_span) # pylint:disable=protected-access + yield ed + + class EventHubProducer(ConsumerProducerMixin): # pylint:disable=too-many-instance-attributes """ A producer responsible for transmitting EventData to a specific Event Hub, @@ -218,12 +228,19 @@ def send(self, event_data, partition_key=None, timeout=None): :caption: Sends an event data and blocks until acknowledgement is received or operation times out. """ + # Tracing code + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + child = None + if span_impl_type is not None: + child = span_impl_type(name="Azure.EventHubs.send") + child.kind = SpanKind.CLIENT # Should be PRODUCER self._check_closed() if isinstance(event_data, EventData): if partition_key: event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data + wrapper_event_data._trace_message(child) # pylint: disable=protected-access else: if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted. if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access @@ -232,10 +249,17 @@ def send(self, event_data, partition_key=None, timeout=None): else: if partition_key: event_data = _set_partition_key(event_data, partition_key) + event_data = _set_trace_message(event_data, child) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] - self._send_event_data_with_retry(timeout=timeout) + + if span_impl_type is not None: + with child: + self._client._add_span_request_attributes(child) # pylint: disable=protected-access + self._send_event_data_with_retry(timeout=timeout) + else: + self._send_event_data_with_retry(timeout=timeout) def close(self, exception=None): # pylint:disable=useless-super-delegation # type:(Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/dev_requirements.txt b/sdk/eventhub/azure-eventhubs/dev_requirements.txt index 338710e52fb3..79be4ffee6f0 100644 --- a/sdk/eventhub/azure-eventhubs/dev_requirements.txt +++ b/sdk/eventhub/azure-eventhubs/dev_requirements.txt @@ -1,7 +1,7 @@ --e ../../servicebus/azure-servicebus --e ../../core/azure-core -e ../../../tools/azure-sdk-tools +-e ../../core/azure-core -e ../../identity/azure-identity +-e ../../servicebus/azure-servicebus pytest-asyncio>=0.8.0; python_version > '3.4' docutils>=0.14 pygments>=2.2.0 diff --git a/sdk/eventhub/azure-eventhubs/setup.py b/sdk/eventhub/azure-eventhubs/setup.py index 1ffa5c93005f..aae5cc60b638 100644 --- a/sdk/eventhub/azure-eventhubs/setup.py +++ b/sdk/eventhub/azure-eventhubs/setup.py @@ -67,6 +67,7 @@ zip_safe=False, packages=find_packages(exclude=exclude_packages), install_requires=[ + "azure-core<2.0.0,>=1.0.0b4", 'uamqp~=1.2.0', 'azure-common~=1.1', ], diff --git a/shared_requirements.txt b/shared_requirements.txt index 47fc6b58dfd4..601385e1a452 100644 --- a/shared_requirements.txt +++ b/shared_requirements.txt @@ -106,6 +106,7 @@ six>=1.6 #override azure-storage-blob azure-core<2.0.0,>=1.0.0b4 #override azure-storage-queue azure-core<2.0.0,>=1.0.0b4 #override azure-storage-file azure-core<2.0.0,>=1.0.0b4 +#override azure-eventhub azure-core<2.0.0,>=1.0.0b4 #override azure-cosmos azure-core<2.0.0,>=1.0.0b3 -#override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<12.0.0b4,>=12.0.0b2 -#override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0 +#override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<=12.0.0b4,>=12.0.0b2 +#override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0 \ No newline at end of file