Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix: detect and adapt to backoff package version #2980

Merged
merged 3 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2976](https://github.com/open-telemetry/opentelemetry-python/pull/2976))
- [exporter/opentelemetry-exporter-otlp-proto-http] Add OTLPMetricExporter
([#2891](https://github.com/open-telemetry/opentelemetry-python/pull/2891))
- Fix a bug with exporter retries for with newer versions of the backoff library
([#2980](https://github.com/open-telemetry/opentelemetry-python/pull/2980))

## [1.13.0-0.34b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.13.0) - 2022-09-26

Expand Down Expand Up @@ -90,7 +92,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2726](https://github.com/open-telemetry/opentelemetry-python/pull/2726))
- fix: frozenset object has no attribute items
([#2727](https://github.com/open-telemetry/opentelemetry-python/pull/2727))
- fix: create suppress HTTP instrumentation key in opentelemetry context
- fix: create suppress HTTP instrumentation key in opentelemetry context
([#2729](https://github.com/open-telemetry/opentelemetry-python/pull/2729))
- Support logs SDK auto instrumentation enable/disable with env
([#2728](https://github.com/open-telemetry/opentelemetry-python/pull/2728))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from urllib.parse import urlparse
from opentelemetry.sdk.trace import ReadableSpan

from backoff import expo
import backoff
from google.rpc.error_details_pb2 import RetryInfo
from grpc import (
ChannelCredentials,
Expand Down Expand Up @@ -183,6 +183,19 @@ def _get_credentials(creds, environ_key):
return ssl_channel_credentials()


# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT]
Expand Down Expand Up @@ -296,7 +309,7 @@ def _export(
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in expo(max_value=max_value):
for delay in _expo(max_value=max_value):

if delay == max_value:
return self._result.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -265,7 +265,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -464,7 +464,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_environ_to_compression(self):
with self.assertRaises(InvalidCompressionValueException):
environ_to_compression("test_invalid")

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
def test_export_warning(self, mock_expo):

mock_expo.configure_mock(**{"return_value": [0]})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,23 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure):
# pylint: disable=protected-access
self.assertIsNone(exporter._headers, None)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

add_TraceServiceServicer_to_server(
TraceServiceServicerUNAVAILABLE(), self.server
)
self.exporter.export([self.span])
mock_sleep.assert_called_once_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -450,7 +466,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ dependencies = [
]

[project.optional-dependencies]
test = []
test = [
"responses == 0.22.0",
]

[project.entry-points.opentelemetry_traces_exporter]
otlp_proto_http = "opentelemetry.exporter.otlp.proto.http.trace_exporter:OTLPSpanExporter"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from typing import Dict, Optional, Sequence
from time import sleep

import backoff
import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
Expand Down Expand Up @@ -53,6 +53,18 @@
DEFAULT_LOGS_EXPORT_PATH = "v1/logs"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPLogExporter(LogExporter):

Expand Down Expand Up @@ -122,7 +134,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:

serialized_data = _ProtobufEncoder.serialize(batch)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return LogExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.util.re import parse_headers

import backoff
import requests
from backoff import expo

_logger = logging.getLogger(__name__)

Expand All @@ -77,6 +77,18 @@
DEFAULT_METRICS_EXPORT_PATH = "v1/metrics"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPMetricExporter(MetricExporter):

Expand Down Expand Up @@ -319,7 +331,7 @@ def export(
**kwargs,
) -> MetricExportResult:
serialized_data = self._translate_data(metrics_data)
for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return MetricExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from typing import Dict, Optional
from time import sleep

import backoff
import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE,
Expand Down Expand Up @@ -54,6 +54,18 @@
DEFAULT_TRACES_EXPORT_PATH = "v1/traces"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPSpanExporter(SpanExporter):

Expand Down Expand Up @@ -133,7 +145,7 @@ def export(self, spans) -> SpanExportResult:

serialized_data = _ProtobufEncoder.serialize(spans)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return SpanExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from unittest.mock import patch

import requests
import responses

from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
Expand Down Expand Up @@ -269,3 +270,30 @@ def test_serialization(self, mock_post):
verify=exporter._certificate_file,
timeout=exporter._timeout,
)

@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

# return a retryable error
responses.add(
responses.POST,
"http://metrics.example.com/export",
json={"error": "something exploded"},
status=500,
)

exporter = OTLPMetricExporter(
endpoint="http://metrics.example.com/export"
)
metrics_data = self.metrics["sum_int"]

exporter.export(metrics_data)
mock_sleep.assert_called_once_with(1)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from unittest.mock import MagicMock, patch

import requests
import responses

from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
Expand Down Expand Up @@ -159,6 +160,31 @@ def test_serialize(self):
expected_encoding.SerializeToString(),
)

@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

# return a retryable error
responses.add(
responses.POST,
"http://logs.example.com/export",
json={"error": "something exploded"},
status=500,
)

exporter = OTLPLogExporter(endpoint="http://logs.example.com/export")
logs = self._get_sdk_log_data()

exporter.export(logs)
mock_sleep.assert_called_once_with(1)

@staticmethod
def _get_sdk_log_data() -> List[LogData]:
log1 = LogData(
Expand Down
Loading