Skip to content

Commit

Permalink
Implement exporting span events as message/exception telemetry (Azure…
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored Apr 7, 2022
1 parent c7df407 commit bc31f16
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
([#23486](https://github.com/Azure/azure-sdk-for-python/pull/23486))
- Implement sending of exception telemetry via log exporter
([#23633](https://github.com/Azure/azure-sdk-for-python/pull/23633))

- Implement exporting span events as message/exception telemetry
([#23708](https://github.com/Azure/azure-sdk-for-python/pull/23708))

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import pkg_resources

from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.sdk.util import ns_to_iso_str

from azure.monitor.opentelemetry.exporter._generated.models import TelemetryItem
from azure.monitor.opentelemetry.exporter._version import VERSION as ext_version


Expand Down Expand Up @@ -74,6 +76,14 @@ def run(self):
def cancel(self):
self.finished.set()

def _create_telemetry_item(timestamp):
return TelemetryItem(
name="",
instrumentation_key="",
tags=dict(azure_monitor_context),
time=ns_to_iso_str(timestamp),
)

def _populate_part_a_fields(resource):
tags = {}
if resource and resource.attributes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs.severity import SeverityNumber
from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
from opentelemetry.sdk.util import ns_to_iso_str

from azure.monitor.opentelemetry.exporter import _utils
from azure.monitor.opentelemetry.exporter._generated.models import (
Expand Down Expand Up @@ -90,12 +89,7 @@ def from_connection_string(
# pylint: disable=protected-access
def _convert_log_to_envelope(log_data: LogData) -> TelemetryItem:
log_record = log_data.log_record
envelope = TelemetryItem(
name="",
instrumentation_key="",
tags=dict(_utils.azure_monitor_context),
time=ns_to_iso_str(log_record.timestamp),
)
envelope = _utils._create_telemetry_item(log_record.timestamp)
envelope.tags.update(_utils._populate_part_a_fields(log_record.resource))
envelope.tags["ai.operation.id"] = "{:032x}".format(
log_record.trace_id or _DEFAULT_TRACE_ID
Expand All @@ -118,6 +112,8 @@ def _convert_log_to_envelope(log_data: LogData) -> TelemetryItem:
if exc_type is not None or exc_message is not None:
envelope.name = "Microsoft.ApplicationInsights.Exception"
has_full_stack = stack_trace is not None
if not exc_message:
exc_message = "Exception"
exc_details = TelemetryExceptionDetails(
type_name=exc_type,
message=exc_message,
Expand All @@ -130,7 +126,7 @@ def _convert_log_to_envelope(log_data: LogData) -> TelemetryItem:
exceptions=[exc_details],
)
# pylint: disable=line-too-long
envelope.data = MonitorBase(base_data=data, base_type="TelemetryExceptionData")
envelope.data = MonitorBase(base_data=data, base_type="ExceptionData")
else: # Message telemetry
envelope.name = "Microsoft.ApplicationInsights.Message"
# pylint: disable=line-too-long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@

from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.sdk.util import ns_to_iso_str
from opentelemetry.trace import Span, SpanKind

from azure.monitor.opentelemetry.exporter import _utils
from azure.monitor.opentelemetry.exporter._generated.models import (
MessageData,
MonitorBase,
RemoteDependencyData,
RequestData,
TelemetryExceptionData,
TelemetryExceptionDetails,
TelemetryItem
)
from azure.monitor.opentelemetry.exporter.export._base import (
Expand All @@ -36,7 +38,10 @@ def export(self, spans: Sequence[Span], **kwargs: Any) -> SpanExportResult: # py
:type spans: Sequence[~opentelemetry.trace.Span]
:rtype: ~opentelemetry.sdk.trace.export.SpanExportResult
"""
envelopes = [self._span_to_envelope(span) for span in spans]
envelopes = []
for span in spans:
envelopes.append(self._span_to_envelope(span))
envelopes.extend(self._span_events_to_envelopes(span))
try:
result = self._transmit(envelopes)
if result == ExportResult.FAILED_RETRYABLE:
Expand Down Expand Up @@ -64,6 +69,14 @@ def _span_to_envelope(self, span: Span) -> TelemetryItem:
envelope.instrumentation_key = self._instrumentation_key
return envelope

def _span_events_to_envelopes(self, span: Span) -> Sequence[TelemetryItem]:
if not span or len(span.events) == 0:
return []
envelopes = _convert_span_events_to_envelopes(span)
for envelope in envelopes:
envelope.instrumentation_key = self._instrumentation_key
return envelopes

@classmethod
def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "AzureMonitorTraceExporter":
"""
Expand All @@ -82,24 +95,17 @@ def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "AzureMonitorTr
# pylint: disable=too-many-statements
# pylint: disable=too-many-branches
# pylint: disable=too-many-locals
# pylint: disable=protected-access
def _convert_span_to_envelope(span: Span) -> TelemetryItem:
envelope = TelemetryItem(
name="",
instrumentation_key="",
tags=dict(_utils.azure_monitor_context),
time=ns_to_iso_str(span.start_time),
)
# pylint: disable=protected-access
envelope = _utils._create_telemetry_item(span.start_time)
envelope.tags.update(_utils._populate_part_a_fields(span.resource))

envelope.tags["ai.operation.id"] = "{:032x}".format(span.context.trace_id)
if SpanAttributes.ENDUSER_ID in span.attributes:
envelope.tags["ai.user.id"] = span.attributes[SpanAttributes.ENDUSER_ID]
if span.parent and span.parent.span_id:
envelope.tags["ai.operation.parentId"] = "{:016x}".format(
span.parent.span_id
)

# pylint: disable=too-many-nested-blocks
if span.kind in (SpanKind.CONSUMER, SpanKind.SERVER):
envelope.name = "Microsoft.ApplicationInsights.Request"
Expand Down Expand Up @@ -382,6 +388,52 @@ def _convert_span_to_envelope(span: Span) -> TelemetryItem:
data.properties["_MS.links"] = json.dumps(links)
return envelope

# pylint: disable=protected-access
def _convert_span_events_to_envelopes(span: Span) -> Sequence[TelemetryItem]:
envelopes = []
for event in span.events:
envelope = _utils._create_telemetry_item(event.timestamp)
envelope.tags.update(_utils._populate_part_a_fields(span.resource))
envelope.tags["ai.operation.id"] = "{:032x}".format(span.context.trace_id)
if span.parent and span.parent.span_id:
envelope.tags["ai.operation.parentId"] = "{:016x}".format(
span.parent.span_id
)
properties = {}
if event.name == "exception":
envelope.name = 'Microsoft.ApplicationInsights.Exception'
exc_type = event.attributes.get(SpanAttributes.EXCEPTION_TYPE)
exc_message = event.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
if exc_message is None or not exc_message:
exc_message = "Exception"
stack_trace = event.attributes.get(SpanAttributes.EXCEPTION_STACKTRACE)
escaped = event.attributes.get(SpanAttributes.EXCEPTION_ESCAPED)
properties[SpanAttributes.EXCEPTION_ESCAPED] = escaped
has_full_stack = stack_trace is not None
exc_details = TelemetryExceptionDetails(
type_name=exc_type,
message=exc_message,
has_full_stack=has_full_stack,
stack=stack_trace,
)
data = TelemetryExceptionData(
properties=properties,
exceptions=[exc_details],
)
# pylint: disable=line-too-long
envelope.data = MonitorBase(base_data=data, base_type='ExceptionData')
else:
envelope.name = 'Microsoft.ApplicationInsights.Message'
properties.update(event.attributes)
data = MessageData(
message=event.name,
properties=properties,
)
envelope.data = MonitorBase(base_data=data, base_type='MessageData')

envelopes.append(envelope)

return envelopes

# pylint:disable=too-many-return-statements
def _get_default_port_db(dbsystem):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ These code samples show common champion scenario operations with the AzureMonito
* Azure Service Bus Receive: [sample_servicebus_receive.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/traces/sample_servicebus_receive.py)
* Azure Storage Blob Create Container: [sample_storage.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/traces/sample_storage.py)
* Client: [sample_client.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/traces/sample_client.py)
* Event: [sample_event.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/traces/sample_event.py)
* Jaeger: [sample_jaeger.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/traces/sample_jaeger.py)
* Trace: [sample_trace.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/traces/sample_trace.py)
* Server: [sample_server.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/monitor/azure-monitor-opentelemetry-exporter/samples/traces/sample_server.py)
Expand Down Expand Up @@ -49,6 +50,17 @@ $ # from this directory
$ python sample_request.py
```

### Event

* Update `APPLICATIONINSIGHTS_CONNECTION_STRING` environment variable

* Run the sample

```sh
$ # from this directory
$ python sample_event.py
```

### Server

* Update `APPLICATIONINSIGHTS_CONNECTION_STRING` environment variable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
"""
An example to show an application using custom events. Events are added
to the span and exported via the AzureMonitorTraceExporter.
"""
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter

exporter = AzureMonitorTraceExporter.from_connection_string(
os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
)

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
span_processor = BatchSpanProcessor(exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Message events
with tracer.start_as_current_span("hello") as span:
span.add_event("Custom event", {"test": "attributes"})
print("Hello, World!")

# Exception events
try:
with tracer.start_as_current_span("hello") as span:
raise Exception("Custom exception message.")
except Exception:
print("Exception raised")
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def test_log_to_envelope_log(self):
def test_log_to_envelope_exception(self):
exporter = self._exporter
envelope = exporter._log_to_envelope(self._exc_data)
self.assertEqual(envelope.data.base_type, 'TelemetryExceptionData')
self.assertEqual(envelope.data.base_type, 'ExceptionData')
self.assertEqual(envelope.data.base_data.severity_level, 4)
self.assertEqual(envelope.data.base_data.properties["test"], "attribute")
self.assertEqual(len(envelope.data.base_data.exceptions), 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# pylint: disable=import-error
from opentelemetry.sdk import trace, resources
from opentelemetry.sdk.trace.export import SpanExportResult
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Link, SpanContext, SpanKind
from opentelemetry.trace.status import Status, StatusCode

Expand Down Expand Up @@ -775,6 +776,109 @@ def test_span_to_envelope_properties(self):
)[0]
self.assertEqual(json_dict["id"], "a6f5d48acb4d31da")

def test_span_events_to_envelopes_exception(self):
exporter = self._exporter
time = 1575494316027613500

span = trace._Span(
name="test",
context=SpanContext(
trace_id=36873507687745823477771305566750195431,
span_id=12030755672171557337,
is_remote=False,
),
parent=SpanContext(
trace_id=36873507687745823477771305566750195432,
span_id=12030755672171557337,
is_remote=False,
),
kind=SpanKind.CLIENT,
)
attributes = {
SpanAttributes.EXCEPTION_TYPE: "ZeroDivisionError",
SpanAttributes.EXCEPTION_MESSAGE: "zero division error",
SpanAttributes.EXCEPTION_STACKTRACE: "Traceback: ZeroDivisionError, division by zero",
SpanAttributes.EXCEPTION_ESCAPED: "True",
}
span.add_event("exception", attributes, time)
span.start()
span.end()
span._status = Status(status_code=StatusCode.OK)
envelopes = exporter._span_events_to_envelopes(span)

self.assertEqual(len(envelopes), 1)
envelope = envelopes[0]
self.assertEqual(
envelope.name, "Microsoft.ApplicationInsights.Exception"
)
self.assertEqual(envelope.instrumentation_key,
"1234abcd-5678-4efa-8abc-1234567890ab")
self.assertIsNotNone(envelope.tags)
self.assertEqual(envelope.tags.get("ai.device.id"), azure_monitor_context["ai.device.id"])
self.assertEqual(envelope.tags.get("ai.device.locale"), azure_monitor_context["ai.device.locale"])
self.assertEqual(envelope.tags.get("ai.device.osVersion"), azure_monitor_context["ai.device.osVersion"])
self.assertEqual(envelope.tags.get("ai.device.type"), azure_monitor_context["ai.device.type"])
self.assertEqual(envelope.tags.get("ai.internal.sdkVersion"), azure_monitor_context["ai.internal.sdkVersion"])
self.assertEqual(envelope.tags.get("ai.operation.id"), "{:032x}".format(span.context.trace_id))
self.assertEqual(envelope.tags.get("ai.operation.parentId"), "{:016x}".format(span.context.span_id))
self.assertEqual(envelope.time, "2019-12-04T21:18:36.027613Z")
self.assertEqual(len(envelope.data.base_data.properties), 1)
self.assertEqual(envelope.data.base_data.properties[SpanAttributes.EXCEPTION_ESCAPED], "True")
self.assertEqual(len(envelope.data.base_data.exceptions), 1)
self.assertEqual(envelope.data.base_data.exceptions[0].type_name, "ZeroDivisionError")
self.assertEqual(envelope.data.base_data.exceptions[0].message, "zero division error")
self.assertEqual(envelope.data.base_data.exceptions[0].has_full_stack, True)
self.assertEqual(envelope.data.base_data.exceptions[0].stack, "Traceback: ZeroDivisionError, division by zero")
self.assertEqual(envelope.data.base_type, "ExceptionData")

def test_span_events_to_envelopes_message(self):
exporter = self._exporter
time = 1575494316027613500

span = trace._Span(
name="test",
context=SpanContext(
trace_id=36873507687745823477771305566750195431,
span_id=12030755672171557337,
is_remote=False,
),
parent=SpanContext(
trace_id=36873507687745823477771305566750195432,
span_id=12030755672171557337,
is_remote=False,
),
kind=SpanKind.CLIENT,
)
attributes = {
"test": "asd",
}
span.add_event("test event", attributes, time)
span.start()
span.end()
span._status = Status(status_code=StatusCode.OK)
envelopes = exporter._span_events_to_envelopes(span)

self.assertEqual(len(envelopes), 1)
envelope = envelopes[0]
self.assertEqual(
envelope.name, "Microsoft.ApplicationInsights.Message"
)
self.assertEqual(envelope.instrumentation_key,
"1234abcd-5678-4efa-8abc-1234567890ab")
self.assertIsNotNone(envelope.tags)
self.assertEqual(envelope.tags.get("ai.device.id"), azure_monitor_context["ai.device.id"])
self.assertEqual(envelope.tags.get("ai.device.locale"), azure_monitor_context["ai.device.locale"])
self.assertEqual(envelope.tags.get("ai.device.osVersion"), azure_monitor_context["ai.device.osVersion"])
self.assertEqual(envelope.tags.get("ai.device.type"), azure_monitor_context["ai.device.type"])
self.assertEqual(envelope.tags.get("ai.internal.sdkVersion"), azure_monitor_context["ai.internal.sdkVersion"])
self.assertEqual(envelope.tags.get("ai.operation.id"), "{:032x}".format(span.context.trace_id))
self.assertEqual(envelope.tags.get("ai.operation.parentId"), "{:016x}".format(span.context.span_id))
self.assertEqual(envelope.time, "2019-12-04T21:18:36.027613Z")
self.assertEqual(len(envelope.data.base_data.properties), 1)
self.assertEqual(envelope.data.base_data.properties["test"], "asd")
self.assertEqual(envelope.data.base_data.message, "test event")
self.assertEqual(envelope.data.base_type, "MessageData")


class TestAzureTraceExporterUtils(unittest.TestCase):
def test_get_trace_export_result(self):
Expand Down

0 comments on commit bc31f16

Please sign in to comment.