diff --git a/CHANGELOG.md b/CHANGELOG.md index 86739f645f3..c342e0df5fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Remove `_start_time_unix_nano` attribute from `_ViewInstrumentMatch` in favor of using `time_ns()` at the moment when the aggregation object is created ([#4137](https://github.com/open-telemetry/opentelemetry-python/pull/4137)) +- Improve timeout and retry mechanic of exporters + ([#4183](https://github.com/open-telemetry/opentelemetry-python/pull/4183)) ## Version 1.26.0/0.47b0 (2024-07-25) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/exporter.py new file mode 100644 index 00000000000..7734413079e --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/exporter.py @@ -0,0 +1,178 @@ +# Copyright The OpenTelemetry Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +from contextlib import contextmanager +from logging import getLogger +from time import time +from typing import Callable, Generic, Iterator, Optional, Type, TypeVar + +from ._internal import _create_exp_backoff_generator + +ExportResultT = TypeVar("ExportResultT", covariant=True) +ExportPayloadT = TypeVar("ExportPayloadT", covariant=True) + +_logger = getLogger(__name__) + + +class RetryableExportError(Exception): + def __init__(self, retry_delay_sec: Optional[int]): + super().__init__() + self.retry_delay_sec = retry_delay_sec + + +class RetryingExporter(Generic[ExportResultT]): + """OTLP exporter helper to handle retries and timeouts + + Encapsulates timeout behavior for shutdown and export tasks. + + Accepts a callable `export_function` of the form + + def export_function( + payload: object, + timeout_sec: float, + ) -> result: + .... + + that either returns the appropriate export result, or raises a RetryableExportError exception if + the encountered error should be retried. + + Args: + export_function: A callable handling a single export attempt to be used by + export_with_retry() + result: Enum-like type defining SUCCESS and FAILURE values returned by export. + timeout_sec: Timeout for exports in seconds. + """ + + def __init__( + self, + export_function: Callable[[ExportPayloadT, float], ExportResultT], + result: Type[ExportResultT], + timeout_sec: float, + ): + self._export_function = export_function + self._result = result + self._timeout_sec = timeout_sec + + self._shutdown = threading.Event() + self._export_lock = threading.Lock() + + def shutdown(self, timeout_millis: float = 30_000) -> None: + """Shutdown the retrying exporter + + Waits for the current export to finish up to `timeout_millis`. In case the timeout is + reached, the export will be interrupted to to prevent application hanging after completion. + """ + with acquire_timeout(self._export_lock, timeout_millis / 1e3): + self._shutdown.set() + + def export_with_retry( # pylint: disable=too-many-return-statements + self, + payload: ExportPayloadT, + timeout_sec: Optional[float] = None, + ) -> ExportResultT: + """Exports payload with handling of retryable errors + + Calls the export_function provided at initialization with the following signature: + + export_function(payload, timeout_sec=remaining_time) + + where `remaining_time` is updated with each retry. + + Retries will be attempted using exponential backoff. If retry_delay_sec is specified in the + raised error, a retry attempt will not occur before that delay. If a retry after that delay + is not possible, will immediately abort without retrying. + + In case no timeout_sec is not given, the timeout defaults to the timeout given during + initialization. + + Will reattempt the export until timeout has passed, at which point the export will be + abandoned and a failure will be returned. A pending shutdown timing out will also cause + retries to time out. + + Note: Can block longer than timeout if export_function is blocking. Ensure export_function + blocks minimally and does not attempt retries. + + Args: + payload: Data to be exported, which is forwarded to the underlying export + """ + # After the call to shutdown, subsequent calls to Export are + # not allowed and should return a Failure result. + if self._shutdown.is_set(): + _logger.warning("Exporter already shutdown, ignoring batch") + return self._result.FAILURE + + timeout_sec = ( + timeout_sec if timeout_sec is not None else self._timeout_sec + ) + deadline_sec = time() + timeout_sec + + # If negative timeout passed (from e.g. external batch deadline - see GRPC metric exporter) + # fail immediately + if timeout_sec <= 0: + _logger.warning("Export deadline passed, ignoring data") + return self._result.FAILURE + + with acquire_timeout(self._export_lock, timeout_sec) as is_locked: + if not is_locked: + _logger.warning( + "Exporter failed to acquire lock before timeout" + ) + return self._result.FAILURE + + max_value = 64 + # expo returns a generator that yields delay values which grow + # exponentially. Once delay is greater than max_value, the yielded + # value will remain constant. + for delay_sec in _create_exp_backoff_generator( + max_value=max_value + ): + remaining_time_sec = deadline_sec - time() + if remaining_time_sec < 1e-09: + return self._result.FAILURE # Timed out + + if self._shutdown.is_set(): + _logger.warning( + "Export cancelled due to shutdown timing out" + ) + return self._result.FAILURE + + try: + return self._export_function(payload, remaining_time_sec) + except RetryableExportError as exc: + time_remaining_sec = deadline_sec - time() + + delay_sec = ( + exc.retry_delay_sec + if exc.retry_delay_sec is not None + else min(time_remaining_sec, delay_sec) + ) + + if delay_sec > time_remaining_sec: + # We should not exceed the requested timeout + return self._result.FAILURE + + _logger.warning("Retrying in %0.2fs", delay_sec) + self._shutdown.wait(delay_sec) + + return self._result.FAILURE + + +@contextmanager +def acquire_timeout(lock: threading.Lock, timeout: float) -> Iterator[bool]: + result = lock.acquire(timeout=timeout) + try: + yield result + finally: + if result: + lock.release() diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_retryable_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_retryable_exporter.py new file mode 100644 index 00000000000..65e29bedfda --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_retryable_exporter.py @@ -0,0 +1,325 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import time +import unittest +from itertools import repeat +from logging import WARNING +from typing import Type +from unittest.mock import ANY, Mock, patch + +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, + RetryingExporter, +) +from opentelemetry.exporter.otlp.proto.common.exporter import ( + logger as exporter_logger, +) + +result_type: Type = Mock() + + +class TestRetryableExporter(unittest.TestCase): + def test_export_no_retry(self): + export_func = Mock() + exporter = RetryingExporter(export_func, result_type, timeout_sec=10.0) + with self.subTest("Export success"): + export_func.reset_mock() + export_func.configure_mock(return_value=result_type.SUCCESS) + with self.assertRaises(AssertionError): + with self.assertLogs(level=WARNING): + result = exporter.export_with_retry("payload") + self.assertIs(result, result_type.SUCCESS) + export_func.assert_called_once_with( + "payload", ANY + ) # Timeout checked in the following line + self.assertAlmostEqual( + export_func.call_args_list[0][0][1], 10.0, places=4 + ) + + with self.subTest("Export Fail"): + export_func.reset_mock() + export_func.configure_mock(return_value=result_type.FAILURE) + with self.assertRaises(AssertionError): + with self.assertLogs(exporter_logger, level=WARNING): + result = exporter.export_with_retry("payload") + self.assertIs(result, result_type.FAILURE) + export_func.assert_called_once_with( + "payload", ANY + ) # Timeout checked in the following line + self.assertAlmostEqual( + export_func.call_args_list[0][0][1], 10.0, places=4 + ) + + @patch( + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator", + return_value=repeat(0), + ) + def test_export_retry(self, mock_backoff): + """ + Test we retry until success/failure. + """ + side_effect = [ + RetryableExportError(None), + RetryableExportError(None), + result_type.SUCCESS, + ] + export_func = Mock(side_effect=side_effect) + exporter = RetryingExporter(export_func, result_type, timeout_sec=0.1) + + with self.subTest("Retry until success"): + with patch.object( + exporter._shutdown, "wait" # pylint: disable=protected-access + ) as wait_mock, self.assertLogs(level=WARNING): + result = exporter.export_with_retry("") + self.assertEqual(wait_mock.call_count, len(side_effect) - 1) + self.assertEqual(export_func.call_count, len(side_effect)) + self.assertIs(result, result_type.SUCCESS) + + with self.subTest("Retry until failure"): + export_func.reset_mock() + side_effect.insert(0, RetryableExportError(None)) + side_effect[-1] = result_type.FAILURE + export_func.configure_mock(side_effect=side_effect) + with self.assertLogs(level=WARNING): + result = exporter.export_with_retry("") + self.assertEqual(export_func.call_count, len(side_effect)) + self.assertIs(result, result_type.FAILURE) + + def test_export_uses_arg_timout_when_given(self) -> None: + export_func = Mock(side_effect=RetryableExportError(None)) + exporter = RetryingExporter(export_func, result_type, timeout_sec=2) + with self.assertLogs(level=WARNING): + start = time.time() + exporter.export_with_retry("payload", 0.1) + duration = time.time() - start + self.assertAlmostEqual(duration, 0.1, places=1) + + @patch( + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator", + return_value=repeat(0.25), + ) + def test_export_uses_retry_delay(self, mock_backoff): + """ + Test we retry using the delay specified in the RPC error as a lower bound. + """ + side_effects = [ + RetryableExportError(0.0), + RetryableExportError(0.25), + RetryableExportError(0.75), + RetryableExportError(1.0), + result_type.SUCCESS, + ] + exporter = RetryingExporter( + Mock(side_effect=side_effects), result_type, timeout_sec=10.0 + ) + + with patch.object( + exporter._shutdown, "wait" # pylint: disable=protected-access + ) as wait_mock, self.assertLogs(level=WARNING): + result = exporter.export_with_retry("payload") + self.assertIs(result, result_type.SUCCESS) + self.assertEqual(wait_mock.call_count, len(side_effects) - 1) + self.assertEqual(wait_mock.call_args_list[1].args, (0.25,)) + self.assertEqual(wait_mock.call_args_list[2].args, (0.75,)) + self.assertEqual(wait_mock.call_args_list[3].args, (1.00,)) + + @patch( + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator", + return_value=repeat(0.1), + ) + def test_retry_delay_exceeds_timeout(self, mock_backoff): + """ + Test we timeout if we can't respect retry_delay. + """ + side_effects = [ + RetryableExportError(0.25), + RetryableExportError(1.0), # should timeout here + result_type.SUCCESS, + ] + + mock_export_func = Mock(side_effect=side_effects) + exporter = RetryingExporter( + mock_export_func, + result_type, + timeout_sec=0.5, + ) + + with self.assertLogs(level=WARNING): + self.assertEqual( + exporter.export_with_retry("payload"), result_type.FAILURE + ) + self.assertEqual(mock_export_func.call_count, 2) + + def test_shutdown(self): + """Test we refuse to export if shut down.""" + mock_export_func = Mock(return_value=result_type.SUCCESS) + exporter = RetryingExporter( + mock_export_func, + result_type, + timeout_sec=10.0, + ) + + self.assertEqual( + exporter.export_with_retry("payload"), result_type.SUCCESS + ) + mock_export_func.assert_called_once() + exporter.shutdown() + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + exporter.export_with_retry("payload"), result_type.FAILURE + ) + self.assertEqual( + warning.records[0].message, + "Exporter already shutdown, ignoring batch", + ) + mock_export_func.assert_called_once() + + @patch( + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator", + return_value=repeat(0.01), + ) + def test_shutdown_wait_last_export(self, mock_backoff): + """Test that shutdown waits for ongoing export to complete.""" + + timeout_sec = 0.05 + + class ExportFunc: + is_exporting = threading.Event() + ready_to_continue = threading.Event() + side_effect = [ + RetryableExportError(None), + RetryableExportError(None), + result_type.SUCCESS, + ] + mock_export_func = Mock(side_effect=side_effect) + + def __call__(self, *args, **kwargs): + self.is_exporting.set() + self.ready_to_continue.wait() + return self.mock_export_func(*args, **kwargs) + + export_func = ExportFunc() + + exporter = RetryingExporter( + export_func, result_type, timeout_sec=timeout_sec + ) + + class ExportWrap: + def __init__(self) -> None: + self.result = None + + def __call__(self, *args, **kwargs): + self.result = exporter.export_with_retry("payload") + return self.result + + export_wrapped = ExportWrap() + + export_thread = threading.Thread( + name="export_thread", target=export_wrapped + ) + with self.assertLogs(level=WARNING): + try: + # Simulate shutdown occurring during retry process + # Intended execution flow + # + # main thread: + # - start export_thread + # - wait for is_exporting + # export_thread: + # - call export_func + # - set is_exporting + # - wait for ready_to_continue + # main thread: + # - start shutdown thread + # - sleep to yield to shutdown thread + # shutdown_thread: + # - block at acquiring lock held by export_thread + # - shutdown is now pending timeout/export completion + # main thread: + # - set ready_to_continue + # - join all threads + export_thread.start() + export_func.is_exporting.wait() + start_time = time.time() + shutdown_thread = threading.Thread( + name="shutdown_thread", target=exporter.shutdown + ) + shutdown_thread.start() + export_func.ready_to_continue.set() + finally: + export_thread.join() + shutdown_thread.join() + + duration = time.time() - start_time + self.assertLessEqual(duration, timeout_sec) + # pylint: disable=protected-access + self.assertTrue(exporter._shutdown) + self.assertIs(export_wrapped.result, result_type.SUCCESS) + + def test_shutdown_timeout_cancels_export_retries(self): + """Test that shutdown timing out cancels ongoing retries.""" + + class ExportFunc: + is_exporting = threading.Event() + ready_to_continue = threading.Event() + mock_export_func = Mock(side_effect=RetryableExportError(None)) + + def __call__(self, *args, **kwargs): + self.is_exporting.set() + self.ready_to_continue.wait() + return self.mock_export_func(*args, **kwargs) + + export_func = ExportFunc() + + exporter = RetryingExporter(export_func, result_type, timeout_sec=30.0) + + class ExportWrap: + def __init__(self) -> None: + self.result = None + + def __call__(self, *args, **kwargs): + self.result = exporter.export_with_retry("payload") + return self.result + + export_wrapped = ExportWrap() + + shutdown_timeout = 0.02 + + export_thread = threading.Thread(target=export_wrapped) + with self.assertLogs(level=WARNING) as warning: + try: + export_thread.start() + export_func.is_exporting.wait() + start_time = time.time() + shutdown_thread = threading.Thread( + target=exporter.shutdown, args=[shutdown_timeout * 1e3] + ) + shutdown_thread.start() + export_func.ready_to_continue.set() + finally: + export_thread.join() + shutdown_thread.join() + duration = time.time() - start_time + self.assertAlmostEqual(duration, shutdown_timeout, places=1) + # pylint: disable=protected-access + self.assertTrue(exporter._shutdown) + self.assertIs(export_wrapped.result, result_type.FAILURE) + print(warning.records) + self.assertEqual(warning.records[0].message, "Retrying in 1.00s") + self.assertEqual( + warning.records[-1].message, + "Export cancelled due to shutdown timing out", + ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py index d8f2ba2efb6..5e3eafa17c3 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -60,7 +60,7 @@ def __init__( headers: Optional[ Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] ] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, ): if insecure is None: @@ -81,7 +81,7 @@ def __init__( environ_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT) environ_timeout = ( - int(environ_timeout) if environ_timeout is not None else None + float(environ_timeout) if environ_timeout is not None else None ) compression = ( @@ -109,8 +109,17 @@ def _translate_data( ) -> ExportLogsServiceRequest: return encode_logs(data) - def export(self, batch: Sequence[LogData]) -> LogExportResult: - return self._export(batch) + def export( + self, + batch: Sequence[LogData], + timeout_millis: Optional[float] = None, + ) -> LogExportResult: + return self._exporter.export_with_retry( + batch, + timeout_sec=( + timeout_millis / 1000 if timeout_millis is not None else None + ), + ) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index b07d24e0d0e..7729c592e49 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -14,32 +14,15 @@ """OTLP Exporter""" -import threading from abc import ABC, abstractmethod -from collections.abc import Sequence # noqa: F401 from logging import getLogger from os import environ -from time import sleep -from typing import ( # noqa: F401 - Any, - Callable, - Dict, - Generic, - List, - Optional, - Tuple, - Union, -) +from typing import Any, Callable, Dict, Generic, List, Optional # noqa: F401 from typing import Sequence as TypingSequence -from typing import TypeVar +from typing import Tuple, TypeVar, Union # noqa: F401 from urllib.parse import urlparse from deprecated import deprecated - -from opentelemetry.exporter.otlp.proto.common._internal import ( - _get_resource_data, - _create_exp_backoff_generator, -) from google.rpc.error_details_pb2 import RetryInfo from grpc import ( ChannelCredentials, @@ -51,9 +34,14 @@ ssl_channel_credentials, ) -from opentelemetry.exporter.otlp.proto.grpc import ( - _OTLP_GRPC_HEADERS, +from opentelemetry.exporter.otlp.proto.common._internal import ( + _get_resource_data, +) +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, + RetryingExporter, ) +from opentelemetry.exporter.otlp.proto.grpc import _OTLP_GRPC_HEADERS from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 AnyValue, ArrayValue, @@ -176,7 +164,7 @@ def _get_credentials( class OTLPExporterMixin( ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT] ): - """OTLP span exporter + """OTLP exporter Args: endpoint: OpenTelemetry Collector receiver endpoint @@ -195,7 +183,7 @@ def __init__( headers: Optional[ Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] ] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, ): super().__init__() @@ -232,8 +220,8 @@ def __init__( else: self._headers = tuple(self._headers) + tuple(_OTLP_GRPC_HEADERS) - self._timeout = timeout or int( - environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10) + timeout_sec = timeout or float( + environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10.0) ) self._collector_kwargs = None @@ -260,8 +248,10 @@ def __init__( ) ) - self._export_lock = threading.Lock() self._shutdown = False + self._exporter = RetryingExporter( + self._export, self._result, timeout_sec + ) @abstractmethod def _translate_data( @@ -270,100 +260,76 @@ def _translate_data( pass def _export( - self, data: Union[TypingSequence[ReadableSpan], MetricsData] + self, + data: Union[TypingSequence[ReadableSpan], MetricsData], + timeout_sec: float, ) -> ExportResultT: - # After the call to shutdown, subsequent calls to Export are - # not allowed and should return a Failure result. - if self._shutdown: - logger.warning("Exporter already shutdown, ignoring batch") - return self._result.FAILURE + try: + self._client.Export( + request=self._translate_data(data), + metadata=self._headers, + timeout=timeout_sec, + ) - # FIXME remove this check if the export type for traces - # gets updated to a class that represents the proto - # TracesData and use the code below instead. - # logger.warning( - # "Transient error %s encountered while exporting %s, retrying in %ss.", - # error.code(), - # data.__class__.__name__, - # delay, - # ) - max_value = 64 - # expo returns a generator that yields delay values which grow - # exponentially. Once delay is greater than max_value, the yielded - # value will remain constant. - for delay in _create_exp_backoff_generator(max_value=max_value): - if delay == max_value or self._shutdown: - return self._result.FAILURE - - with self._export_lock: - try: - self._client.Export( - request=self._translate_data(data), - metadata=self._headers, - timeout=self._timeout, + return self._result.SUCCESS + + except RpcError as error: + if error.code() in [ + StatusCode.CANCELLED, + StatusCode.DEADLINE_EXCEEDED, + StatusCode.RESOURCE_EXHAUSTED, + StatusCode.ABORTED, + StatusCode.OUT_OF_RANGE, + StatusCode.UNAVAILABLE, + StatusCode.DATA_LOSS, + ]: + retry_info_bin = dict(error.trailing_metadata()).get( + "google.rpc.retryinfo-bin" + ) + if retry_info_bin is not None: + retry_info = RetryInfo() + retry_info.ParseFromString(retry_info_bin) + delay = ( + retry_info.retry_delay.seconds + + retry_info.retry_delay.nanos / 1.0e9 ) + else: + delay = None + + logger.warning( + ( + "Transient error %s encountered while exporting " + "%s to %s" + ), + error.code(), + self._exporting, + self._endpoint, + ) + raise RetryableExportError(delay) + + logger.error( + "Failed to export %s to %s, error code: %s", + self._exporting, + self._endpoint, + error.code(), + exc_info=error.code() == StatusCode.UNKNOWN, + ) - return self._result.SUCCESS - - except RpcError as error: - - if error.code() in [ - StatusCode.CANCELLED, - StatusCode.DEADLINE_EXCEEDED, - StatusCode.RESOURCE_EXHAUSTED, - StatusCode.ABORTED, - StatusCode.OUT_OF_RANGE, - StatusCode.UNAVAILABLE, - StatusCode.DATA_LOSS, - ]: - - retry_info_bin = dict(error.trailing_metadata()).get( - "google.rpc.retryinfo-bin" - ) - if retry_info_bin is not None: - retry_info = RetryInfo() - retry_info.ParseFromString(retry_info_bin) - delay = ( - retry_info.retry_delay.seconds - + retry_info.retry_delay.nanos / 1.0e9 - ) - - logger.warning( - ( - "Transient error %s encountered while exporting " - "%s to %s, retrying in %ss." - ), - error.code(), - self._exporting, - self._endpoint, - delay, - ) - sleep(delay) - continue - else: - logger.error( - "Failed to export %s to %s, error code: %s", - self._exporting, - self._endpoint, - error.code(), - exc_info=error.code() == StatusCode.UNKNOWN, - ) - - if error.code() == StatusCode.OK: - return self._result.SUCCESS - - return self._result.FAILURE - - return self._result.FAILURE + if error.code() == StatusCode.OK: + return self._result.SUCCESS + + return self._result.FAILURE + + @abstractmethod + def export(self, data) -> ExportResultT: + pass def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: if self._shutdown: logger.warning("Exporter already shutdown, ignoring call") return - # wait for the last export if any - self._export_lock.acquire(timeout=timeout_millis / 1e3) + self._exporter.shutdown(timeout_millis=timeout_millis) self._shutdown = True - self._export_lock.release() @property @abstractmethod diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index 645885b6f28..94a46615ce1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from time import time from dataclasses import replace from logging import getLogger from os import environ @@ -98,7 +99,7 @@ def __init__( headers: Optional[ Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] ] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, preferred_temporality: Dict[type, AggregationTemporality] = None, preferred_aggregation: Dict[type, Aggregation] = None, @@ -123,7 +124,7 @@ def __init__( environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT) environ_timeout = ( - int(environ_timeout) if environ_timeout is not None else None + float(environ_timeout) if environ_timeout is not None else None ) compression = ( @@ -157,17 +158,26 @@ def _translate_data( def export( self, metrics_data: MetricsData, - timeout_millis: float = 10_000, + timeout_millis: Optional[float] = None, **kwargs, ) -> MetricExportResult: - # TODO(#2663): OTLPExporterMixin should pass timeout to gRPC + timeout_sec = ( + timeout_millis / 1000 + if timeout_millis is not None + else self._exporter._timeout_sec # pylint: disable=protected-access + ) + if self._max_export_batch_size is None: - return self._export(data=metrics_data) + return self._exporter.export_with_retry(metrics_data, timeout_sec) export_result = MetricExportResult.SUCCESS + deadline_sec = time() + timeout_sec for split_metrics_data in self._split_metrics_data(metrics_data): - split_export_result = self._export(data=split_metrics_data) + time_remaining_sec = deadline_sec - time() + split_export_result = self._exporter.export_with_retry( + split_metrics_data, timeout_sec=time_remaining_sec + ) if split_export_result is MetricExportResult.FAILURE: export_result = MetricExportResult.FAILURE diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index ce8dcaabdf7..da00502b7d5 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -15,21 +15,17 @@ import logging from os import environ -from typing import Dict, Optional, Sequence, Tuple, Union -from typing import Sequence as TypingSequence - +from typing import Dict, Optional +from typing import Sequence +from typing import Tuple, Union from grpc import ChannelCredentials, Compression -from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( - encode_spans, -) -from opentelemetry.exporter.otlp.proto.grpc.exporter import ( +from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans +from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401 OTLPExporterMixin, _get_credentials, environ_to_compression, -) -from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401 get_resource_data, ) from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( @@ -41,12 +37,14 @@ from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 InstrumentationScope, ) -from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F401 - ScopeSpans, +from opentelemetry.proto.trace.v1.trace_pb2 import Status # noqa: F401 +from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F40 ResourceSpans, + ScopeSpans, +) +from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F401 Span as CollectorSpan, ) -from opentelemetry.proto.trace.v1.trace_pb2 import Status # noqa: F401 from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_TRACES_CLIENT_CERTIFICATE, OTEL_EXPORTER_OTLP_TRACES_CLIENT_KEY, @@ -91,12 +89,11 @@ def __init__( insecure: Optional[bool] = None, credentials: Optional[ChannelCredentials] = None, headers: Optional[ - Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] + Union[Sequence[Tuple[str, str]], Dict[str, str], str] ] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, ): - if insecure is None: insecure = environ.get(OTEL_EXPORTER_OTLP_TRACES_INSECURE) if insecure is not None: @@ -115,7 +112,7 @@ def __init__( environ_timeout = environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT) environ_timeout = ( - int(environ_timeout) if environ_timeout is not None else None + float(environ_timeout) if environ_timeout is not None else None ) compression = ( @@ -142,8 +139,17 @@ def _translate_data( ) -> ExportTraceServiceRequest: return encode_spans(data) - def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: - return self._export(spans) + def export( + self, + spans: Sequence[ReadableSpan], + timeout_millis: Optional[float] = None, + ) -> SpanExportResult: + return self._exporter.export_with_retry( + spans, + timeout_sec=( + timeout_millis / 1000 if timeout_millis is not None else None + ), + ) def shutdown(self) -> None: OTLPExporterMixin.shutdown(self) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index fc2211c5aeb..9090019fc0b 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -29,6 +29,9 @@ from opentelemetry._logs import SeverityNumber from opentelemetry.exporter.otlp.proto.common._internal import _encode_value +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, +) from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( OTLPLogExporter, ) @@ -389,36 +392,57 @@ def test_otlp_headers_from_env(self): ) @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + "opentelemetry.exporter.otlp.proto.grpc._log_exporter.OTLPLogExporter._export", + side_effect=RetryableExportError(None), + ) + def test_export_uses_arg_timeout_when_given(self, export_mock) -> None: + exporter = OTLPLogExporter(timeout=20) + + with self.assertLogs(level="WARNING"): + start = time.time() + exporter.export([self.log_data_1], timeout_millis=100.0) + duration = time.time() - start + self.assertAlmostEqual(duration, 0.1, places=1) + + @patch( + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator" ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): + def test_unavailable(self, mock_expo): mock_expo.configure_mock(**{"return_value": [0.01]}) add_LogsServiceServicer_to_server( LogsServiceServicerUNAVAILABLE(), self.server ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.FAILURE - ) - mock_sleep.assert_called_with(0.01) + with patch.object( + self.exporter._exporter._shutdown, # pylint: disable=protected-access + "wait", + ) as wait_mock: + self.assertEqual( + self.exporter.export([self.log_data_1]), + LogExportResult.FAILURE, + ) + wait_mock.assert_called_with(0.01) @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator" ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep, mock_expo): + def test_unavailable_delay(self, mock_expo): - mock_expo.configure_mock(**{"return_value": [1]}) + mock_expo.configure_mock(**{"return_value": [0.01]}) add_LogsServiceServicer_to_server( LogsServiceServicerUNAVAILABLEDelay(), self.server ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.FAILURE - ) - mock_sleep.assert_called_with(0.01) + with patch.object( + self.exporter._exporter._shutdown, # pylint: disable=protected-access + "wait", + ) as wait_mock: + self.assertEqual( + self.exporter.export([self.log_data_1]), + LogExportResult.FAILURE, + ) + wait_mock.assert_called_with(0.01) def test_success(self): add_LogsServiceServicer_to_server( diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index f7bbdabb11f..faec76677d7 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -13,6 +13,7 @@ # limitations under the License. import threading +import time from logging import WARNING from time import time_ns from types import MethodType @@ -63,7 +64,7 @@ def test_environ_to_compression(self): environ_to_compression("test_invalid") @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator" ) def test_export_warning(self, mock_expo): mock_expo.configure_mock(**{"return_value": [0]}) @@ -86,6 +87,9 @@ def _translate_data( ) -> ExportServiceRequestT: pass + def export(self, data): + return self._exporter.export_with_retry(data) + @property def _exporting(self) -> str: return "mock" @@ -93,8 +97,7 @@ def _exporting(self) -> str: otlp_mock_exporter = OTLPMockExporter() with self.assertLogs(level=WARNING) as warning: - # pylint: disable=protected-access - otlp_mock_exporter._export(Mock()) + otlp_mock_exporter.export(Mock()) self.assertEqual( warning.records[0].message, "Failed to export mock to localhost:4317, error code: None", @@ -110,15 +113,15 @@ def trailing_metadata(self): rpc_error.trailing_metadata = MethodType(trailing_metadata, rpc_error) with self.assertLogs(level=WARNING) as warning: - # pylint: disable=protected-access - otlp_mock_exporter._export([]) + otlp_mock_exporter.export([]) self.assertEqual( warning.records[0].message, ( "Transient error StatusCode.CANCELLED encountered " - "while exporting mock to localhost:4317, retrying in 0s." + "while exporting mock to localhost:4317" ), ) + self.assertEqual(warning.records[1].message, "Retrying in 0.00s") def test_shutdown(self): result_mock = Mock() @@ -132,6 +135,9 @@ def _translate_data( ) -> ExportServiceRequestT: pass + def export(self, data): + return self._exporter.export_with_retry(data) + @property def _exporting(self) -> str: return "mock" @@ -139,21 +145,21 @@ def _exporting(self) -> str: otlp_mock_exporter = OTLPMockExporter() with self.assertLogs(level=WARNING) as warning: - # pylint: disable=protected-access self.assertEqual( - otlp_mock_exporter._export(data={}), result_mock.SUCCESS + otlp_mock_exporter.export(data={}), result_mock.SUCCESS ) otlp_mock_exporter.shutdown() - # pylint: disable=protected-access self.assertEqual( - otlp_mock_exporter._export(data={}), result_mock.FAILURE + otlp_mock_exporter.export(data={}), result_mock.FAILURE ) self.assertEqual( warning.records[0].message, "Exporter already shutdown, ignoring batch", ) - def test_shutdown_wait_last_export(self): + def test_shutdown_wait_for_last_export_finishing_within_shutdown_timeout( + self, + ): result_mock = Mock() rpc_error = RpcError() @@ -181,28 +187,110 @@ def _translate_data( ) -> ExportServiceRequestT: pass + def export(self, data): + return self._exporter.export_with_retry(data) + @property def _exporting(self) -> str: return "mock" - otlp_mock_exporter = OTLPMockExporter() + otlp_mock_exporter = OTLPMockExporter(timeout=0.05) # pylint: disable=protected-access export_thread = threading.Thread( - target=otlp_mock_exporter._export, args=({},) + target=otlp_mock_exporter.export, args=({},) ) export_thread.start() try: # pylint: disable=protected-access - self.assertTrue(otlp_mock_exporter._export_lock.locked()) - # delay is 1 second while the default shutdown timeout is 30_000 milliseconds + + # Wait for the export thread to hold the lock. Since the main thread is not synchronized + # with the export thread, the thread may not be ready yet. + for _ in range(5): + if otlp_mock_exporter._exporter._export_lock.locked(): + break + time.sleep(0.01) + self.assertTrue(otlp_mock_exporter._exporter._export_lock.locked()) + + # 6 retries with a fixed retry delay of 10ms (see TraceServiceServicerUNAVAILABLEDelay) + # The default shutdown timeout is 30 seconds + # This means the retries will finish long before the shutdown flag will be set start_time = time_ns() otlp_mock_exporter.shutdown() now = time_ns() - self.assertGreaterEqual(now, (start_time + 30 / 1000)) - # pylint: disable=protected-access + # Verify that the shutdown method finished within the shutdown timeout + self.assertLessEqual(now, (start_time + 3e10)) self.assertTrue(otlp_mock_exporter._shutdown) + self.assertFalse( + otlp_mock_exporter._exporter._export_lock.locked() + ) + finally: + export_thread.join() + + def test_shutdown_wait_for_last_export_not_finishing_within_shutdown_timeout( + self, + ): + result_mock = Mock() + rpc_error = RpcError() + + def code(self): + return StatusCode.UNAVAILABLE + + def trailing_metadata(self): + return { + "google.rpc.retryinfo-bin": RetryInfo( + retry_delay=Duration(nanos=int(1e7)) + ).SerializeToString() + } + + rpc_error.code = MethodType(code, rpc_error) + rpc_error.trailing_metadata = MethodType(trailing_metadata, rpc_error) + + class OTLPMockExporter(OTLPExporterMixin): + _result = result_mock + _stub = Mock( + **{"return_value": Mock(**{"Export.side_effect": rpc_error})} + ) + + def _translate_data( + self, data: Sequence[SDKDataT] + ) -> ExportServiceRequestT: + pass + + def export(self, data): + return self._exporter.export_with_retry(data) + + @property + def _exporting(self) -> str: + return "mock" + + otlp_mock_exporter = OTLPMockExporter() + + # pylint: disable=protected-access + export_thread = threading.Thread( + target=otlp_mock_exporter.export, args=({},) + ) + export_thread.start() + try: # pylint: disable=protected-access - self.assertFalse(otlp_mock_exporter._export_lock.locked()) + + # Wait for the export thread to hold the lock. Since the main thread is not synchronized + # with the export thread, the thread may not be ready yet. + for _ in range(5): + if otlp_mock_exporter._exporter._export_lock.locked(): + break + time.sleep(0.01) + self.assertTrue(otlp_mock_exporter._exporter._export_lock.locked()) + + # 6 retries with a fixed retry delay of 10ms (see TraceServiceServicerUNAVAILABLEDelay) + # The default shutdown timeout is 30 seconds + # This means the retries will finish long before the shutdown flag will be set + start_time = time_ns() + otlp_mock_exporter.shutdown(0.001) + now = time_ns() + # Verify that the shutdown method finished within the shutdown timeout + self.assertLessEqual(now, (start_time + 3e10)) + self.assertTrue(otlp_mock_exporter._shutdown) finally: export_thread.join() + self.assertFalse(otlp_mock_exporter._exporter._export_lock.locked()) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index f9f9427b776..3758ca4bc55 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -15,6 +15,7 @@ # pylint: disable=too-many-lines import threading +import time from concurrent.futures import ThreadPoolExecutor # pylint: disable=too-many-lines @@ -32,6 +33,9 @@ from google.rpc.error_details_pb2 import RetryInfo from grpc import ChannelCredentials, Compression, StatusCode, server +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, +) from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( OTLPMetricExporter, ) @@ -147,7 +151,7 @@ class TestOTLPMetricExporter(TestCase): def setUp(self): - self.exporter = OTLPMetricExporter() + self.exporter = OTLPMetricExporter(timeout=0.05) self.server = server(ThreadPoolExecutor(max_workers=10)) @@ -443,7 +447,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): # pylint: disable=no-self-use @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator" ) @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}) @@ -481,56 +485,66 @@ def test_otlp_exporter_otlp_compression_unspecified( ) @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator" ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): + def test_unavailable(self, mock_expo): mock_expo.configure_mock(**{"return_value": [0.01]}) add_MetricsServiceServicer_to_server( MetricsServiceServicerUNAVAILABLE(), self.server ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - mock_sleep.assert_called_with(0.01) + + with patch.object( + self.exporter._exporter._shutdown, # pylint: disable=protected-access + "wait", + ) as wait_mock: + self.assertEqual( + self.exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + wait_mock.assert_called_with(0.01) @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator" ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep, mock_expo): + def test_unavailable_delay(self, mock_expo): - mock_expo.configure_mock(**{"return_value": [1]}) + mock_expo.configure_mock(**{"return_value": [0.01]}) add_MetricsServiceServicer_to_server( MetricsServiceServicerUNAVAILABLEDelay(), self.server ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - mock_sleep.assert_called_with(0.01) + with patch.object( + self.exporter._exporter._shutdown, # pylint: disable=protected-access + "wait", + ) as wait_mock: + self.assertEqual( + self.exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + wait_mock.assert_called_with(0.01) @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator" ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.logger.error") - def test_unknown_logs(self, mock_logger_error, mock_sleep, mock_expo): + def test_unknown_logs(self, mock_logger_error, mock_expo): mock_expo.configure_mock(**{"return_value": [1]}) add_MetricsServiceServicer_to_server( MetricsServiceServicerUNKNOWN(), self.server ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - mock_sleep.assert_not_called() + with patch.object( + self.exporter._exporter._shutdown, # pylint: disable=protected-access + "wait", + ) as wait_mock: + self.assertEqual( + self.exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + wait_mock.assert_not_called() mock_logger_error.assert_called_with( "Failed to export %s to %s, error code: %s", "metrics", @@ -827,6 +841,96 @@ def test_split_metrics_data_many_resources_scopes_metrics(self): split_metrics_data, ) + @patch( + "opentelemetry.exporter.otlp.proto.grpc.metric_exporter.OTLPMetricExporter._export", + side_effect=RetryableExportError(None), + ) + def test_split_metrics_timeout(self, mock_export): + """ + Test that given a batch that will be split, timeout is respected across + the batch as a whole. + """ + metrics_data = MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ) + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + OTLPMetricExporter(max_export_batch_size=2)._split_metrics_data( + metrics_data=metrics_data, + ) + ) + self.assertEqual(len(split_metrics_data), 2) + exporter = OTLPMetricExporter(max_export_batch_size=2) + + timeout_s = 0.5 + # The first export should block the full timeout duration and succeed. + # The subsequent export should fail immediately as the timeout will + # have passed. + with self.assertLogs(level="WARNING") as warning: + self.assertIs( + exporter.export(metrics_data, timeout_s * 1e3), + MetricExportResult.FAILURE, + ) + # There could be multiple calls to export because of the jitter in backoff + self.assertNotIn( + split_metrics_data[1], + [call_args[1] for call_args in mock_export.call_args_list], + ) + self.assertEqual( + warning.records[-1].message, + "Export deadline passed, ignoring data", + ) + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") def test_insecure_https_endpoint(self, mock_secure_channel): OTLPMetricExporter(endpoint="https://ab.c:123", insecure=True) @@ -852,7 +956,9 @@ def test_shutdown(self): ) self.exporter = OTLPMetricExporter() - def test_shutdown_wait_last_export(self): + def test_shutdown_wait_for_last_export_finishing_within_shutdown_timeout( + self, + ): add_MetricsServiceServicer_to_server( MetricsServiceServicerUNAVAILABLEDelay(), self.server ) @@ -863,16 +969,25 @@ def test_shutdown_wait_last_export(self): export_thread.start() try: # pylint: disable=protected-access - self.assertTrue(self.exporter._export_lock.locked()) - # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds + + # Wait for the export thread to hold the lock. Since the main thread is not synchronized + # with the export thread, the thread may not be ready yet. + for _ in range(5): + if self.exporter._exporter._export_lock.locked(): + break + time.sleep(0.01) + self.assertTrue(self.exporter._exporter._export_lock.locked()) + + # 6 retries with a fixed retry delay of 10ms (see TraceServiceServicerUNAVAILABLEDelay) + # The default shutdown timeout is 30 seconds + # This means the retries will finish long before the shutdown flag will be set start_time = time_ns() self.exporter.shutdown() now = time_ns() - self.assertGreaterEqual(now, (start_time + 30 / 1000)) - # pylint: disable=protected-access + # Verify that the shutdown method finished within the shutdown timeout + self.assertLessEqual(now, (start_time + 3e10)) self.assertTrue(self.exporter._shutdown) - # pylint: disable=protected-access - self.assertFalse(self.exporter._export_lock.locked()) + self.assertFalse(self.exporter._exporter._export_lock.locked()) finally: export_thread.join() diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index d618ffb13a3..0f27eb5ea03 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -16,6 +16,7 @@ import os import threading +import time from concurrent.futures import ThreadPoolExecutor from logging import WARNING from time import time_ns @@ -32,6 +33,9 @@ from opentelemetry.exporter.otlp.proto.common._internal import ( _encode_key_value, ) +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, +) from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( OTLPSpanExporter, ) @@ -133,7 +137,7 @@ class TestOTLPSpanExporter(TestCase): def setUp(self): tracer_provider = TracerProvider() - self.exporter = OTLPSpanExporter(insecure=True) + self.exporter = OTLPSpanExporter(insecure=True, timeout=0.05) tracer_provider.add_span_processor(SimpleSpanProcessor(self.exporter)) self.tracer = tracer_provider.get_tracer(__name__) @@ -522,35 +526,54 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure): ) @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + "opentelemetry.exporter.otlp.proto.grpc.trace_exporter.OTLPSpanExporter._export", + side_effect=RetryableExportError(None), + ) + def test_export_uses_arg_timeout_when_given(self, export_mock) -> None: + exporter = OTLPSpanExporter(timeout=20) + + with self.assertLogs(level="WARNING"): + start = time.time() + exporter.export([self.span], timeout_millis=100.0) + duration = time.time() - start + self.assertAlmostEqual(duration, 0.1, places=1) + + @patch( + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator" ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): + def test_unavailable(self, mock_expo): mock_expo.configure_mock(**{"return_value": [0.01]}) add_TraceServiceServicer_to_server( TraceServiceServicerUNAVAILABLE(), self.server ) - result = self.exporter.export([self.span]) + with patch.object( + self.exporter._exporter._shutdown, # pylint: disable=protected-access + "wait", + ) as wait_mock: + result = self.exporter.export([self.span]) self.assertEqual(result, SpanExportResult.FAILURE) - mock_sleep.assert_called_with(0.01) + wait_mock.assert_called_with(0.01) @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator" ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep, mock_expo): + def test_unavailable_delay(self, mock_expo): - mock_expo.configure_mock(**{"return_value": [1]}) + mock_expo.configure_mock(**{"return_value": [0.01]}) add_TraceServiceServicer_to_server( TraceServiceServicerUNAVAILABLEDelay(), self.server ) - self.assertEqual( - self.exporter.export([self.span]), SpanExportResult.FAILURE - ) - mock_sleep.assert_called_with(0.01) + with patch.object( + self.exporter._exporter._shutdown, # pylint: disable=protected-access + "wait", + ) as wait_mock: + self.assertEqual( + self.exporter.export([self.span]), SpanExportResult.FAILURE + ) + wait_mock.assert_called_with(0.01) def test_success(self): add_TraceServiceServicer_to_server( @@ -988,7 +1011,9 @@ def test_shutdown(self): "Exporter already shutdown, ignoring batch", ) - def test_shutdown_wait_last_export(self): + def test_shutdown_wait_for_last_export_finishing_within_shutdown_timeout( + self, + ): add_TraceServiceServicer_to_server( TraceServiceServicerUNAVAILABLEDelay(), self.server ) @@ -999,16 +1024,25 @@ def test_shutdown_wait_last_export(self): export_thread.start() try: # pylint: disable=protected-access - self.assertTrue(self.exporter._export_lock.locked()) - # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds + + # Wait for the export thread to hold the lock. Since the main thread is not synchronized + # with the export thread, the thread may not be ready yet. + for _ in range(5): + if self.exporter._exporter._export_lock.locked(): + break + time.sleep(0.01) + self.assertTrue(self.exporter._exporter._export_lock.locked()) + + # 6 retries with a fixed retry delay of 10ms (see TraceServiceServicerUNAVAILABLEDelay) + # The default shutdown timeout is 30 seconds + # This means the retries will finish long before the shutdown flag will be set start_time = time_ns() self.exporter.shutdown() now = time_ns() - self.assertGreaterEqual(now, (start_time + 30 / 1000)) - # pylint: disable=protected-access + # Verify that the shutdown method finished within the shutdown timeout + self.assertLessEqual(now, (start_time + 3e10)) self.assertTrue(self.exporter._shutdown) - # pylint: disable=protected-access - self.assertFalse(self.exporter._export_lock.locked()) + self.assertFalse(self.exporter._exporter._export_lock.locked()) finally: export_thread.join() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 597b012a49a..3b772351671 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -18,12 +18,12 @@ from io import BytesIO from os import environ from typing import Dict, Optional, Sequence -from time import sleep import requests -from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, + RetryingExporter, ) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.sdk.environment_variables import ( @@ -74,7 +74,7 @@ def __init__( client_key_file: Optional[str] = None, client_certificate_file: Optional[str] = None, headers: Optional[Dict[str, str]] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, session: Optional[requests.Session] = None, ): @@ -109,7 +109,7 @@ def __init__( self._headers = headers or parse_env_headers( headers_string, liberal=True ) - self._timeout = timeout or int( + timeout_sec = timeout or float( environ.get( OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), @@ -124,8 +124,13 @@ def __init__( {"Content-Encoding": self._compression.value} ) self._shutdown = False + self._exporter = RetryingExporter( + self._export, LogExportResult, timeout_sec + ) - def _export(self, serialized_data: bytes): + def _export( + self, serialized_data: bytes, timeout_sec: float + ) -> LogExportResult: data = serialized_data if self._compression == Compression.Gzip: gzip_data = BytesIO() @@ -135,14 +140,31 @@ def _export(self, serialized_data: bytes): elif self._compression == Compression.Deflate: data = zlib.compress(serialized_data) - return self._session.post( + resp = self._session.post( url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) + if resp.ok: + return LogExportResult.SUCCESS + + if self._retryable(resp): + _logger.warning( + "Transient error %s encountered while exporting logs batch.", + resp.reason, + ) + raise RetryableExportError(None) + + _logger.error( + "Failed to export logs batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return LogExportResult.FAILURE + @staticmethod def _retryable(resp: requests.Response) -> bool: if resp.status_code == 408: @@ -151,7 +173,11 @@ def _retryable(resp: requests.Response) -> bool: return True return False - def export(self, batch: Sequence[LogData]) -> LogExportResult: + def export( + self, + batch: Sequence[LogData], + timeout_millis: Optional[float] = None, + ) -> LogExportResult: # After the call to Shutdown subsequent calls to Export are # not allowed and should return a Failure result. if self._shutdown: @@ -159,34 +185,12 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult: return LogExportResult.FAILURE serialized_data = encode_logs(batch).SerializeToString() - - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - - if delay == self._MAX_RETRY_TIMEOUT: - return LogExportResult.FAILURE - - resp = self._export(serialized_data) - # pylint: disable=no-else-return - if resp.ok: - return LogExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting logs batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: - _logger.error( - "Failed to export logs batch code: %s, reason: %s", - resp.status_code, - resp.text, - ) - return LogExportResult.FAILURE - return LogExportResult.FAILURE + return self._exporter.export_with_retry( + serialized_data, + timeout_sec=( + timeout_millis / 1000.0 if timeout_millis is not None else None + ), + ) def force_flush(self, timeout_millis: float = 10_000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 16ac042dd89..260df9570e6 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -19,12 +19,14 @@ from typing import Sequence, Mapping # noqa: F401 from io import BytesIO -from time import sleep from deprecated import deprecated from opentelemetry.exporter.otlp.proto.common._internal import ( _get_resource_data, - _create_exp_backoff_generator, +) +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, + RetryingExporter, ) from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import ( OTLPMetricExporterMixin, @@ -103,7 +105,7 @@ def __init__( client_key_file: Optional[str] = None, client_certificate_file: Optional[str] = None, headers: Optional[Dict[str, str]] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, session: Optional[requests.Session] = None, preferred_temporality: Dict[type, AggregationTemporality] = None, @@ -139,7 +141,7 @@ def __init__( self._headers = headers or parse_env_headers( headers_string, liberal=True ) - self._timeout = timeout or int( + self._timeout = timeout or float( environ.get( OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), @@ -160,7 +162,14 @@ def __init__( preferred_temporality, preferred_aggregation ) - def _export(self, serialized_data: bytes): + self._shutdown = False + self._exporter = RetryingExporter( + self._export, MetricExportResult, self._timeout + ) + + def _export( + self, serialized_data: bytes, timeout_sec: float + ) -> MetricExportResult: data = serialized_data if self._compression == Compression.Gzip: gzip_data = BytesIO() @@ -170,14 +179,32 @@ def _export(self, serialized_data: bytes): elif self._compression == Compression.Deflate: data = zlib.compress(serialized_data) - return self._session.post( + resp = self._session.post( url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) + if resp.ok: + return MetricExportResult.SUCCESS + + if self._retryable(resp): + _logger.warning( + "Transient error %s encountered while exporting metric batch", + resp.reason, + ) + + raise RetryableExportError(None) + + _logger.error( + "Failed to export batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return MetricExportResult.FAILURE + @staticmethod def _retryable(resp: requests.Response) -> bool: if resp.status_code == 408: @@ -189,40 +216,28 @@ def _retryable(resp: requests.Response) -> bool: def export( self, metrics_data: MetricsData, - timeout_millis: float = 10_000, + timeout_millis: Optional[float] = None, **kwargs, ) -> MetricExportResult: - serialized_data = encode_metrics(metrics_data) - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - - if delay == self._MAX_RETRY_TIMEOUT: - return MetricExportResult.FAILURE + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring batch") + return MetricExportResult.FAILURE - resp = self._export(serialized_data.SerializeToString()) - # pylint: disable=no-else-return - if resp.ok: - return MetricExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting metric batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: - _logger.error( - "Failed to export batch code: %s, reason: %s", - resp.status_code, - resp.text, - ) - return MetricExportResult.FAILURE - return MetricExportResult.FAILURE + serialized_data = encode_metrics(metrics_data) + return self._exporter.export_with_retry( + serialized_data.SerializeToString(), + timeout_sec=( + timeout_millis / 1000 if timeout_millis is not None else None + ), + ) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - pass + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring call") + return + self._exporter.shutdown(timeout_millis=timeout_millis) + self._session.close() + self._shutdown = True @property def _exporting(self) -> str: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index ac69b6acde3..65863fe7c21 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -17,13 +17,13 @@ import zlib from io import BytesIO from os import environ -from typing import Dict, Optional -from time import sleep +from typing import Dict, Optional, Sequence import requests -from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, + RetryingExporter, ) from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, @@ -44,6 +44,7 @@ OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT, ) +from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from opentelemetry.exporter.otlp.proto.http import ( _OTLP_HTTP_HEADERS, @@ -72,7 +73,7 @@ def __init__( client_key_file: Optional[str] = None, client_certificate_file: Optional[str] = None, headers: Optional[Dict[str, str]] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, session: Optional[requests.Session] = None, ): @@ -106,7 +107,7 @@ def __init__( self._headers = headers or parse_env_headers( headers_string, liberal=True ) - self._timeout = timeout or int( + timeout_sec = timeout or float( environ.get( OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), @@ -121,8 +122,13 @@ def __init__( {"Content-Encoding": self._compression.value} ) self._shutdown = False + self._exporter = RetryingExporter( + self._export, SpanExportResult, timeout_sec + ) - def _export(self, serialized_data: bytes): + def _export( + self, serialized_data: bytes, timeout_sec: float + ) -> SpanExportResult: data = serialized_data if self._compression == Compression.Gzip: gzip_data = BytesIO() @@ -132,14 +138,31 @@ def _export(self, serialized_data: bytes): elif self._compression == Compression.Deflate: data = zlib.compress(serialized_data) - return self._session.post( + resp = self._session.post( url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) + if resp.ok: + return SpanExportResult.SUCCESS + + if self._retryable(resp): + _logger.warning( + "Transient error %s encountered while exporting span batch", + resp.reason, + ) + raise RetryableExportError(None) + + _logger.error( + "Failed to export batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return SpanExportResult.FAILURE + @staticmethod def _retryable(resp: requests.Response) -> bool: if resp.status_code == 408: @@ -148,38 +171,14 @@ def _retryable(resp: requests.Response) -> bool: return True return False - def _serialize_spans(self, spans): + def _serialize_spans(self, spans: Sequence[ReadableSpan]) -> str: return encode_spans(spans).SerializePartialToString() - def _export_serialized_spans(self, serialized_data): - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: - return SpanExportResult.FAILURE - - resp = self._export(serialized_data) - # pylint: disable=no-else-return - if resp.ok: - return SpanExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting span batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: - _logger.error( - "Failed to export batch code: %s, reason: %s", - resp.status_code, - resp.text, - ) - return SpanExportResult.FAILURE - return SpanExportResult.FAILURE - - def export(self, spans) -> SpanExportResult: + def export( + self, + spans: Sequence[ReadableSpan], + timeout_millis: Optional[float] = None, + ) -> SpanExportResult: # After the call to Shutdown subsequent calls to Export are # not allowed and should return a Failure result. if self._shutdown: @@ -187,8 +186,12 @@ def export(self, spans) -> SpanExportResult: return SpanExportResult.FAILURE serialized_data = self._serialize_spans(spans) - - return self._export_serialized_spans(serialized_data) + return self._exporter.export_with_retry( + serialized_data, + timeout_sec=( + timeout_millis / 1000.0 if timeout_millis is not None else None + ), + ) def shutdown(self): if self._shutdown: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index ac039d4ff81..3b409159cd5 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -15,11 +15,12 @@ from logging import WARNING from os import environ from unittest import TestCase -from unittest.mock import MagicMock, Mock, call, patch +from unittest.mock import ANY, MagicMock, Mock, call, patch +import responses from requests import Session from requests.models import Response -from responses import POST, activate, add +from responses.registries import OrderedRegistry from opentelemetry.exporter.otlp.proto.common.metrics_encoder import ( encode_metrics, @@ -324,30 +325,37 @@ def test_serialization(self, mock_post): url=exporter._endpoint, data=serialized_data.SerializeToString(), verify=exporter._certificate_file, - timeout=exporter._timeout, + timeout=ANY, # Is checked in the following line cert=exporter._client_cert, ) - - @activate - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") - def test_exponential_backoff(self, mock_sleep): - # return a retryable error - add( - POST, - "http://metrics.example.com/export", - json={"error": "something exploded"}, - status=500, + self.assertAlmostEqual( + mock_post.call_args_list[0][1]["timeout"], 10.0, places=4 ) + # Pylint is wrong about this + @responses.activate( # pylint: disable=unexpected-keyword-arg,no-value-for-parameter + registry=OrderedRegistry + ) + def test_exponential_backoff(self): + for status in [500, 500, 500, 200]: + responses.add( + responses.POST, + "http://metrics.example.com/export", + json={"error": "something exploded"}, + status=status, + ) + exporter = OTLPMetricExporter( endpoint="http://metrics.example.com/export" ) metrics_data = self.metrics["sum_int"] - exporter.export(metrics_data) - mock_sleep.assert_has_calls( - [call(1), call(2), call(4), call(8), call(16), call(32)] - ) + with patch.object( + exporter._exporter._shutdown, # pylint: disable=protected-access + "wait", + ) as wait_mock: + exporter.export(metrics_data) + wait_mock.assert_has_calls([call(1), call(2), call(4)]) def test_aggregation_temporality(self): @@ -501,14 +509,15 @@ def test_exponential_explicit_bucket_histogram(self): ExplicitBucketHistogramAggregation, ) - @patch.object(OTLPMetricExporter, "_export", return_value=Mock(ok=True)) - def test_2xx_status_code(self, mock_otlp_metric_exporter): + def test_2xx_status_code(self): """ Test that any HTTP 2XX code returns a successful result """ self.assertEqual( - OTLPMetricExporter().export(MagicMock()), + OTLPMetricExporter( + session=Mock(**{"post.return_value": Mock(ok=True)}) + ).export(MagicMock()), MetricExportResult.SUCCESS, ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index f5606794620..dc698a4c0ad 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -15,14 +15,19 @@ # pylint: disable=protected-access import unittest +from time import time from typing import List from unittest.mock import MagicMock, Mock, call, patch import requests import responses from google.protobuf.json_format import MessageToDict +from responses.registries import OrderedRegistry from opentelemetry._logs import SeverityNumber +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, +) from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http._log_exporter import ( DEFAULT_COMPRESSION, @@ -77,7 +82,7 @@ def test_constructor_default(self): self.assertEqual(exporter._certificate_file, True) self.assertEqual(exporter._client_certificate_file, None) self.assertEqual(exporter._client_key_file, None) - self.assertEqual(exporter._timeout, DEFAULT_TIMEOUT) + self.assertEqual(exporter._exporter._timeout_sec, DEFAULT_TIMEOUT) self.assertIs(exporter._compression, DEFAULT_COMPRESSION) self.assertEqual(exporter._headers, {}) self.assertIsInstance(exporter._session, requests.Session) @@ -119,7 +124,7 @@ def test_exporter_metrics_env_take_priority(self): exporter._client_certificate_file, "logs/client-cert.pem" ) self.assertEqual(exporter._client_key_file, "logs/client-key.pem") - self.assertEqual(exporter._timeout, 40) + self.assertEqual(exporter._exporter._timeout_sec, 40) self.assertIs(exporter._compression, Compression.Deflate) self.assertEqual( exporter._headers, @@ -160,7 +165,7 @@ def test_exporter_constructor_take_priority(self): self.assertEqual(exporter._certificate_file, "/hello.crt") self.assertEqual(exporter._client_certificate_file, "/client-cert.pem") self.assertEqual(exporter._client_key_file, "/client-key.pem") - self.assertEqual(exporter._timeout, 70) + self.assertEqual(exporter._exporter._timeout_sec, 70) self.assertIs(exporter._compression, Compression.NoCompression) self.assertEqual( exporter._headers, @@ -192,13 +197,28 @@ def test_exporter_env(self): exporter._client_certificate_file, ENV_CLIENT_CERTIFICATE ) self.assertEqual(exporter._client_key_file, ENV_CLIENT_KEY) - self.assertEqual(exporter._timeout, int(ENV_TIMEOUT)) + self.assertEqual(exporter._exporter._timeout_sec, int(ENV_TIMEOUT)) self.assertIs(exporter._compression, Compression.Gzip) self.assertEqual( exporter._headers, {"envheader1": "val1", "envheader2": "val2"} ) self.assertIsInstance(exporter._session, requests.Session) + @patch( + "opentelemetry.exporter.otlp.proto.http._log_exporter.OTLPLogExporter._export", + side_effect=RetryableExportError(None), + ) + def test_export_uses_arg_timeout_when_given(self, export_mock) -> None: + exporter = OTLPLogExporter( + endpoint="http://traces.example.com/export", timeout=20 + ) + + with self.assertLogs(level="WARNING"): + start = time() + exporter.export(self._get_sdk_log_data(), timeout_millis=100.0) + duration = time() - start + self.assertAlmostEqual(duration, 0.1, places=1) + @staticmethod def export_log_and_deserialize(log): with patch("requests.Session.post") as mock_post: @@ -269,24 +289,28 @@ def test_exported_log_without_span_id(self): else: self.fail("No log records found") - @responses.activate - @patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep") - def test_exponential_backoff(self, mock_sleep): - # return a retryable error - responses.add( - responses.POST, - "http://logs.example.com/export", - json={"error": "something exploded"}, - status=500, - ) + # Pylint is wrong about this + @responses.activate( # pylint: disable=unexpected-keyword-arg,no-value-for-parameter + registry=OrderedRegistry + ) + def test_exponential_backoff(self): + for status in [500, 500, 500, 200]: + responses.add( + responses.POST, + "http://logs.example.com/export", + json={"error": "something exploded"}, + status=status, + ) exporter = OTLPLogExporter(endpoint="http://logs.example.com/export") logs = self._get_sdk_log_data() - exporter.export(logs) - mock_sleep.assert_has_calls( - [call(1), call(2), call(4), call(8), call(16), call(32)] - ) + with patch.object( + exporter._exporter._shutdown, # pylint: disable=protected-access + "wait", + ) as wait_mock: + exporter.export(logs) + wait_mock.assert_has_calls([call(1), call(2), call(4)]) @staticmethod def _get_sdk_log_data() -> List[LogData]: @@ -358,12 +382,14 @@ def _get_sdk_log_data() -> List[LogData]: return [log1, log2, log3, log4] - @patch.object(OTLPLogExporter, "_export", return_value=Mock(ok=True)) - def test_2xx_status_code(self, mock_otlp_metric_exporter): + def test_2xx_status_code(self): """ Test that any HTTP 2XX code returns a successful result """ self.assertEqual( - OTLPLogExporter().export(MagicMock()), LogExportResult.SUCCESS + OTLPLogExporter( + session=Mock(**{"post.return_value": Mock(ok=True)}) + ).export(MagicMock()), + LogExportResult.SUCCESS, ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 9d57c62bae8..1109d5647ae 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -13,11 +13,16 @@ # limitations under the License. import unittest +from time import time from unittest.mock import MagicMock, Mock, call, patch import requests import responses +from responses.registries import OrderedRegistry +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, +) from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( DEFAULT_COMPRESSION, @@ -66,7 +71,7 @@ def test_constructor_default(self): self.assertEqual(exporter._certificate_file, True) self.assertEqual(exporter._client_certificate_file, None) self.assertEqual(exporter._client_key_file, None) - self.assertEqual(exporter._timeout, DEFAULT_TIMEOUT) + self.assertEqual(exporter._exporter._timeout_sec, DEFAULT_TIMEOUT) self.assertIs(exporter._compression, DEFAULT_COMPRESSION) self.assertEqual(exporter._headers, {}) self.assertIsInstance(exporter._session, requests.Session) @@ -108,7 +113,7 @@ def test_exporter_traces_env_take_priority(self): exporter._client_certificate_file, "traces/client-cert.pem" ) self.assertEqual(exporter._client_key_file, "traces/client-key.pem") - self.assertEqual(exporter._timeout, 40) + self.assertEqual(exporter._exporter._timeout_sec, 40) self.assertIs(exporter._compression, Compression.Deflate) self.assertEqual( exporter._headers, @@ -151,7 +156,7 @@ def test_exporter_constructor_take_priority(self): exporter._client_certificate_file, "path/to/client-cert.pem" ) self.assertEqual(exporter._client_key_file, "path/to/client-key.pem") - self.assertEqual(exporter._timeout, 20) + self.assertEqual(exporter._exporter._timeout_sec, 20) self.assertIs(exporter._compression, Compression.NoCompression) self.assertEqual( exporter._headers, @@ -179,7 +184,7 @@ def test_exporter_env(self): exporter._client_certificate_file, OS_ENV_CLIENT_CERTIFICATE ) self.assertEqual(exporter._client_key_file, OS_ENV_CLIENT_KEY) - self.assertEqual(exporter._timeout, int(OS_ENV_TIMEOUT)) + self.assertEqual(exporter._exporter._timeout_sec, int(OS_ENV_TIMEOUT)) self.assertIs(exporter._compression, Compression.Gzip) self.assertEqual( exporter._headers, {"envheader1": "val1", "envheader2": "val2"} @@ -232,18 +237,44 @@ def test_headers_parse_from_env(self): ), ) - # pylint: disable=no-self-use - @responses.activate - @patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.sleep") - def test_exponential_backoff(self, mock_sleep): - # return a retryable error - responses.add( - responses.POST, - "http://traces.example.com/export", - json={"error": "something exploded"}, - status=500, + @patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter._export", + side_effect=RetryableExportError(None), + ) + def test_export_uses_arg_timeout_when_given(self, export_mock) -> None: + exporter = OTLPSpanExporter( + endpoint="http://traces.example.com/export", timeout=20 + ) + + span = _Span( + "abc", + context=Mock( + trace_state={"a": "b", "c": "d"}, + span_id=10217189687419569865, + trace_id=67545097771067222548457157018666467027, + ), ) + with self.assertLogs(level="WARNING"): + start = time() + exporter.export([span], timeout_millis=100.0) + duration = time() - start + self.assertAlmostEqual(duration, 0.1, places=1) + + # pylint: disable=no-self-use + # Pylint is wrong about this + @responses.activate( # pylint: disable=unexpected-keyword-arg,no-value-for-parameter + registry=OrderedRegistry + ) + def test_exponential_backoff(self): + for status in [500, 500, 500, 200]: + responses.add( + responses.POST, + "http://traces.example.com/export", + json={"error": "something exploded"}, + status=status, + ) + exporter = OTLPSpanExporter( endpoint="http://traces.example.com/export" ) @@ -258,17 +289,21 @@ def test_exponential_backoff(self, mock_sleep): ), ) - exporter.export([span]) - mock_sleep.assert_has_calls( - [call(1), call(2), call(4), call(8), call(16), call(32)] - ) + with patch.object( + exporter._exporter._shutdown, + "wait", # pylint: disable=protected-access + ) as wait_mock: + exporter.export([span]) + wait_mock.assert_has_calls([call(1), call(2), call(4)]) - @patch.object(OTLPSpanExporter, "_export", return_value=Mock(ok=True)) - def test_2xx_status_code(self, mock_otlp_metric_exporter): + def test_2xx_status_code(self): """ Test that any HTTP 2XX code returns a successful result """ self.assertEqual( - OTLPSpanExporter().export(MagicMock()), SpanExportResult.SUCCESS + OTLPSpanExporter( + session=Mock(**{"post.return_value": Mock(ok=True)}) + ).export(MagicMock()), + SpanExportResult.SUCCESS, )