Skip to content

Commit

Permalink
[Exporter] Support redirect response in exporter (#20489)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored and iscai-msft committed Sep 29, 2021
1 parent 3beda8f commit 61bd48b
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 21 deletions.
4 changes: 4 additions & 0 deletions sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Release History

**Features**
- Support stamp specific redirect in exporters
([#20489](https://github.com/Azure/azure-sdk-for-python/pull/20489))

**Breaking Changes**
- Change exporter OT to AI mapping fields following common schema
([#20445](https://github.com/Azure/azure-sdk-for-python/pull/20445))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import tempfile
from enum import Enum
from typing import List, Any
from urllib.parse import urlparse

from opentelemetry.sdk.trace.export import SpanExportResult

from azure.core.exceptions import HttpResponseError, ServiceRequestError
from azure.core.pipeline.policies import ContentDecodePolicy, HttpLoggingPolicy, RequestIdPolicy
from azure.core.pipeline.policies import ContentDecodePolicy, HttpLoggingPolicy, RedirectPolicy, RequestIdPolicy
from azure.monitor.opentelemetry.exporter._generated import AzureMonitorClient
from azure.monitor.opentelemetry.exporter._generated._configuration import AzureMonitorClientConfiguration
from azure.monitor.opentelemetry.exporter._generated.models import TelemetryItem
Expand Down Expand Up @@ -43,6 +44,7 @@ def __init__(self, **kwargs: Any) -> None:
self._instrumentation_key = parsed_connection_string.instrumentation_key
self._timeout = 10.0 # networking timeout in seconds
self._api_version = kwargs.get('api_version') or _SERVICE_API_LATEST
self._consecutive_redirects = 0 # To prevent circular redirects

temp_suffix = self._instrumentation_key or ""
default_storage_path = os.path.join(
Expand All @@ -56,7 +58,8 @@ def __init__(self, **kwargs: Any) -> None:
config.user_agent_policy,
config.proxy_policy,
ContentDecodePolicy(**kwargs),
config.redirect_policy,
# Handle redirects in exporter, set new endpoint if redirected
RedirectPolicy(permit_redirects=False),
config.retry_policy,
config.authentication_policy,
config.custom_hook_policy,
Expand Down Expand Up @@ -100,6 +103,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
try:
track_response = self.client.track(envelopes)
if not track_response.errors:
self._consecutive_redirects = 0
logger.info("Transmission succeeded: Item received: %s. Items accepted: %s",
track_response.items_received, track_response.items_accepted)
return ExportResult.SUCCESS
Expand All @@ -120,11 +124,33 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
envelopes_to_store = [x.as_dict()
for x in resend_envelopes]
self.storage.put(envelopes_to_store)
self._consecutive_redirects = 0
return ExportResult.FAILED_RETRYABLE

except HttpResponseError as response_error:
if _is_retryable_code(response_error.status_code):
return ExportResult.FAILED_RETRYABLE
if _is_redirect_code(response_error.status_code):
self._consecutive_redirects = self._consecutive_redirects + 1
if self._consecutive_redirects < self.client._config.redirect_policy.max_redirects: # pylint: disable=W0212
if response_error.response and response_error.response.headers:
location = response_error.response.headers.get("location")
if location:
url = urlparse(location)
if url.scheme and url.netloc:
# Change the host to the new redirected host
self.client._config.host = "{}://{}".format(url.scheme, url.netloc) # pylint: disable=W0212
# Attempt to export again
return self._transmit(envelopes)
logger.error(
"Error parsing redirect information."
)
return ExportResult.FAILED_NOT_RETRYABLE
logger.error(
"Error sending telemetry because of circular redirects." \
"Please check the integrity of your connection string."
)
return ExportResult.FAILED_NOT_RETRYABLE
return ExportResult.FAILED_NOT_RETRYABLE
except ServiceRequestError as request_error:
# Errors when we're fairly sure that the server did not receive the
Expand All @@ -140,9 +166,20 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
return ExportResult.FAILED_NOT_RETRYABLE
return ExportResult.FAILED_NOT_RETRYABLE
# No spans to export
self._consecutive_redirects = 0
return ExportResult.SUCCESS


def _is_redirect_code(response_code: int) -> bool:
"""
Determine if response is a redirect response.
"""
return bool(response_code in(
307, # Temporary redirect
308, # Permanent redirect
))


def _is_retryable_code(response_code: int) -> bool:
"""
Determine if response is retryable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
from opentelemetry.sdk.trace.export import SpanExportResult

from azure.core.exceptions import HttpResponseError, ServiceRequestError
from azure.core.pipeline.transport import HttpResponse
from azure.monitor.opentelemetry.exporter.export._base import (
BaseExporter,
ExportResult,
get_trace_export_result,
)
from azure.monitor.opentelemetry.exporter._generated.models import TelemetryItem
from azure.monitor.opentelemetry.exporter._generated import AzureMonitorClient
from azure.monitor.opentelemetry.exporter._generated.models import TelemetryItem, TrackResponse


def throw(exc_type, *args, **kwargs):
Expand Down Expand Up @@ -115,44 +117,54 @@ def test_transmit_from_storage_lease_failure(self, requests_mock):
self._base._transmit_from_storage()
self.assertTrue(self._base.storage.get())

def test_transmit_request_timeout(self):
with mock.patch("requests.Session.request", throw(requests.Timeout)):
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_RETRYABLE)

def test_transmit_http_error_retryable(self):
with mock.patch("azure.monitor.opentelemetry.exporter.export._base._is_retryable_code") as m:
m.return_value = True
with mock.patch("requests.Session.request", throw(HttpResponseError)):
with mock.patch.object(AzureMonitorClient, 'track', throw(HttpResponseError)):
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_RETRYABLE)

def test_transmit_http_error_retryable(self):
def test_transmit_http_error_not_retryable(self):
with mock.patch("azure.monitor.opentelemetry.exporter.export._base._is_retryable_code") as m:
m.return_value = False
with mock.patch("requests.Session.request", throw(HttpResponseError)):
with mock.patch.object(AzureMonitorClient, 'track', throw(HttpResponseError)):
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)

def test_transmit_http_error_redirect(self):
response = HttpResponse(None, None)
response.status_code = 307
response.headers = {"location":"https://example.com"}
prev_redirects = self._base.client._config.redirect_policy.max_redirects
self._base.client._config.redirect_policy.max_redirects = 2
prev_host = self._base.client._config.host
error = HttpResponseError(response=response)
with mock.patch.object(AzureMonitorClient, 'track') as post:
post.side_effect = error
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)
self.assertEqual(post.call_count, 2)
self.assertEqual(self._base.client._config.host, "https://example.com")
self._base.client._config.redirect_policy.max_redirects = prev_redirects
self._base.client._config.host = prev_host

def test_transmit_request_error(self):
with mock.patch("requests.Session.request", throw(ServiceRequestError, message="error")):
with mock.patch.object(AzureMonitorClient, 'track', throw(ServiceRequestError, message="error")):
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_RETRYABLE)

def test_transmit_request_exception(self):
with mock.patch("requests.Session.request", throw(Exception)):
with mock.patch.object(AzureMonitorClient, 'track', throw(Exception)):
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)

def test_transmission_200(self):
with mock.patch("requests.Session.request") as post:
post.return_value = MockResponse(200, json.dumps(
{
"itemsReceived": 1,
"itemsAccepted": 1,
"errors": [],
}
), reason="OK", content="")
with mock.patch.object(AzureMonitorClient, 'track') as post:
post.return_value = TrackResponse(
items_received=1,
items_accepted=1,
errors=[],
)
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.SUCCESS)

Expand Down

0 comments on commit 61bd48b

Please sign in to comment.