Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sdk] Add DelegatingProcessor #5282

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
OpenTelemetry.DelegatingProcessor<T>
OpenTelemetry.DelegatingProcessor<T>.DelegatingProcessor(OpenTelemetry.BaseProcessor<T>! innerProcessor) -> void
override OpenTelemetry.DelegatingProcessor<T>.Dispose(bool disposing) -> void
override OpenTelemetry.DelegatingProcessor<T>.OnEnd(T data) -> void
override OpenTelemetry.DelegatingProcessor<T>.OnForceFlush(int timeoutMilliseconds) -> bool
override OpenTelemetry.DelegatingProcessor<T>.OnShutdown(int timeoutMilliseconds) -> bool
override OpenTelemetry.DelegatingProcessor<T>.OnStart(T data) -> void
74 changes: 74 additions & 0 deletions src/OpenTelemetry/DelegatingProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Internal;

namespace OpenTelemetry;

/// <summary>
/// A <see cref="BaseProcessor{T}"/> implementation which will forward calls to
/// an inner <see cref="BaseProcessor{T}"/>.
/// </summary>
/// <typeparam name="T">The type of object to be processed.</typeparam>
public abstract class DelegatingProcessor<T> : BaseProcessor<T>
{
internal readonly BaseProcessor<T> InnerProcessor;
private bool disposed;

/// <summary>
/// Initializes a new instance of the <see cref="DelegatingProcessor{T}"/> class.
/// </summary>
/// <param name="innerProcessor"><see cref="BaseProcessor{T}"/>.</param>
protected DelegatingProcessor(BaseProcessor<T> innerProcessor)
{
Guard.ThrowIfNull(innerProcessor);

this.InnerProcessor = innerProcessor;
}

/// <inheritdoc/>
public override void OnStart(T data)
{
this.InnerProcessor.OnStart(data);
}

/// <inheritdoc/>
public override void OnEnd(T data)
{
this.InnerProcessor.OnEnd(data);
}

/// <inheritdoc/>
internal override void SetParentProvider(BaseProvider parentProvider)
{
this.InnerProcessor.SetParentProvider(parentProvider);
}

/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
{
return this.InnerProcessor.ForceFlush(timeoutMilliseconds);
}

/// <inheritdoc/>
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.InnerProcessor.Shutdown(timeoutMilliseconds);
}

/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
{
this.InnerProcessor.Dispose();
}

this.disposed = true;
}

base.Dispose(disposing);
}
}
4 changes: 4 additions & 0 deletions src/OpenTelemetry/Logs/LoggerProviderSdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ public bool ContainsBatchProcessor(BaseProcessor<LogRecord> processor)
current = current.Next;
}
}
else if (processor is DelegatingProcessor<LogRecord> delegatingProcessor)
{
return this.ContainsBatchProcessor(delegatingProcessor.InnerProcessor);
}

return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\EventSourceTestHelper.cs" Link="Includes\EventSourceTestHelper.cs" />
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\SkipUnlessEnvVarFoundTheoryAttribute.cs" Link="Includes\SkipUnlessEnvVarFoundTheoryAttribute.cs" />
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\SkipUnlessEnvVarFoundFactAttribute.cs" Link="Includes\SkipUnlessEnvVarFoundFactAttribute.cs" />
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\TestActivityProcessor.cs" Link="Includes\TestActivityProcessor.cs" />
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\TestEventListener.cs" Link="Includes\TestEventListener.cs" />
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\TestProcessor.cs" Link="Includes\TestProcessor.cs" />
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\Utils.cs" Link="Includes\Utils.cs" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,18 +593,18 @@ public void UseOpenTelemetryProtocolActivityExporterWithCustomActivityProcessor(
}

const string ActivitySourceName = "otlp.test";
TestActivityProcessor testActivityProcessor = new TestActivityProcessor();
var testActivityProcessor = new TestProcessor<Activity>();

bool startCalled = false;
bool endCalled = false;

testActivityProcessor.StartAction =
testActivityProcessor.OnStartAction =
(a) =>
{
startCalled = true;
};

testActivityProcessor.EndAction =
testActivityProcessor.OnEndAction =
(a) =>
{
endCalled = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
<ItemGroup>
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\TestHttpServer.cs" Link="Includes\TestHttpServer.cs" />
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\EventSourceTestHelper.cs" Link="Includes\EventSourceTestHelper.cs" />
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\TestActivityProcessor.cs" Link="Includes\TestActivityProcessor.cs" />
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\TestEventListener.cs" Link="Includes\TestEventListener.cs" />
<Compile Include="$(RepoRoot)\test\OpenTelemetry.Tests\Shared\TestProcessor.cs" Link="Includes\TestProcessor.cs" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ public void SuppressesInstrumentation()
{
const string ActivitySourceName = "zipkin.test";
Guid requestId = Guid.NewGuid();
TestActivityProcessor testActivityProcessor = new TestActivityProcessor();
var testActivityProcessor = new TestProcessor<Activity>();

int endCalledCount = 0;

testActivityProcessor.EndAction =
testActivityProcessor.OnEndAction =
(a) =>
{
endCalledCount++;
Expand Down
8 changes: 4 additions & 4 deletions test/OpenTelemetry.Tests/BaseProcessorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class BaseProcessorTest
public void Verify_ForceFlush_HandlesException()
{
// By default, ForceFlush should return true.
var testProcessor = new DelegatingProcessor<object>();
var testProcessor = new TestProcessor<object>();
Assert.True(testProcessor.ForceFlush());

// BaseExporter should catch any exceptions and return false.
Expand All @@ -23,7 +23,7 @@ public void Verify_ForceFlush_HandlesException()
public void Verify_Shutdown_HandlesSecond()
{
// By default, Shutdown should return true.
var testProcessor = new DelegatingProcessor<object>();
var testProcessor = new TestProcessor<object>();
Assert.True(testProcessor.Shutdown());

// A second Shutdown should return false.
Expand All @@ -34,7 +34,7 @@ public void Verify_Shutdown_HandlesSecond()
public void Verify_Shutdown_HandlesException()
{
// BaseExporter should catch any exceptions and return false.
var exceptionTestProcessor = new DelegatingProcessor<object>
var exceptionTestProcessor = new TestProcessor<object>
{
OnShutdownFunc = (timeout) => throw new Exception("test exception"),
};
Expand All @@ -44,7 +44,7 @@ public void Verify_Shutdown_HandlesException()
[Fact]
public void NoOp()
{
var testProcessor = new DelegatingProcessor<object>();
var testProcessor = new TestProcessor<object>();

// These two methods are no-op, but account for 7% of the test coverage.
testProcessor.OnStart(new object());
Expand Down
16 changes: 0 additions & 16 deletions test/OpenTelemetry.Tests/Shared/DelegatingProcessor.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,51 +1,47 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;

namespace OpenTelemetry.Tests;

internal class TestActivityProcessor : BaseProcessor<Activity>
public class TestProcessor<T> : BaseProcessor<T>
where T : class
{
public Action<Activity> StartAction;
public Action<Activity> EndAction;
public Action<T> OnStartAction { get; set; } = (T data) => { };

public TestActivityProcessor()
{
}
public Action<T> OnEndAction { get; set; } = (T data) => { };

public TestActivityProcessor(Action<Activity> onStart, Action<Activity> onEnd)
{
this.StartAction = onStart;
this.EndAction = onEnd;
}
public Func<int, bool> OnForceFlushFunc { get; set; } = (timeout) => true;

public Func<int, bool> OnShutdownFunc { get; set; } = (timeout) => true;

public bool ShutdownCalled { get; private set; } = false;

public bool ForceFlushCalled { get; private set; } = false;

public bool DisposedCalled { get; private set; } = false;

public override void OnStart(Activity span)
public override void OnStart(T data)
{
this.StartAction?.Invoke(span);
this.OnStartAction(data);
}

public override void OnEnd(Activity span)
public override void OnEnd(T data)
{
this.EndAction?.Invoke(span);
this.OnEndAction(data);
}

protected override bool OnForceFlush(int timeoutMilliseconds)
{
this.ForceFlushCalled = true;
return true;

return this.OnForceFlushFunc(timeoutMilliseconds);
}

protected override bool OnShutdown(int timeoutMilliseconds)
{
this.ShutdownCalled = true;
return true;

return this.OnShutdownFunc(timeoutMilliseconds);
}

protected override void Dispose(bool disposing)
Expand Down
38 changes: 22 additions & 16 deletions test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public void CompositeActivityProcessor_BadArgs()
Assert.Throws<ArgumentNullException>(() => new CompositeProcessor<Activity>(null));
Assert.Throws<ArgumentException>(() => new CompositeProcessor<Activity>(Array.Empty<BaseProcessor<Activity>>()));

using var p1 = new TestActivityProcessor(null, null);
using var p1 = new TestProcessor<Activity>();
using var processor = new CompositeProcessor<Activity>(new[] { p1 });
Assert.Throws<ArgumentNullException>(() => processor.AddProcessor(null));
}
Expand All @@ -25,12 +25,16 @@ public void CompositeActivityProcessor_CallsAllProcessorSequentially()
{
var result = string.Empty;

using var p1 = new TestActivityProcessor(
activity => { result += "1"; },
activity => { result += "3"; });
using var p2 = new TestActivityProcessor(
activity => { result += "2"; },
activity => { result += "4"; });
using var p1 = new TestProcessor<Activity>()
{
OnStartAction = (_) => { result += "1"; },
OnEndAction = (_) => { result += "3"; },
};
using var p2 = new TestProcessor<Activity>()
{
OnStartAction = (_) => { result += "2"; },
OnEndAction = (_) => { result += "4"; },
};

using var activity = new Activity("test");

Expand All @@ -46,9 +50,11 @@ public void CompositeActivityProcessor_CallsAllProcessorSequentially()
[Fact]
public void CompositeActivityProcessor_ProcessorThrows()
{
using var p1 = new TestActivityProcessor(
activity => { throw new Exception("Start exception"); },
activity => { throw new Exception("End exception"); });
using var p1 = new TestProcessor<Activity>()
{
OnStartAction = (_) => { throw new Exception("Start exception"); },
OnEndAction = (_) => { throw new Exception("End exception"); },
};

using var activity = new Activity("test");

Expand All @@ -60,8 +66,8 @@ public void CompositeActivityProcessor_ProcessorThrows()
[Fact]
public void CompositeActivityProcessor_ShutsDownAll()
{
using var p1 = new TestActivityProcessor(null, null);
using var p2 = new TestActivityProcessor(null, null);
using var p1 = new TestProcessor<Activity>();
using var p2 = new TestProcessor<Activity>();

using var processor = new CompositeProcessor<Activity>(new[] { p1, p2 });
processor.Shutdown();
Expand All @@ -75,8 +81,8 @@ public void CompositeActivityProcessor_ShutsDownAll()
[InlineData(1)]
public void CompositeActivityProcessor_ForceFlush(int timeout)
{
using var p1 = new TestActivityProcessor(null, null);
using var p2 = new TestActivityProcessor(null, null);
using var p1 = new TestProcessor<Activity>();
using var p2 = new TestProcessor<Activity>();

using var processor = new CompositeProcessor<Activity>(new[] { p1, p2 });
processor.ForceFlush(timeout);
Expand All @@ -90,8 +96,8 @@ public void CompositeActivityProcessor_ForwardsParentProvider()
{
using TracerProvider provider = new TestProvider();

using var p1 = new TestActivityProcessor(null, null);
using var p2 = new TestActivityProcessor(null, null);
using var p1 = new TestProcessor<Activity>();
using var p2 = new TestProcessor<Activity>();

using var processor = new CompositeProcessor<Activity>(new[] { p1, p2 });

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class TracerProviderExtensionsTest
[Fact]
public void Verify_ForceFlush_HandlesException()
{
using var testProcessor = new DelegatingProcessor<Activity>();
using var testProcessor = new TestProcessor<Activity>();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddProcessor(testProcessor)
Expand All @@ -29,7 +29,7 @@ public void Verify_ForceFlush_HandlesException()
[Fact]
public void Verify_Shutdown_HandlesSecond()
{
using var testProcessor = new DelegatingProcessor<Activity>();
using var testProcessor = new TestProcessor<Activity>();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddProcessor(testProcessor)
Expand All @@ -42,7 +42,7 @@ public void Verify_Shutdown_HandlesSecond()
[Fact]
public void Verify_Shutdown_HandlesException()
{
using var testProcessor = new DelegatingProcessor<Activity>
using var testProcessor = new TestProcessor<Activity>
{
OnShutdownFunc = (timeout) => throw new Exception("test exception"),
};
Expand Down
Loading
Loading