Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OneCollectorExporter] Failed transmission callback #1309

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
4 changes: 4 additions & 0 deletions src/OpenTelemetry.Exporter.OneCollector/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

* Added support for receiving tranmission failures via the
`RegisterPayloadTransmittedCallback` API.
([#1309](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1309))

## 1.5.1

Released 2023-Aug-07
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ internal sealed class HttpJsonPostTransport : ITransport, IDisposable
private static readonly string SdkVersion = $"OTel-{Environment.OSVersion.Platform}-.net-{typeof(OneCollectorExporter<>).Assembly.GetName()?.Version?.ToString() ?? "0.0.0"}";
private static readonly string UserAgent = $".NET/{Environment.Version} HttpClient";

private readonly CallbackManager<OneCollectorExporterPayloadTransmittedCallbackAction> payloadTransmittedCallbacks = new();
private readonly CallbackManager<OneCollectorExporterPayloadTransmittedCallbackAction> payloadTransmittedSuccessCallbacks = new();
private readonly CallbackManager<OneCollectorExporterPayloadTransmittedCallbackAction> payloadTransmittedFailureCallbacks = new();
private readonly Uri endpoint;
private readonly string instrumentationKey;
private readonly OneCollectorExporterHttpTransportCompressionType compressionType;
Expand Down Expand Up @@ -56,15 +57,25 @@ public HttpJsonPostTransport(

public void Dispose()
{
this.payloadTransmittedCallbacks.Dispose();
this.payloadTransmittedSuccessCallbacks.Dispose();
this.payloadTransmittedFailureCallbacks.Dispose();
this.buffer?.Dispose();
}

public IDisposable RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback)
public IDisposable RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback, bool includeFailures)
{
Guard.ThrowIfNull(callback);

return this.payloadTransmittedCallbacks.Add(callback);
var successRegistration = this.payloadTransmittedSuccessCallbacks.Add(callback);

if (!includeFailures)
{
return successRegistration;
}

var failureRegistration = this.payloadTransmittedFailureCallbacks.Add(callback);

return new TranmissionCallbackWrapper(successRegistration, failureRegistration);
}

public bool Send(in TransportSendRequest sendRequest)
Expand Down Expand Up @@ -112,13 +123,14 @@ public bool Send(in TransportSendRequest sendRequest)

OneCollectorExporterEventSource.Log.WriteTransportDataSentEventIfEnabled(sendRequest.ItemType, sendRequest.NumberOfItems, this.Description);

var root = this.payloadTransmittedCallbacks.Root;
var root = this.payloadTransmittedSuccessCallbacks.Root;
if (root != null)
{
this.InvokePayloadTransmittedCallbacks(
root,
streamStartingPosition,
in sendRequest);
in sendRequest,
succeeded: true);
}

return true;
Expand All @@ -135,13 +147,33 @@ public bool Send(in TransportSendRequest sendRequest)
collectorErrors,
errorDetails);

var root = this.payloadTransmittedFailureCallbacks.Root;
if (root != null)
{
this.InvokePayloadTransmittedCallbacks(
root,
streamStartingPosition,
in sendRequest,
succeeded: false);
}

return false;
}
}
catch (Exception ex)
{
OneCollectorExporterEventSource.Log.WriteTransportExceptionThrownEventIfEnabled(this.Description, ex);

var root = this.payloadTransmittedFailureCallbacks.Root;
if (root != null)
{
this.InvokePayloadTransmittedCallbacks(
root,
streamStartingPosition,
in sendRequest,
succeeded: false);
}

return false;
}
}
Expand Down Expand Up @@ -185,7 +217,8 @@ private HttpContent BuildRequestContent(Stream stream)
private void InvokePayloadTransmittedCallbacks(
OneCollectorExporterPayloadTransmittedCallbackAction callback,
long streamStartingPosition,
in TransportSendRequest sendRequest)
in TransportSendRequest sendRequest,
bool succeeded)
{
var stream = sendRequest.ItemStream;

Expand All @@ -200,7 +233,8 @@ private void InvokePayloadTransmittedCallbacks(
sendRequest.ItemSerializationFormat,
stream,
OneCollectorExporterTransportProtocolType.HttpJsonPost,
this.endpoint));
this.endpoint,
succeeded));
}
catch (Exception ex)
{
Expand All @@ -212,6 +246,26 @@ private void InvokePayloadTransmittedCallbacks(
}
}

private sealed class TranmissionCallbackWrapper : IDisposable
{
private readonly IDisposable successRegistration;
private readonly IDisposable failureRegistration;

public TranmissionCallbackWrapper(
IDisposable successRegistration,
IDisposable failureRegistration)
{
this.successRegistration = successRegistration;
this.failureRegistration = failureRegistration;
}

public void Dispose()
{
this.successRegistration.Dispose();
this.failureRegistration.Dispose();
}
}

private sealed class NonDisposingStreamContent : HttpContent
{
#pragma warning disable CA2213 // Disposable fields should be disposed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ internal interface ITransport

bool Send(in TransportSendRequest sendRequest);

IDisposable RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback);
IDisposable RegisterPayloadTransmittedCallback(
OneCollectorExporterPayloadTransmittedCallbackAction callback,
bool includeFailures);
}
29 changes: 27 additions & 2 deletions src/OpenTelemetry.Exporter.OneCollector/OneCollectorExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,41 @@ public sealed override ExportResult Export(in Batch<T> batch)

/// <summary>
/// Register a callback action that will be triggered any time a payload is
/// transmitted by the exporter.
/// successfully transmitted by the exporter.
CodeBlanch marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
/// <remarks>
/// Success or failure of a transmission depends on the transport being
/// used. In the case of HTTP transport, success is driven by the HTTP
/// response status code (anything in the 200-range indicates success) and
/// any other result (connection failure, timeout, non-200 response code,
/// etc.) is considered a failure.
/// </remarks>
/// <param name="callback"><see
/// cref="OneCollectorExporterPayloadTransmittedCallbackAction"/>.</param>
/// <returns><see langword="null"/> if no transport is tied to the exporter
/// or an <see cref="IDisposable"/> representing the registered callback.
/// Call <see cref="IDisposable.Dispose"/> on the returned instance to
/// cancel the registration.</returns>
public IDisposable? RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback)
=> this.sink.Transport?.RegisterPayloadTransmittedCallback(callback);
=> this.RegisterPayloadTransmittedCallback(callback, includeFailures: false);

/// <summary>
/// Register a callback action that will be triggered any time a payload is
/// transmitted by the exporter.
/// </summary>
/// <remarks><inheritdoc cref="RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction)" path="/remarks"/></remarks>
/// <param name="callback"><see
/// cref="OneCollectorExporterPayloadTransmittedCallbackAction"/>.</param>
/// <param name="includeFailures">Specify <see langword="true"/> to receive
/// callbacks when transmission fails. See <see
/// cref="OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded"/>
/// for details about how a success or failure is determined.</param>
/// <returns><see langword="null"/> if no transport is tied to the exporter
/// or an <see cref="IDisposable"/> representing the registered callback.
/// Call <see cref="IDisposable.Dispose"/> on the returned instance to
/// cancel the registration.</returns>
public IDisposable? RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback, bool includeFailures)
=> this.sink.Transport?.RegisterPayloadTransmittedCallback(callback, includeFailures);

/// <inheritdoc/>
protected override void Dispose(bool disposing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ internal OneCollectorExporterPayloadTransmittedCallbackArguments(
OneCollectorExporterSerializationFormatType payloadSerializationFormat,
Stream payloadStream,
OneCollectorExporterTransportProtocolType transportProtocol,
Uri transportEndpoint)
Uri transportEndpoint,
bool succeeded)
{
Debug.Assert(payloadStream != null, "payload stream was null");
Debug.Assert(payloadStream!.CanSeek, "payload stream was not seekable");
Expand All @@ -49,6 +50,7 @@ internal OneCollectorExporterPayloadTransmittedCallbackArguments(
this.payloadStream = payloadStream;
this.TransportProtocol = transportProtocol;
this.TransportEndpoint = transportEndpoint!;
this.Succeeded = succeeded;
}

/// <summary>
Expand All @@ -61,6 +63,28 @@ internal OneCollectorExporterPayloadTransmittedCallbackArguments(
/// </summary>
public Uri TransportEndpoint { get; }

/// <summary>
/// Gets a value indicating whether or not the payload transmission was successful.
/// </summary>
/// <remarks>
/// Notes:
/// <list type="bullet">
/// <item>
/// A <see langword="true"/> value indicates a request was fully transmitted
/// and acknowledged.
/// </item>
/// <item>
/// A <see langword="false"/> value indicates a request did NOT fully
/// transmit or an acknowledgement was NOT received. Data may have been
/// partially or fully transmitted in this case.
/// </item>
/// <item>
/// <inheritdoc cref="OneCollectorExporter{T}.RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction)" path="/remarks"/>
/// </item>
/// </list>
/// </remarks>
public bool Succeeded { get; }

/// <summary>
/// Gets the payload serialization format.
/// </summary>
Expand Down
Loading