From fdc2bd78e73e5e6427b7475495a1f8338ac4ac52 Mon Sep 17 00:00:00 2001 From: Rajkumar Rangaraj Date: Fri, 16 Oct 2020 22:20:41 -0700 Subject: [PATCH] Azure Monitor Exporter - Add IngestionResponsePolicy (#16032) * Added IngestionResponsePolicy * check for object is int * Added constant to response code * Added method summary. --- .../src/ApplicationInsightsRestClient.cs | 64 ++++++---- .../src/AzureMonitorTransmitter.cs | 22 ++-- .../src/IngestionResponsePolicy.cs | 116 ++++++++++++++++++ .../src/ResponseStatusCodes.cs | 18 +++ 4 files changed, 188 insertions(+), 32 deletions(-) create mode 100644 sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/IngestionResponsePolicy.cs create mode 100644 sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/ResponseStatusCodes.cs diff --git a/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/ApplicationInsightsRestClient.cs b/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/ApplicationInsightsRestClient.cs index fae1af6d2b3e8..5ecfce88a19ad 100644 --- a/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/ApplicationInsightsRestClient.cs +++ b/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/ApplicationInsightsRestClient.cs @@ -3,10 +3,8 @@ using System; using System.Collections.Generic; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Azure; using Azure.Core; using OpenTelemetry.Exporter.AzureMonitor.Models; @@ -14,7 +12,13 @@ namespace OpenTelemetry.Exporter.AzureMonitor { internal partial class ApplicationInsightsRestClient { - internal async Task> InternalTrackAsync(IEnumerable body, CancellationToken cancellationToken = default) + /// + /// This operation sends a sequence of telemetry events that will be monitored by Azure Monitor. + /// + /// The list of telemetry events to track. + /// The cancellation token to use. + /// + internal async Task InternalTrackAsync(IEnumerable body, CancellationToken cancellationToken = default) { if (body == null) { @@ -22,27 +26,24 @@ internal async Task> InternalTrackAsync(IEnumerable + /// This operation sends a blob from persistent storage that will be monitored by Azure Monitor. + /// + /// Content of blob to track. + /// The cancellation token to use. + /// + internal async Task InternalTrackAsync(ReadOnlyMemory body, CancellationToken cancellationToken = default) + { + using var message = CreateTrackRequest(body); + await _pipeline.SendAsync(message, cancellationToken).ConfigureAwait(false); + + return message.TryGetProperty("ItemsAccepted", out var objItemsAccepted) && objItemsAccepted is int itemsAccepted ? itemsAccepted : 0; } internal HttpMessage CreateTrackRequest(IEnumerable body) @@ -66,5 +67,22 @@ internal HttpMessage CreateTrackRequest(IEnumerable body) request.Content = RequestContent.Create(content.ToBytes()); return message; } + + internal HttpMessage CreateTrackRequest(ReadOnlyMemory body) + { + var message = _pipeline.CreateMessage(); + var request = message.Request; + request.Method = RequestMethod.Post; + var uri = new RawRequestUriBuilder(); + uri.AppendRaw(host, false); + uri.AppendRaw("/v2", false); + uri.AppendPath("/track", false); + request.Uri = uri; + request.Headers.Add("Content-Type", "application/json"); + request.Headers.Add("Accept", "application/json"); + using var content = new NDJsonWriter(); + request.Content = RequestContent.Create(body); + return message; + } } } diff --git a/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/AzureMonitorTransmitter.cs b/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/AzureMonitorTransmitter.cs index 6a3f8d431b5dd..6487ae71d2b64 100644 --- a/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/AzureMonitorTransmitter.cs +++ b/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/AzureMonitorTransmitter.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; - +using Azure.Core; using Azure.Core.Pipeline; using OpenTelemetry.Exporter.AzureMonitor.ConnectionString; @@ -19,13 +19,13 @@ namespace OpenTelemetry.Exporter.AzureMonitor internal class AzureMonitorTransmitter : ITransmitter { private readonly ApplicationInsightsRestClient applicationInsightsRestClient; - private readonly AzureMonitorExporterOptions options; - public AzureMonitorTransmitter(AzureMonitorExporterOptions exporterOptions) + public AzureMonitorTransmitter(AzureMonitorExporterOptions options) { - ConnectionStringParser.GetValues(exporterOptions.ConnectionString, out _, out string ingestionEndpoint); + ConnectionStringParser.GetValues(options.ConnectionString, out _, out string ingestionEndpoint); + options.Retry.MaxRetries = 0; + options.AddPolicy(new IngestionResponsePolicy(), HttpPipelinePosition.PerCall); - options = exporterOptions; applicationInsightsRestClient = new ApplicationInsightsRestClient(new ClientDiagnostics(options), HttpPipelineBuilder.Build(options), host: ingestionEndpoint); } @@ -36,26 +36,30 @@ public async ValueTask TrackAsync(IEnumerable telemetryItems return 0; } - Azure.Response response = null; + int itemsAccepted = 0; try { if (async) { - response = await this.applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).ConfigureAwait(false); + itemsAccepted = await this.applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).ConfigureAwait(false); } else { - response = this.applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).Result; + itemsAccepted = this.applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).Result; } } catch (Exception ex) { + if (ex?.InnerException?.InnerException?.Source == "System.Net.Http") + { + // TODO: Network issue. Send Telemetry Items To Storage + } // TODO: Log the exception to new event source. If we get a common logger we could just log exception to it. AzureMonitorTraceExporterEventSource.Log.FailedExport(ex); } - return response == null ? 0 : response.Value.ItemsAccepted.GetValueOrDefault(); + return itemsAccepted; } } } diff --git a/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/IngestionResponsePolicy.cs b/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/IngestionResponsePolicy.cs new file mode 100644 index 0000000000000..5abf697975976 --- /dev/null +++ b/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/IngestionResponsePolicy.cs @@ -0,0 +1,116 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using Azure; +using Azure.Core; +using Azure.Core.Pipeline; +using OpenTelemetry.Exporter.AzureMonitor.Models; + +namespace OpenTelemetry.Exporter.AzureMonitor +{ + internal class IngestionResponsePolicy : HttpPipelineSynchronousPolicy + { + public override void OnReceivedResponse(HttpMessage message) + { + base.OnReceivedResponse(message); + + int itemsAccepted; + + if (message.TryGetProperty("TelemetryItems", out var telemetryItems)) + { + itemsAccepted = ParseResponse(message, (IEnumerable)telemetryItems); + } + else + { + itemsAccepted = ParseResponse(message); + } + + message.SetProperty("ItemsAccepted", itemsAccepted); + } + + internal static int ParseResponse(HttpMessage message, IEnumerable telemetryItems) + { + var httpStatus = message?.Response?.Status; + int itemsAccepted = 0; + + switch (httpStatus) + { + case ResponseStatusCodes.Success: + itemsAccepted = telemetryItems.Count(); + break; + case ResponseStatusCodes.PartialSuccess: + // Parse retry-after header + // Send Failed Messages To Storage + break; + case ResponseStatusCodes.RequestTimeout: + case ResponseStatusCodes.ResponseCodeTooManyRequests: + case ResponseStatusCodes.ResponseCodeTooManyRequestsAndRefreshCache: + // Parse retry-after header + // Send Messages To Storage + break; + case ResponseStatusCodes.InternalServerError: + case ResponseStatusCodes.BadGateway: + case ResponseStatusCodes.ServiceUnavailable: + case ResponseStatusCodes.GatewayTimeout: + // Send Messages To Storage + break; + case null: // UnknownNetworkError + // No HttpMessage. Send TelemetryItems To Storage + break; + default: + // Log Non-Retriable Status and don't retry or store; + break; + } + + return itemsAccepted; + } + + internal static int ParseResponse(HttpMessage message) + { + var httpStatus = message?.Response?.Status; + int itemsAccepted = 0; + + switch (httpStatus) + { + case ResponseStatusCodes.Success: + itemsAccepted = GetItemsAccepted(message); + break; + case ResponseStatusCodes.PartialSuccess: + // Send Failed Messages To Storage + break; + case ResponseStatusCodes.RequestTimeout: + case ResponseStatusCodes.ResponseCodeTooManyRequests: + case ResponseStatusCodes.ResponseCodeTooManyRequestsAndRefreshCache: + case ResponseStatusCodes.InternalServerError: + case ResponseStatusCodes.BadGateway: + case ResponseStatusCodes.ServiceUnavailable: + case ResponseStatusCodes.GatewayTimeout: + case null: // UnknownNetworkError + itemsAccepted = 0; + // Request body is already in storage. No need to store again. + break; + default: + // Log Non-Retriable Status and don't retry or store; + break; + } + + return itemsAccepted; + } + + internal static int GetItemsAccepted(HttpMessage message) + { + int itemsAccepted = 0; + using (JsonDocument document = JsonDocument.Parse(message.Response.ContentStream, default)) + { + var value = TrackResponse.DeserializeTrackResponse(document.RootElement); + Response.FromValue(value, message.Response); + itemsAccepted = value.ItemsAccepted.GetValueOrDefault(); + } + + return itemsAccepted; + } + } +} diff --git a/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/ResponseStatusCodes.cs b/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/ResponseStatusCodes.cs new file mode 100644 index 0000000000000..4123265c7f10a --- /dev/null +++ b/sdk/monitor/OpenTelemetry.Exporter.AzureMonitor/src/ResponseStatusCodes.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace OpenTelemetry.Exporter.AzureMonitor +{ + internal class ResponseStatusCodes + { + public const int Success = 200; + public const int PartialSuccess = 206; + public const int RequestTimeout = 408; + public const int ResponseCodeTooManyRequests = 429; + public const int ResponseCodeTooManyRequestsAndRefreshCache = 439; + public const int InternalServerError = 500; + public const int BadGateway = 502; + public const int ServiceUnavailable = 503; + public const int GatewayTimeout = 504; + } +}