Skip to content

Commit

Permalink
[OneCollectorExporter] Failed transmission callback (#1309)
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeBlanch authored Aug 24, 2023
1 parent 58607b7 commit fc4dc0d
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
4 changes: 4 additions & 0 deletions src/OpenTelemetry.Exporter.OneCollector/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

* Added support for receiving tranmission failures via the
`RegisterPayloadTransmittedCallback` API.
([#1309](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1309))

## 1.5.1

Released 2023-Aug-07
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ internal sealed class HttpJsonPostTransport : ITransport, IDisposable
private static readonly string SdkVersion = $"OTel-{Environment.OSVersion.Platform}-.net-{typeof(OneCollectorExporter<>).Assembly.GetName()?.Version?.ToString() ?? "0.0.0"}";
private static readonly string UserAgent = $".NET/{Environment.Version} HttpClient";

private readonly CallbackManager<OneCollectorExporterPayloadTransmittedCallbackAction> payloadTransmittedCallbacks = new();
private readonly CallbackManager<OneCollectorExporterPayloadTransmittedCallbackAction> payloadTransmittedSuccessCallbacks = new();
private readonly CallbackManager<OneCollectorExporterPayloadTransmittedCallbackAction> payloadTransmittedFailureCallbacks = new();
private readonly Uri endpoint;
private readonly string instrumentationKey;
private readonly OneCollectorExporterHttpTransportCompressionType compressionType;
Expand Down Expand Up @@ -56,15 +57,25 @@ public HttpJsonPostTransport(

public void Dispose()
{
this.payloadTransmittedCallbacks.Dispose();
this.payloadTransmittedSuccessCallbacks.Dispose();
this.payloadTransmittedFailureCallbacks.Dispose();
this.buffer?.Dispose();
}

public IDisposable RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback)
public IDisposable RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback, bool includeFailures)
{
Guard.ThrowIfNull(callback);

return this.payloadTransmittedCallbacks.Add(callback);
var successRegistration = this.payloadTransmittedSuccessCallbacks.Add(callback);

if (!includeFailures)
{
return successRegistration;
}

var failureRegistration = this.payloadTransmittedFailureCallbacks.Add(callback);

return new TranmissionCallbackWrapper(successRegistration, failureRegistration);
}

public bool Send(in TransportSendRequest sendRequest)
Expand Down Expand Up @@ -112,13 +123,14 @@ public bool Send(in TransportSendRequest sendRequest)

OneCollectorExporterEventSource.Log.WriteTransportDataSentEventIfEnabled(sendRequest.ItemType, sendRequest.NumberOfItems, this.Description);

var root = this.payloadTransmittedCallbacks.Root;
var root = this.payloadTransmittedSuccessCallbacks.Root;
if (root != null)
{
this.InvokePayloadTransmittedCallbacks(
root,
streamStartingPosition,
in sendRequest);
in sendRequest,
succeeded: true);
}

return true;
Expand All @@ -135,13 +147,33 @@ public bool Send(in TransportSendRequest sendRequest)
collectorErrors,
errorDetails);

var root = this.payloadTransmittedFailureCallbacks.Root;
if (root != null)
{
this.InvokePayloadTransmittedCallbacks(
root,
streamStartingPosition,
in sendRequest,
succeeded: false);
}

return false;
}
}
catch (Exception ex)
{
OneCollectorExporterEventSource.Log.WriteTransportExceptionThrownEventIfEnabled(this.Description, ex);

var root = this.payloadTransmittedFailureCallbacks.Root;
if (root != null)
{
this.InvokePayloadTransmittedCallbacks(
root,
streamStartingPosition,
in sendRequest,
succeeded: false);
}

return false;
}
}
Expand Down Expand Up @@ -185,7 +217,8 @@ private HttpContent BuildRequestContent(Stream stream)
private void InvokePayloadTransmittedCallbacks(
OneCollectorExporterPayloadTransmittedCallbackAction callback,
long streamStartingPosition,
in TransportSendRequest sendRequest)
in TransportSendRequest sendRequest,
bool succeeded)
{
var stream = sendRequest.ItemStream;

Expand All @@ -200,7 +233,8 @@ private void InvokePayloadTransmittedCallbacks(
sendRequest.ItemSerializationFormat,
stream,
OneCollectorExporterTransportProtocolType.HttpJsonPost,
this.endpoint));
this.endpoint,
succeeded));
}
catch (Exception ex)
{
Expand All @@ -212,6 +246,26 @@ private void InvokePayloadTransmittedCallbacks(
}
}

private sealed class TranmissionCallbackWrapper : IDisposable
{
private readonly IDisposable successRegistration;
private readonly IDisposable failureRegistration;

public TranmissionCallbackWrapper(
IDisposable successRegistration,
IDisposable failureRegistration)
{
this.successRegistration = successRegistration;
this.failureRegistration = failureRegistration;
}

public void Dispose()
{
this.successRegistration.Dispose();
this.failureRegistration.Dispose();
}
}

private sealed class NonDisposingStreamContent : HttpContent
{
#pragma warning disable CA2213 // Disposable fields should be disposed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ internal interface ITransport

bool Send(in TransportSendRequest sendRequest);

IDisposable RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback);
IDisposable RegisterPayloadTransmittedCallback(
OneCollectorExporterPayloadTransmittedCallbackAction callback,
bool includeFailures);
}
29 changes: 27 additions & 2 deletions src/OpenTelemetry.Exporter.OneCollector/OneCollectorExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,41 @@ public sealed override ExportResult Export(in Batch<T> batch)

/// <summary>
/// Register a callback action that will be triggered any time a payload is
/// transmitted by the exporter.
/// successfully transmitted by the exporter.
/// </summary>
/// <remarks>
/// Success or failure of a transmission depends on the transport being
/// used. In the case of HTTP transport, success is driven by the HTTP
/// response status code (anything in the 200-range indicates success) and
/// any other result (connection failure, timeout, non-200 response code,
/// etc.) is considered a failure.
/// </remarks>
/// <param name="callback"><see
/// cref="OneCollectorExporterPayloadTransmittedCallbackAction"/>.</param>
/// <returns><see langword="null"/> if no transport is tied to the exporter
/// or an <see cref="IDisposable"/> representing the registered callback.
/// Call <see cref="IDisposable.Dispose"/> on the returned instance to
/// cancel the registration.</returns>
public IDisposable? RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback)
=> this.sink.Transport?.RegisterPayloadTransmittedCallback(callback);
=> this.RegisterPayloadTransmittedCallback(callback, includeFailures: false);

/// <summary>
/// Register a callback action that will be triggered any time a payload is
/// transmitted by the exporter.
/// </summary>
/// <remarks><inheritdoc cref="RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction)" path="/remarks"/></remarks>
/// <param name="callback"><see
/// cref="OneCollectorExporterPayloadTransmittedCallbackAction"/>.</param>
/// <param name="includeFailures">Specify <see langword="true"/> to receive
/// callbacks when transmission fails. See <see
/// cref="OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded"/>
/// for details about how a success or failure is determined.</param>
/// <returns><see langword="null"/> if no transport is tied to the exporter
/// or an <see cref="IDisposable"/> representing the registered callback.
/// Call <see cref="IDisposable.Dispose"/> on the returned instance to
/// cancel the registration.</returns>
public IDisposable? RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback, bool includeFailures)
=> this.sink.Transport?.RegisterPayloadTransmittedCallback(callback, includeFailures);

/// <inheritdoc/>
protected override void Dispose(bool disposing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ internal OneCollectorExporterPayloadTransmittedCallbackArguments(
OneCollectorExporterSerializationFormatType payloadSerializationFormat,
Stream payloadStream,
OneCollectorExporterTransportProtocolType transportProtocol,
Uri transportEndpoint)
Uri transportEndpoint,
bool succeeded)
{
Debug.Assert(payloadStream != null, "payload stream was null");
Debug.Assert(payloadStream!.CanSeek, "payload stream was not seekable");
Expand All @@ -49,6 +50,7 @@ internal OneCollectorExporterPayloadTransmittedCallbackArguments(
this.payloadStream = payloadStream;
this.TransportProtocol = transportProtocol;
this.TransportEndpoint = transportEndpoint!;
this.Succeeded = succeeded;
}

/// <summary>
Expand All @@ -61,6 +63,28 @@ internal OneCollectorExporterPayloadTransmittedCallbackArguments(
/// </summary>
public Uri TransportEndpoint { get; }

/// <summary>
/// Gets a value indicating whether or not the payload transmission was successful.
/// </summary>
/// <remarks>
/// Notes:
/// <list type="bullet">
/// <item>
/// A <see langword="true"/> value indicates a request was fully transmitted
/// and acknowledged.
/// </item>
/// <item>
/// A <see langword="false"/> value indicates a request did NOT fully
/// transmit or an acknowledgement was NOT received. Data may have been
/// partially or fully transmitted in this case.
/// </item>
/// <item>
/// <inheritdoc cref="OneCollectorExporter{T}.RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction)" path="/remarks"/>
/// </item>
/// </list>
/// </remarks>
public bool Succeeded { get; }

/// <summary>
/// Gets the payload serialization format.
/// </summary>
Expand Down
Loading

0 comments on commit fc4dc0d

Please sign in to comment.