Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eventhub tracing #7153

Merged
merged 26 commits into from
Sep 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
40c9d8f
Experimentation on tracing and EventHubs
lmazuel Sep 9, 2019
93dc908
Continue to let the initial excp raise
lmazuel Sep 10, 2019
b22a944
Naive direct tracing implementation
lmazuel Sep 11, 2019
e8724a4
Update Kind in EventHub to generic one
lmazuel Sep 12, 2019
c4dee26
Use contextmanager in EventHub
lmazuel Sep 12, 2019
e23c6f0
Remove opencensus specific import
lmazuel Sep 12, 2019
8c4bede
Fix possible AttributeError
lmazuel Sep 12, 2019
ce59dc0
Remove parent concept
lmazuel Sep 13, 2019
567b2bc
Remove receive tracing code
lmazuel Sep 16, 2019
a205ea5
Don't execute tracing on message if no tracing loaded
lmazuel Sep 19, 2019
443cdc8
Try to re-order dev dep for CI
lmazuel Sep 23, 2019
83d7128
Fix EH plugin dev deps
lmazuel Sep 24, 2019
78b78ab
Add azure-core to azure-eventhub
lmazuel Sep 24, 2019
22b78c2
Share req
lmazuel Sep 24, 2019
82a39ef
ChangeLog
lmazuel Sep 24, 2019
db31f6b
EH extension is ok with b4
lmazuel Sep 24, 2019
d3433c3
Merge branch 'master' into eventhub_tracing
lmazuel Sep 24, 2019
632a8d8
Install blob SDK for EH extension
lmazuel Sep 24, 2019
c4c4fd1
pylint
lmazuel Sep 24, 2019
9fb2039
fix dev req
lmazuel Sep 24, 2019
2f4df0a
dep fix
lmazuel Sep 24, 2019
af4acd7
the override had <. the setup actually defines <=. need to update the…
scbedd Sep 25, 2019
ce2948a
Tracing message from receive iterators as well
lmazuel Sep 25, 2019
bd2da67
Producer simplification
lmazuel Sep 25, 2019
b025231
Simplify eventprocessor
lmazuel Sep 25, 2019
32ce0e8
Consider batch size
lmazuel Sep 27, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
-e ../../../tools/azure-sdk-tools
-e ../../core/azure-core
-e ../../storage/azure-storage-blob
../azure-eventhubs
pytest-asyncio>=0.8.0; python_version >= '3.5'
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
packages=find_packages(exclude=exclude_packages),
python_requires=">=3.5.3",
install_requires=[
'azure-storage-blob<12.0.0b4,>=12.0.0b2',
'azure-storage-blob<=12.0.0b4,>=12.0.0b2',
'azure-eventhub<6.0.0,>=5.0.0b3',
'aiohttp<4.0,>=3.0',
],
Expand Down
6 changes: 6 additions & 0 deletions sdk/eventhub/azure-eventhubs/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 5.0.0b4 (2019-XX-XX)

**New features**

- Support for tracing #7153

## 5.0.0b3 (2019-09-10)

**New features**
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# --------------------------------------------------------------------------------------------

__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
__version__ = "5.0.0b3"
__version__ = "5.0.0b4"
from uamqp import constants # type: ignore
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async def __anext__(self):
self._messages_iter = self._handler.receive_messages_iter_async()
message = await self._messages_iter.__anext__()
event_data = EventData._from_message(message) # pylint:disable=protected-access
event_data._trace_link_message() # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
return event_data
Expand Down Expand Up @@ -179,6 +180,8 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
event_data = EventData._from_message(message) # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset)
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

return data_batch

async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

from contextlib import contextmanager
from typing import Dict, Type
import uuid
import asyncio
import logging

from azure.core.tracing import SpanKind
from azure.core.settings import settings

from azure.eventhub import EventPosition, EventHubError
from azure.eventhub.aio import EventHubClient
from .partition_context import PartitionContext
Expand Down Expand Up @@ -185,7 +189,23 @@ def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list):
if partition_id not in self._tasks or self._tasks[partition_id].done():
self._tasks[partition_id] = get_running_loop().create_task(self._receive(ownership))

async def _receive(self, ownership):
@contextmanager
def _context(self, events):
# Tracing
span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan]
if span_impl_type is None:
yield
else:
child = span_impl_type(name="Azure.EventHubs.process")
self._eventhub_client._add_span_request_attributes(child) # pylint: disable=protected-access
child.kind = SpanKind.SERVER

for event in events:
event._trace_link_message(child) # pylint: disable=protected-access
with child:
yield

async def _receive(self, ownership): # pylint: disable=too-many-statements
log.info("start ownership, %r", ownership)
partition_processor = self._partition_processor_factory()
partition_id = ownership["partition_id"]
Expand Down Expand Up @@ -247,7 +267,9 @@ async def close(reason):
while True:
try:
events = await partition_consumer.receive()
await partition_processor.process_events(events, partition_context)
with self._context(events):
await partition_processor.process_events(events, partition_context)

except asyncio.CancelledError:
log.info(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
Expand Down
24 changes: 21 additions & 3 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
import uuid
import asyncio
import logging
from typing import Iterable, Union, Any
from typing import Iterable, Union
import time

from uamqp import types, constants, errors # type: ignore
from uamqp import SendClientAsync # type: ignore

from azure.core.tracing import SpanKind
from azure.core.settings import settings

from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
from ..producer import _error, _set_partition_key
from ..producer import _error, _set_partition_key, _set_trace_message
from ._consumer_producer_mixin_async import ConsumerProducerMixin

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -213,12 +216,19 @@ async def send(
:caption: Sends an event data and blocks until acknowledgement is received or operation times out.

"""
# Tracing code
span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan]
child = None
if span_impl_type is not None:
child = span_impl_type(name="Azure.EventHubs.send")
child.kind = SpanKind.CLIENT # Should be PRODUCER

self._check_closed()
if isinstance(event_data, EventData):
if partition_key:
event_data._set_partition_key(partition_key) # pylint: disable=protected-access
wrapper_event_data = event_data
wrapper_event_data._trace_message(child) # pylint: disable=protected-access
else:
if isinstance(event_data, EventDataBatch):
if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access
Expand All @@ -227,10 +237,18 @@ async def send(
else:
if partition_key:
event_data = _set_partition_key(event_data, partition_key)
event_data = _set_trace_message(event_data, child)
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access

wrapper_event_data.message.on_send_complete = self._on_outcome
self._unsent_events = [wrapper_event_data.message]
await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor

if span_impl_type is not None:
with child:
self._client._add_span_request_attributes(child) # pylint: disable=protected-access
await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor
else:
await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use
properties["user-agent"] = final_user_agent
return properties

def _add_span_request_attributes(self, span):
span.add_attribute("component", "eventhubs")
span.add_attribute("message_bus.destination", self._address.path)
span.add_attribute("peer.address", self._address.hostname)

def _process_redirect_uri(self, redirect):
redirect_uri = redirect.address.decode('utf-8')
auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups")
Expand Down
34 changes: 34 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

from uamqp import BatchMessage, Message, types, constants # type: ignore
from uamqp.message import MessageHeader, MessageProperties # type: ignore

from azure.core.settings import settings

from azure.eventhub.error import EventDataError

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -114,6 +117,35 @@ def _set_partition_key(self, value):
self.message.header = header
self._annotations = annotations

def _trace_message(self, parent_span=None):
"""Add tracing information to this message.

Will open and close a "Azure.EventHubs.message" span, and
add the "DiagnosticId" as app properties of the message.
"""
span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan]
if span_impl_type is not None:
current_span = parent_span or span_impl_type(span_impl_type.get_current_span())
message_span = current_span.span(name="Azure.EventHubs.message")
message_span.start()
app_prop = dict(self.application_properties)
app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii'))
self.application_properties = app_prop
message_span.finish()

def _trace_link_message(self, parent_span=None):
"""Link the current message to current span.

Will extract DiagnosticId if available.
"""
span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan]
if span_impl_type is not None:
current_span = parent_span or span_impl_type(span_impl_type.get_current_span())
if current_span and self.application_properties:
traceparent = self.application_properties.get(b"Diagnostic-Id", "").decode('ascii')
if traceparent:
current_span.link(traceparent)

@staticmethod
def _from_message(message):
event_data = EventData(body='')
Expand Down Expand Up @@ -328,6 +360,8 @@ def try_add(self, event_data):
if not event_data.partition_key:
event_data._set_partition_key(self._partition_key) # pylint:disable=protected-access

event_data._trace_message() # pylint:disable=protected-access

event_data_size = event_data.message.get_message_encoded_size()

# For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that
Expand Down
3 changes: 3 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def __next__(self):
self._messages_iter = self._handler.receive_messages_iter()
message = next(self._messages_iter)
event_data = EventData._from_message(message) # pylint:disable=protected-access
event_data._trace_link_message() # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
return event_data
Expand Down Expand Up @@ -173,6 +174,8 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
event_data = EventData._from_message(message) # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset)
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

return data_batch

def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
Expand Down
26 changes: 25 additions & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from uamqp import types, constants, errors # type: ignore
from uamqp import SendClient # type: ignore

from azure.core.tracing import SpanKind
from azure.core.settings import settings

from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
from ._consumer_producer_mixin import ConsumerProducerMixin
Expand All @@ -32,6 +35,13 @@ def _set_partition_key(event_datas, partition_key):
yield ed


def _set_trace_message(event_datas, parent_span=None):
ed_iter = iter(event_datas)
for ed in ed_iter:
ed._trace_message(parent_span) # pylint:disable=protected-access
yield ed


class EventHubProducer(ConsumerProducerMixin): # pylint:disable=too-many-instance-attributes
"""
A producer responsible for transmitting EventData to a specific Event Hub,
Expand Down Expand Up @@ -218,12 +228,19 @@ def send(self, event_data, partition_key=None, timeout=None):
:caption: Sends an event data and blocks until acknowledgement is received or operation times out.

"""
# Tracing code
span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan]
child = None
if span_impl_type is not None:
child = span_impl_type(name="Azure.EventHubs.send")
child.kind = SpanKind.CLIENT # Should be PRODUCER
lmazuel marked this conversation as resolved.
Show resolved Hide resolved

self._check_closed()
if isinstance(event_data, EventData):
if partition_key:
event_data._set_partition_key(partition_key) # pylint: disable=protected-access
wrapper_event_data = event_data
wrapper_event_data._trace_message(child) # pylint: disable=protected-access
else:
if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted.
if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access
Expand All @@ -232,10 +249,17 @@ def send(self, event_data, partition_key=None, timeout=None):
else:
if partition_key:
event_data = _set_partition_key(event_data, partition_key)
event_data = _set_trace_message(event_data, child)
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self._unsent_events = [wrapper_event_data.message]
self._send_event_data_with_retry(timeout=timeout)

if span_impl_type is not None:
with child:
self._client._add_span_request_attributes(child) # pylint: disable=protected-access
self._send_event_data_with_retry(timeout=timeout)
else:
self._send_event_data_with_retry(timeout=timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both if and else has this statement. How about removing the else block and pull the same statement in "if" out of "if"?


def close(self, exception=None): # pylint:disable=useless-super-delegation
# type:(Exception) -> None
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/azure-eventhubs/dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-e ../../servicebus/azure-servicebus
-e ../../core/azure-core
-e ../../../tools/azure-sdk-tools
-e ../../core/azure-core
-e ../../identity/azure-identity
-e ../../servicebus/azure-servicebus
pytest-asyncio>=0.8.0; python_version > '3.4'
docutils>=0.14
pygments>=2.2.0
Expand Down
1 change: 1 addition & 0 deletions sdk/eventhub/azure-eventhubs/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
zip_safe=False,
packages=find_packages(exclude=exclude_packages),
install_requires=[
"azure-core<2.0.0,>=1.0.0b4",
'uamqp~=1.2.0',
'azure-common~=1.1',
],
Expand Down
5 changes: 3 additions & 2 deletions shared_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ six>=1.6
#override azure-storage-blob azure-core<2.0.0,>=1.0.0b4
#override azure-storage-queue azure-core<2.0.0,>=1.0.0b4
#override azure-storage-file azure-core<2.0.0,>=1.0.0b4
#override azure-eventhub azure-core<2.0.0,>=1.0.0b4
#override azure-cosmos azure-core<2.0.0,>=1.0.0b3
#override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<12.0.0b4,>=12.0.0b2
#override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0
#override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<=12.0.0b4,>=12.0.0b2
#override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0