Skip to content

Commit

Permalink
Azure Monitor Exporter - Add IngestionResponsePolicy (Azure#16032)
Browse files Browse the repository at this point in the history
* Added IngestionResponsePolicy

* check for object is int

* Added constant to response code

* Added method summary.
  • Loading branch information
rajkumar-rangaraj authored Oct 17, 2020
1 parent 8317a67 commit fdc2bd7
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,47 @@

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using OpenTelemetry.Exporter.AzureMonitor.Models;

namespace OpenTelemetry.Exporter.AzureMonitor
{
internal partial class ApplicationInsightsRestClient
{
internal async Task<Response<TrackResponse>> InternalTrackAsync(IEnumerable<TelemetryItem> body, CancellationToken cancellationToken = default)
/// <summary>
/// This operation sends a sequence of telemetry events that will be monitored by Azure Monitor.
/// </summary>
/// <param name="body">The list of telemetry events to track.</param>
/// <param name="cancellationToken">The cancellation token to use.</param>
/// <returns></returns>
internal async Task<int> InternalTrackAsync(IEnumerable<TelemetryItem> body, CancellationToken cancellationToken = default)
{
if (body == null)
{
throw new ArgumentNullException(nameof(body));
}

using var message = CreateTrackRequest(body);
message.SetProperty("TelemetryItems", body);
await _pipeline.SendAsync(message, cancellationToken).ConfigureAwait(false);
switch (message.Response.Status)
{
case 200:
case 206:
case 408:
case 429:
case 439:
case 500:
case 502:
case 503:
case 504:
{
TrackResponse value = default;
using var document = await JsonDocument.ParseAsync(message.Response.ContentStream, default, cancellationToken).ConfigureAwait(false);
value = TrackResponse.DeserializeTrackResponse(document.RootElement);
return Response.FromValue(value, message.Response);
}
default:
throw await _clientDiagnostics.CreateRequestFailedExceptionAsync(message.Response).ConfigureAwait(false);
}

return message.TryGetProperty("ItemsAccepted", out var objItemsAccepted) && objItemsAccepted is int itemsAccepted ? itemsAccepted : 0;
}

/// <summary>
/// This operation sends a blob from persistent storage that will be monitored by Azure Monitor.
/// </summary>
/// <param name="body">Content of blob to track.</param>
/// <param name="cancellationToken">The cancellation token to use.</param>
/// <returns></returns>
internal async Task<int> InternalTrackAsync(ReadOnlyMemory<byte> body, CancellationToken cancellationToken = default)
{
using var message = CreateTrackRequest(body);
await _pipeline.SendAsync(message, cancellationToken).ConfigureAwait(false);

return message.TryGetProperty("ItemsAccepted", out var objItemsAccepted) && objItemsAccepted is int itemsAccepted ? itemsAccepted : 0;
}

internal HttpMessage CreateTrackRequest(IEnumerable<TelemetryItem> body)
Expand All @@ -66,5 +67,22 @@ internal HttpMessage CreateTrackRequest(IEnumerable<TelemetryItem> body)
request.Content = RequestContent.Create(content.ToBytes());
return message;
}

internal HttpMessage CreateTrackRequest(ReadOnlyMemory<byte> body)
{
var message = _pipeline.CreateMessage();
var request = message.Request;
request.Method = RequestMethod.Post;
var uri = new RawRequestUriBuilder();
uri.AppendRaw(host, false);
uri.AppendRaw("/v2", false);
uri.AppendPath("/track", false);
request.Uri = uri;
request.Headers.Add("Content-Type", "application/json");
request.Headers.Add("Accept", "application/json");
using var content = new NDJsonWriter();
request.Content = RequestContent.Create(body);
return message;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

using Azure.Core;
using Azure.Core.Pipeline;

using OpenTelemetry.Exporter.AzureMonitor.ConnectionString;
Expand All @@ -19,13 +19,13 @@ namespace OpenTelemetry.Exporter.AzureMonitor
internal class AzureMonitorTransmitter : ITransmitter
{
private readonly ApplicationInsightsRestClient applicationInsightsRestClient;
private readonly AzureMonitorExporterOptions options;

public AzureMonitorTransmitter(AzureMonitorExporterOptions exporterOptions)
public AzureMonitorTransmitter(AzureMonitorExporterOptions options)
{
ConnectionStringParser.GetValues(exporterOptions.ConnectionString, out _, out string ingestionEndpoint);
ConnectionStringParser.GetValues(options.ConnectionString, out _, out string ingestionEndpoint);
options.Retry.MaxRetries = 0;
options.AddPolicy(new IngestionResponsePolicy(), HttpPipelinePosition.PerCall);

options = exporterOptions;
applicationInsightsRestClient = new ApplicationInsightsRestClient(new ClientDiagnostics(options), HttpPipelineBuilder.Build(options), host: ingestionEndpoint);
}

Expand All @@ -36,26 +36,30 @@ public async ValueTask<int> TrackAsync(IEnumerable<TelemetryItem> telemetryItems
return 0;
}

Azure.Response<TrackResponse> response = null;
int itemsAccepted = 0;

try
{
if (async)
{
response = await this.applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).ConfigureAwait(false);
itemsAccepted = await this.applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).ConfigureAwait(false);
}
else
{
response = this.applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).Result;
itemsAccepted = this.applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).Result;
}
}
catch (Exception ex)
{
if (ex?.InnerException?.InnerException?.Source == "System.Net.Http")
{
// TODO: Network issue. Send Telemetry Items To Storage
}
// TODO: Log the exception to new event source. If we get a common logger we could just log exception to it.
AzureMonitorTraceExporterEventSource.Log.FailedExport(ex);
}

return response == null ? 0 : response.Value.ItemsAccepted.GetValueOrDefault();
return itemsAccepted;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Azure;
using Azure.Core;
using Azure.Core.Pipeline;
using OpenTelemetry.Exporter.AzureMonitor.Models;

namespace OpenTelemetry.Exporter.AzureMonitor
{
internal class IngestionResponsePolicy : HttpPipelineSynchronousPolicy
{
public override void OnReceivedResponse(HttpMessage message)
{
base.OnReceivedResponse(message);

int itemsAccepted;

if (message.TryGetProperty("TelemetryItems", out var telemetryItems))
{
itemsAccepted = ParseResponse(message, (IEnumerable<TelemetryItem>)telemetryItems);
}
else
{
itemsAccepted = ParseResponse(message);
}

message.SetProperty("ItemsAccepted", itemsAccepted);
}

internal static int ParseResponse(HttpMessage message, IEnumerable<TelemetryItem> telemetryItems)
{
var httpStatus = message?.Response?.Status;
int itemsAccepted = 0;

switch (httpStatus)
{
case ResponseStatusCodes.Success:
itemsAccepted = telemetryItems.Count();
break;
case ResponseStatusCodes.PartialSuccess:
// Parse retry-after header
// Send Failed Messages To Storage
break;
case ResponseStatusCodes.RequestTimeout:
case ResponseStatusCodes.ResponseCodeTooManyRequests:
case ResponseStatusCodes.ResponseCodeTooManyRequestsAndRefreshCache:
// Parse retry-after header
// Send Messages To Storage
break;
case ResponseStatusCodes.InternalServerError:
case ResponseStatusCodes.BadGateway:
case ResponseStatusCodes.ServiceUnavailable:
case ResponseStatusCodes.GatewayTimeout:
// Send Messages To Storage
break;
case null: // UnknownNetworkError
// No HttpMessage. Send TelemetryItems To Storage
break;
default:
// Log Non-Retriable Status and don't retry or store;
break;
}

return itemsAccepted;
}

internal static int ParseResponse(HttpMessage message)
{
var httpStatus = message?.Response?.Status;
int itemsAccepted = 0;

switch (httpStatus)
{
case ResponseStatusCodes.Success:
itemsAccepted = GetItemsAccepted(message);
break;
case ResponseStatusCodes.PartialSuccess:
// Send Failed Messages To Storage
break;
case ResponseStatusCodes.RequestTimeout:
case ResponseStatusCodes.ResponseCodeTooManyRequests:
case ResponseStatusCodes.ResponseCodeTooManyRequestsAndRefreshCache:
case ResponseStatusCodes.InternalServerError:
case ResponseStatusCodes.BadGateway:
case ResponseStatusCodes.ServiceUnavailable:
case ResponseStatusCodes.GatewayTimeout:
case null: // UnknownNetworkError
itemsAccepted = 0;
// Request body is already in storage. No need to store again.
break;
default:
// Log Non-Retriable Status and don't retry or store;
break;
}

return itemsAccepted;
}

internal static int GetItemsAccepted(HttpMessage message)
{
int itemsAccepted = 0;
using (JsonDocument document = JsonDocument.Parse(message.Response.ContentStream, default))
{
var value = TrackResponse.DeserializeTrackResponse(document.RootElement);
Response.FromValue(value, message.Response);
itemsAccepted = value.ItemsAccepted.GetValueOrDefault();
}

return itemsAccepted;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

namespace OpenTelemetry.Exporter.AzureMonitor
{
internal class ResponseStatusCodes
{
public const int Success = 200;
public const int PartialSuccess = 206;
public const int RequestTimeout = 408;
public const int ResponseCodeTooManyRequests = 429;
public const int ResponseCodeTooManyRequestsAndRefreshCache = 439;
public const int InternalServerError = 500;
public const int BadGateway = 502;
public const int ServiceUnavailable = 503;
public const int GatewayTimeout = 504;
}
}

0 comments on commit fdc2bd7

Please sign in to comment.