Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add live metrics collection of requests/dependencies/exceptions #34673

Merged
merged 18 commits into from
Mar 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

### Features Added

- Add live metrics collection of requests/dependencies/exceptions
([#34141](https://github.com/Azure/azure-sdk-for-python/pull/34141))
lzchen marked this conversation as resolved.
Show resolved Hide resolved

### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Microsoft OpenTelemetry exporter for Azure Monitor

The exporter for Azure Monitor allows Python applications to export data from the OpenTelemetry SDK to Azure Monitor. The exporter is intended for users who require advanced configuration or has more complicated telemetry needs that require all of distributed tracing, logging and metrics. If you have simpler configuration requirements, we recommend using the [Azure Monitor OpenTelemetry Distro](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-enable?tabs=python) instead for a simpler one-line setup.
The exporter for Azure Monitor allows Python applications to export data from the OpenTelemetry SDK to Azure Monitor. The exporter is intended for users who require advanced configuration or have more complicated telemetry needs that require all of distributed tracing, logging and metrics. If you have simpler configuration requirements, we recommend using the [Azure Monitor OpenTelemetry Distro](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-enable?tabs=python) instead for a simpler one-line setup.

Prior to using this SDK, please read and understand [Data Collection Basics](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-overview?tabs=python), especially the section on [telemetry types](https://learn.microsoft.com/azure/azure-monitor/app/opentelemetry-overview?tabs=python#telemetry-types). OpenTelemetry terminology differs from Application Insights terminology so it is important to understand the way the telemetry types map to each other.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from enum import Enum

# cSpell:disable

# (OpenTelemetry metric name, Quickpulse metric name)
Expand Down Expand Up @@ -33,4 +35,18 @@
]
)

# Quickpulse intervals
_SHORT_PING_INTERVAL_SECONDS = 5
_POST_INTERVAL_SECONDS = 1
_LONG_PING_INTERVAL_SECONDS = 60
_POST_CANCEL_INTERVAL_SECONDS = 20

# Live metrics data types
class _DocumentIngressDocumentType(Enum):
Request = "Request"
RemoteDependency = "RemoteDependency"
Exception = "Exception"
Event = "Event"
Trace = "Trace"

# cSpell:disable
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from datetime import datetime, timezone
from enum import Enum
from typing import Any, List, Optional
from typing import Any, Optional

from opentelemetry.context import (
_SUPPRESS_INSTRUMENTATION_KEY,
Expand All @@ -13,16 +11,8 @@
from opentelemetry.sdk.metrics import (
Counter,
Histogram,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk.metrics._internal.point import (
NumberDataPoint,
HistogramDataPoint,
MetricsData,
)
from opentelemetry.sdk.metrics._internal.point import MetricsData
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
MetricExporter,
Expand All @@ -32,31 +22,33 @@
)

from azure.core.exceptions import HttpResponseError
from azure.monitor.opentelemetry.exporter._quickpulse._constants import _QUICKPULSE_METRIC_NAME_MAPPINGS
from azure.monitor.opentelemetry.exporter._quickpulse._constants import (
_LONG_PING_INTERVAL_SECONDS,
_POST_CANCEL_INTERVAL_SECONDS,
_POST_INTERVAL_SECONDS,
)
from azure.monitor.opentelemetry.exporter._quickpulse._generated._client import QuickpulseClient
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import (
DocumentIngress,
MetricPoint,
MonitoringDataPoint,
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint
from azure.monitor.opentelemetry.exporter._quickpulse._state import (
_get_global_quickpulse_state,
_is_ping_state,
_set_global_quickpulse_state,
_get_and_clear_quickpulse_documents,
_QuickpulseState,
)
from azure.monitor.opentelemetry.exporter._quickpulse._utils import (
_metric_to_quick_pulse_data_points,
)
from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser
from azure.monitor.opentelemetry.exporter._utils import _ticks_since_dot_net_epoch, PeriodicTask


_APPLICATION_INSIGHTS_METRIC_TEMPORALITIES = {
_QUICKPULSE_METRIC_TEMPORALITIES = {
# Use DELTA temporalities because we want to reset the counts every collection interval
Counter: AggregationTemporality.DELTA,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.DELTA,
ObservableGauge: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
UpDownCounter: AggregationTemporality.CUMULATIVE,
}

_SHORT_PING_INTERVAL_SECONDS = 5
_POST_INTERVAL_SECONDS = 1
_LONG_PING_INTERVAL_SECONDS = 60
_POST_CANCEL_INTERVAL_SECONDS = 20


class _Response:
"""Response that encapsulates pipeline response and response headers from
Expand Down Expand Up @@ -91,7 +83,7 @@ def __init__(self, connection_string: Optional[str]) -> None:

MetricExporter.__init__(
self,
preferred_temporality=_APPLICATION_INSIGHTS_METRIC_TEMPORALITIES, # type: ignore
preferred_temporality=_QUICKPULSE_METRIC_TEMPORALITIES, # type: ignore
)

def export(
Expand All @@ -116,7 +108,7 @@ def export(
data_points = _metric_to_quick_pulse_data_points(
metrics_data,
base_monitoring_data_point=base_monitoring_data_point,
documents=kwargs.get("documents"),
documents=_get_and_clear_quickpulse_documents(),
)

token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
Expand Down Expand Up @@ -190,16 +182,6 @@ def _ping(self, monitoring_data_point) -> Optional[_Response]:
return ping_response


class _QuickpulseState(Enum):
"""Current state of quickpulse service.
The numerical value represents the ping/post interval in ms for those states.
"""

PING_SHORT = _SHORT_PING_INTERVAL_SECONDS
PING_LONG = _LONG_PING_INTERVAL_SECONDS
POST_SHORT = _POST_INTERVAL_SECONDS


class _QuickpulseMetricReader(MetricReader):

def __init__(
Expand All @@ -208,7 +190,6 @@ def __init__(
base_monitoring_data_point: MonitoringDataPoint,
) -> None:
self._exporter = exporter
self._quick_pulse_state = _QuickpulseState.PING_SHORT
self._base_monitoring_data_point = base_monitoring_data_point
self._elapsed_num_seconds = 0
self._worker = PeriodicTask(
Expand All @@ -224,9 +205,9 @@ def __init__(
self._worker.start()

def _ticker(self) -> None:
if self._is_ping_state():
if _is_ping_state():
# Send a ping if elapsed number of request meets the threshold
if self._elapsed_num_seconds % int(self._quick_pulse_state.value) == 0:
if self._elapsed_num_seconds % _get_global_quickpulse_state().value == 0:
print("pinging...")
ping_response = self._exporter._ping( # pylint: disable=protected-access
self._base_monitoring_data_point,
Expand All @@ -236,22 +217,22 @@ def _ticker(self) -> None:
if header and header == "true":
print("ping succeeded: switching to post")
# Switch state to post if subscribed
self._quick_pulse_state = _QuickpulseState.POST_SHORT
_set_global_quickpulse_state(_QuickpulseState.POST_SHORT)
self._elapsed_num_seconds = 0
else:
# Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
if self._quick_pulse_state is _QuickpulseState.PING_SHORT and \
if _get_global_quickpulse_state() is _QuickpulseState.PING_SHORT and \
self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS:
print("ping failed for 60s, switching to pinging every 60s")
self._quick_pulse_state = _QuickpulseState.PING_LONG
_set_global_quickpulse_state(_QuickpulseState.PING_LONG)
# TODO: Implement redirect
else:
# Erroneous ping responses instigate backoff logic
# Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
if self._quick_pulse_state is _QuickpulseState.PING_SHORT and \
if _get_global_quickpulse_state() is _QuickpulseState.PING_SHORT and \
self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS:
print("ping failed for 60s, switching to pinging every 60s")
self._quick_pulse_state = _QuickpulseState.PING_LONG
_set_global_quickpulse_state(_QuickpulseState.PING_LONG)
else:
print("posting...")
try:
Expand All @@ -262,7 +243,7 @@ def _ticker(self) -> None:
# And resume pinging
if self._elapsed_num_seconds >= _POST_CANCEL_INTERVAL_SECONDS:
print("post failed for 20s, switching to pinging")
self._quick_pulse_state = _QuickpulseState.PING_SHORT
_set_global_quickpulse_state(_QuickpulseState.PING_SHORT)
self._elapsed_num_seconds = 0

self._elapsed_num_seconds += 1
Expand All @@ -277,7 +258,6 @@ def _receive_metrics(
metrics_data,
timeout_millis=timeout_millis,
base_monitoring_data_point=self._base_monitoring_data_point,
documents=[],
)
if result is MetricExportResult.FAILURE:
# There is currently no way to propagate unsuccessful metric post so
Expand All @@ -288,41 +268,3 @@ def _receive_metrics(
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self._worker.cancel()
self._worker.join()

def _is_ping_state(self):
return self._quick_pulse_state in (_QuickpulseState.PING_SHORT, _QuickpulseState.PING_LONG)

def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-blocks
metrics_data: OTMetricsData,
base_monitoring_data_point: MonitoringDataPoint,
documents: Optional[List[DocumentIngress]],
) -> List[MonitoringDataPoint]:
metric_points = []
for resource_metric in metrics_data.resource_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
for point in metric.data.data_points:
if point is not None:
metric_point = MetricPoint(
name=_QUICKPULSE_METRIC_NAME_MAPPINGS[metric.name.lower()],
weight=1,
)
if isinstance(point, HistogramDataPoint):
metric_point.value = point.sum
elif isinstance(point, NumberDataPoint):
metric_point.value = point.value
else:
metric_point.value = 0
metric_points.append(metric_point)
return [
MonitoringDataPoint(
version=base_monitoring_data_point.version,
instance=base_monitoring_data_point.instance,
role_name=base_monitoring_data_point.role_name,
machine_name=base_monitoring_data_point.machine_name,
stream_id=base_monitoring_data_point.stream_id,
timestamp=datetime.now(tz=timezone.utc),
metrics=metric_points,
documents=documents,
)
]
Loading
Loading