Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OneCollectorExporter] Failed transmission callback #1309

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
OpenTelemetry.Exporter.OneCollector.OneCollectorExporter<T>.RegisterPayloadTransmittedCallback(OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackAction! callback, bool includeFailures) -> System.IDisposable?
OpenTelemetry.Exporter.OneCollector.OneCollectorExporterPayloadTransmittedCallbackArguments.Succeeded.get -> bool
4 changes: 4 additions & 0 deletions src/OpenTelemetry.Exporter.OneCollector/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

* Added support for receiving tranmission failures via the
`RegisterPayloadTransmittedCallback` API.
([#1309](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1309))

## 1.5.1

Released 2023-Aug-07
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ internal sealed class HttpJsonPostTransport : ITransport, IDisposable
private static readonly string SdkVersion = $"OTel-{Environment.OSVersion.Platform}-.net-{typeof(OneCollectorExporter<>).Assembly.GetName()?.Version?.ToString() ?? "0.0.0"}";
private static readonly string UserAgent = $".NET/{Environment.Version} HttpClient";

private readonly CallbackManager<OneCollectorExporterPayloadTransmittedCallbackAction> payloadTransmittedCallbacks = new();
private readonly CallbackManager<OneCollectorExporterPayloadTransmittedCallbackAction> payloadTransmittedSuccessCallbacks = new();
private readonly CallbackManager<OneCollectorExporterPayloadTransmittedCallbackAction> payloadTransmittedFailureCallbacks = new();
private readonly Uri endpoint;
private readonly string instrumentationKey;
private readonly OneCollectorExporterHttpTransportCompressionType compressionType;
Expand Down Expand Up @@ -56,15 +57,25 @@ public HttpJsonPostTransport(

public void Dispose()
{
this.payloadTransmittedCallbacks.Dispose();
this.payloadTransmittedSuccessCallbacks.Dispose();
this.payloadTransmittedFailureCallbacks.Dispose();
this.buffer?.Dispose();
}

public IDisposable RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback)
public IDisposable RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback, bool includeFailures)
{
Guard.ThrowIfNull(callback);

return this.payloadTransmittedCallbacks.Add(callback);
var successRegistration = this.payloadTransmittedSuccessCallbacks.Add(callback);

if (!includeFailures)
{
return successRegistration;
}

var failureRegistration = this.payloadTransmittedFailureCallbacks.Add(callback);

return new TranmissionCallbackWrapper(successRegistration, failureRegistration);
}

public bool Send(in TransportSendRequest sendRequest)
Expand Down Expand Up @@ -112,13 +123,14 @@ public bool Send(in TransportSendRequest sendRequest)

OneCollectorExporterEventSource.Log.WriteTransportDataSentEventIfEnabled(sendRequest.ItemType, sendRequest.NumberOfItems, this.Description);

var root = this.payloadTransmittedCallbacks.Root;
var root = this.payloadTransmittedSuccessCallbacks.Root;
if (root != null)
{
this.InvokePayloadTransmittedCallbacks(
root,
streamStartingPosition,
in sendRequest);
in sendRequest,
succeeded: true);
}

return true;
Expand All @@ -135,6 +147,16 @@ public bool Send(in TransportSendRequest sendRequest)
collectorErrors,
errorDetails);

var root = this.payloadTransmittedFailureCallbacks.Root;
if (root != null)
{
this.InvokePayloadTransmittedCallbacks(
root,
streamStartingPosition,
in sendRequest,
succeeded: false);
}

return false;
}
}
Expand Down Expand Up @@ -185,7 +207,8 @@ private HttpContent BuildRequestContent(Stream stream)
private void InvokePayloadTransmittedCallbacks(
OneCollectorExporterPayloadTransmittedCallbackAction callback,
long streamStartingPosition,
in TransportSendRequest sendRequest)
in TransportSendRequest sendRequest,
bool succeeded)
{
var stream = sendRequest.ItemStream;

Expand All @@ -200,7 +223,8 @@ private void InvokePayloadTransmittedCallbacks(
sendRequest.ItemSerializationFormat,
stream,
OneCollectorExporterTransportProtocolType.HttpJsonPost,
this.endpoint));
this.endpoint,
succeeded));
}
catch (Exception ex)
{
Expand All @@ -212,6 +236,26 @@ private void InvokePayloadTransmittedCallbacks(
}
}

private sealed class TranmissionCallbackWrapper : IDisposable
{
private readonly IDisposable successRegistration;
private readonly IDisposable failureRegistration;

public TranmissionCallbackWrapper(
IDisposable successRegistration,
IDisposable failureRegistration)
{
this.successRegistration = successRegistration;
this.failureRegistration = failureRegistration;
}

public void Dispose()
{
this.successRegistration.Dispose();
this.failureRegistration.Dispose();
}
}

private sealed class NonDisposingStreamContent : HttpContent
{
#pragma warning disable CA2213 // Disposable fields should be disposed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ internal interface ITransport

bool Send(in TransportSendRequest sendRequest);

IDisposable RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback);
IDisposable RegisterPayloadTransmittedCallback(
OneCollectorExporterPayloadTransmittedCallbackAction callback,
bool includeFailures);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,22 @@ public sealed override ExportResult Export(in Batch<T> batch)
/// Call <see cref="IDisposable.Dispose"/> on the returned instance to
/// cancel the registration.</returns>
public IDisposable? RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback)
=> this.sink.Transport?.RegisterPayloadTransmittedCallback(callback);
=> this.RegisterPayloadTransmittedCallback(callback, includeFailures: false);

/// <summary>
/// Register a callback action that will be triggered any time a payload is
/// transmitted by the exporter.
/// </summary>
/// <param name="callback"><see
/// cref="OneCollectorExporterPayloadTransmittedCallbackAction"/>.</param>
/// <param name="includeFailures">Specify <see langword="true"/> to receive
/// callbacks when transmission fails.</param>
CodeBlanch marked this conversation as resolved.
Show resolved Hide resolved
/// <returns><see langword="null"/> if no transport is tied to the exporter
/// or an <see cref="IDisposable"/> representing the registered callback.
/// Call <see cref="IDisposable.Dispose"/> on the returned instance to
/// cancel the registration.</returns>
public IDisposable? RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback, bool includeFailures)
=> this.sink.Transport?.RegisterPayloadTransmittedCallback(callback, includeFailures);

/// <inheritdoc/>
protected override void Dispose(bool disposing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ internal OneCollectorExporterPayloadTransmittedCallbackArguments(
OneCollectorExporterSerializationFormatType payloadSerializationFormat,
Stream payloadStream,
OneCollectorExporterTransportProtocolType transportProtocol,
Uri transportEndpoint)
Uri transportEndpoint,
bool succeeded)
{
Debug.Assert(payloadStream != null, "payload stream was null");
Debug.Assert(payloadStream!.CanSeek, "payload stream was not seekable");
Expand All @@ -49,6 +50,7 @@ internal OneCollectorExporterPayloadTransmittedCallbackArguments(
this.payloadStream = payloadStream;
this.TransportProtocol = transportProtocol;
this.TransportEndpoint = transportEndpoint!;
this.Succeeded = succeeded;
}

/// <summary>
Expand All @@ -61,6 +63,11 @@ internal OneCollectorExporterPayloadTransmittedCallbackArguments(
/// </summary>
public Uri TransportEndpoint { get; }

/// <summary>
/// Gets a value indicating whether or not the payload transmission was successful or not.
CodeBlanch marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public bool Succeeded { get; }

/// <summary>
/// Gets the payload serialization format.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public void RegisterPayloadTransmittedCallbackTest()
* 1) Exisiting callback fires again and is verified. Then we remove the callback.
* 2) Verifies callback is NOT attached and NOT fired.
* 3) Callback is attached again and verified to fire. Then we remove the callback.
* 4) Tests the callback on a failed message with includeFailures: false.
* 5) Tests the callback on a failed message with includeFailures: true.
*/

RunHttpServerTest(
Expand All @@ -122,21 +124,30 @@ public void RegisterPayloadTransmittedCallbackTest()
Assert.True(string.IsNullOrWhiteSpace(req.Headers["Content-Encoding"]));
Assert.Equal(request, Encoding.ASCII.GetString(body.ToArray()));
},
shouldTestFailFunc: (iteration) => iteration == 4 || iteration == 5,
testStartingAction: (iteration, transport) =>
{
switch (iteration)
{
case 0:
case 3:
Assert.Null(callbackRegistration);
callbackRegistration = transport.RegisterPayloadTransmittedCallback(OnPayloadTransmitted);
callbackRegistration = transport.RegisterPayloadTransmittedCallback(OnPayloadTransmitted, includeFailures: false);
break;
case 1:
Assert.NotNull(callbackRegistration);
break;
case 2:
Assert.Null(callbackRegistration);
break;
case 4:
Assert.Null(callbackRegistration);
callbackRegistration = transport.RegisterPayloadTransmittedCallback(OnPayloadTransmitted, includeFailures: false);
break;
case 5:
Assert.Null(callbackRegistration);
callbackRegistration = transport.RegisterPayloadTransmittedCallback(OnPayloadTransmitted, includeFailures: true);
break;
}
},
testFinishedAction: (iteration, transport) =>
Expand All @@ -149,8 +160,18 @@ public void RegisterPayloadTransmittedCallbackTest()
break;
case 1:
case 3:
case 4:
case 5:
Assert.NotNull(callbackRegistration);
Assert.True(callbackFired);
if (iteration == 4)
{
Assert.False(callbackFired);
}
else
{
Assert.True(callbackFired);
}

callbackRegistration.Dispose();
callbackRegistration = null;
break;
Expand All @@ -163,9 +184,9 @@ public void RegisterPayloadTransmittedCallbackTest()
callbackFired = false;
lastCompletedIteration = iteration;
},
testIterations: 4);
testIterations: 6);

Assert.Equal(3, lastCompletedIteration);
Assert.Equal(5, lastCompletedIteration);

void OnPayloadTransmitted(in OneCollectorExporterPayloadTransmittedCallbackArguments arguments)
{
Expand All @@ -178,6 +199,14 @@ void OnPayloadTransmitted(in OneCollectorExporterPayloadTransmittedCallbackArgum
Assert.Equal(OneCollectorExporterSerializationFormatType.CommonSchemaV4JsonStream, arguments.PayloadSerializationFormat);
Assert.Equal(OneCollectorExporterTransportProtocolType.HttpJsonPost, arguments.TransportProtocol);
Assert.NotNull(arguments.TransportEndpoint);
if (lastCompletedIteration == 4)
{
Assert.False(arguments.Succeeded);
}
else
{
Assert.True(arguments.Succeeded);
}
}
}

Expand All @@ -197,16 +226,19 @@ private static void RunHttpServerTest(
Action<HttpListenerRequest, MemoryStream> assertRequestAction,
int numberOfItemsInRequestBody = 1,
int testIterations = 1,
Func<int, bool>? shouldTestFailFunc = null,
Action<int, ITransport>? testStartingAction = null,
Action<int, ITransport>? testFinishedAction = null)
{
shouldTestFailFunc ??= static iteration => false;
bool failTest = false;
bool requestReceivedAndAsserted = false;
Exception? testException = null;

using var testServer = TestHttpServer.RunServer(
context =>
{
context.Response.StatusCode = 200;
context.Response.StatusCode = failTest ? 400 : 200;

using MemoryStream requestBody = new MemoryStream();

Expand Down Expand Up @@ -241,6 +273,8 @@ private static void RunHttpServerTest(

for (int i = 0; i < testIterations; i++)
{
failTest = shouldTestFailFunc(i);

testStartingAction?.Invoke(i, transport);

using var requestBodyStream = new MemoryStream(requestBodyBytes);
Expand All @@ -259,7 +293,7 @@ private static void RunHttpServerTest(
throw testException;
}

Assert.True(result);
Assert.NotEqual(failTest, result);
Assert.True(requestReceivedAndAsserted);

testFinishedAction?.Invoke(i, transport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private sealed class TestTransport : ITransport

public List<byte[]> ExportedData { get; } = new();

public IDisposable RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback)
public IDisposable RegisterPayloadTransmittedCallback(OneCollectorExporterPayloadTransmittedCallbackAction callback, bool includeFailures)
{
throw new NotImplementedException();
}
Expand Down