From afd9135026ff2eb77d8ea478d69a670d6aa26892 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 17 Jun 2020 17:41:19 -0700 Subject: [PATCH] Add support for multiple pipelines in OpenTelemetryBuilder with Activity (#735) * Add support for multiple pipelines in OpenTelemetryBuilder with Activity * change Setpipeline toAddpipeline * Dispose activityprocessor --- .../OpenTelemetryBuilderExtensions.cs | 2 +- .../TracerBuilderExtensions.cs | 2 +- .../TracerBuilderExtensions.cs | 4 +- .../ActivityProcessorPipelineBuilder.cs | 1 + .../Configuration/OpenTelemetryBuilder.cs | 11 +- .../Trace/Configuration/OpenTelemetrySdk.cs | 28 +- .../Internal/BroadcastActivityProcessor.cs | 105 +++++++ .../HttpInListenerTests.cs | 2 +- .../BasicTests.cs | 8 +- ...stsCollectionsIsAccordingToTheSpecTests.cs | 2 +- .../HttpClientTests.Basic.netcore31.cs | 14 +- .../HttpClientTests.netcore31.cs | 2 +- .../HttpWebRequestTests.Basic.netfx.cs | 6 +- .../HttpWebRequestTests.netfx.cs | 2 +- .../SqlClientTests.cs | 4 +- .../Testing/Export/TestActivityExporter.cs | 62 +++++ .../Config/ActivityProcessorPipelineTests.cs | 259 ++++++++++++++++++ .../Config/BroadcastActivityProcessorTests.cs | 196 +++++++++++++ .../Export/SimpleActivityProcessorTest.cs | 176 ++++++++++++ 19 files changed, 857 insertions(+), 29 deletions(-) create mode 100644 src/OpenTelemetry/Trace/Export/Internal/BroadcastActivityProcessor.cs create mode 100644 test/OpenTelemetry.Tests/Implementation/Testing/Export/TestActivityExporter.cs create mode 100644 test/OpenTelemetry.Tests/Implementation/Trace/Config/ActivityProcessorPipelineTests.cs create mode 100644 test/OpenTelemetry.Tests/Implementation/Trace/Config/BroadcastActivityProcessorTests.cs create mode 100644 test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleActivityProcessorTest.cs diff --git a/src/OpenTelemetry.Exporter.Console/OpenTelemetryBuilderExtensions.cs b/src/OpenTelemetry.Exporter.Console/OpenTelemetryBuilderExtensions.cs index 191f08f8798..9b2b1abb735 100644 --- a/src/OpenTelemetry.Exporter.Console/OpenTelemetryBuilderExtensions.cs +++ b/src/OpenTelemetry.Exporter.Console/OpenTelemetryBuilderExtensions.cs @@ -42,7 +42,7 @@ public static OpenTelemetryBuilder UseConsoleActivityExporter(this OpenTelemetry var exporterOptions = new ConsoleActivityExporterOptions(); configure(exporterOptions); var consoleExporter = new ConsoleActivityExporter(exporterOptions); - return builder.SetProcessorPipeline(pipeline => pipeline.SetExporter(consoleExporter)); + return builder.AddProcessorPipeline(pipeline => pipeline.SetExporter(consoleExporter)); } } } diff --git a/src/OpenTelemetry.Exporter.Jaeger/TracerBuilderExtensions.cs b/src/OpenTelemetry.Exporter.Jaeger/TracerBuilderExtensions.cs index 91ec666f7c9..19b14e7ab4d 100644 --- a/src/OpenTelemetry.Exporter.Jaeger/TracerBuilderExtensions.cs +++ b/src/OpenTelemetry.Exporter.Jaeger/TracerBuilderExtensions.cs @@ -101,7 +101,7 @@ public static OpenTelemetryBuilder UseJaegerActivityExporter(this OpenTelemetryB throw new ArgumentNullException(nameof(configure)); } - return builder.SetProcessorPipeline(pipeline => + return builder.AddProcessorPipeline(pipeline => { var exporterOptions = new JaegerExporterOptions(); configure(exporterOptions); diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/TracerBuilderExtensions.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/TracerBuilderExtensions.cs index 84d7e250a0e..92d0e327c0b 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/TracerBuilderExtensions.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/TracerBuilderExtensions.cs @@ -1,4 +1,4 @@ -// +// // Copyright The OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -102,7 +102,7 @@ public static OpenTelemetryBuilder UseOpenTelemetryProtocolActivityExporter(this throw new ArgumentNullException(nameof(configure)); } - return builder.SetProcessorPipeline(pipeline => + return builder.AddProcessorPipeline(pipeline => { var exporterOptions = new ExporterOptions(); configure(exporterOptions); diff --git a/src/OpenTelemetry/Trace/Configuration/ActivityProcessorPipelineBuilder.cs b/src/OpenTelemetry/Trace/Configuration/ActivityProcessorPipelineBuilder.cs index 23863db0284..bc51f2593cf 100644 --- a/src/OpenTelemetry/Trace/Configuration/ActivityProcessorPipelineBuilder.cs +++ b/src/OpenTelemetry/Trace/Configuration/ActivityProcessorPipelineBuilder.cs @@ -94,6 +94,7 @@ internal ActivityProcessor Build() } else if (this.Exporter != null) { + // TODO: Make this BatchingActivityProcessor once its available. exportingProcessor = new SimpleActivityProcessor(this.Exporter); this.Processors.Add(exportingProcessor); } diff --git a/src/OpenTelemetry/Trace/Configuration/OpenTelemetryBuilder.cs b/src/OpenTelemetry/Trace/Configuration/OpenTelemetryBuilder.cs index e1be4b3d463..c6a323bd1aa 100644 --- a/src/OpenTelemetry/Trace/Configuration/OpenTelemetryBuilder.cs +++ b/src/OpenTelemetry/Trace/Configuration/OpenTelemetryBuilder.cs @@ -29,7 +29,7 @@ internal OpenTelemetryBuilder() { } - internal ActivityProcessorPipelineBuilder ProcessingPipeline { get; private set; } + internal List ProcessingPipelines { get; private set; } internal List InstrumentationFactories { get; private set; } @@ -42,16 +42,21 @@ internal OpenTelemetryBuilder() /// /// Function that configures pipeline. /// Returns for chaining. - public OpenTelemetryBuilder SetProcessorPipeline(Action configure) + public OpenTelemetryBuilder AddProcessorPipeline(Action configure) { if (configure == null) { throw new ArgumentNullException(nameof(configure)); } + if (this.ProcessingPipelines == null) + { + this.ProcessingPipelines = new List(); + } + var pipelineBuilder = new ActivityProcessorPipelineBuilder(); configure(pipelineBuilder); - this.ProcessingPipeline = pipelineBuilder; + this.ProcessingPipelines.Add(pipelineBuilder); return this; } diff --git a/src/OpenTelemetry/Trace/Configuration/OpenTelemetrySdk.cs b/src/OpenTelemetry/Trace/Configuration/OpenTelemetrySdk.cs index 82df8b91db6..fb47b057b5e 100644 --- a/src/OpenTelemetry/Trace/Configuration/OpenTelemetrySdk.cs +++ b/src/OpenTelemetry/Trace/Configuration/OpenTelemetrySdk.cs @@ -17,7 +17,9 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using OpenTelemetry.Trace.Export; +using OpenTelemetry.Trace.Export.Internal; using OpenTelemetry.Trace.Samplers; namespace OpenTelemetry.Trace.Configuration @@ -25,6 +27,7 @@ namespace OpenTelemetry.Trace.Configuration public class OpenTelemetrySdk : IDisposable { private readonly List instrumentations = new List(); + private ActivityProcessor activityProcessor; private ActivityListener listener; static OpenTelemetrySdk() @@ -54,14 +57,29 @@ public static OpenTelemetrySdk EnableOpenTelemetry(Action ActivitySampler sampler = openTelemetryBuilder.Sampler ?? new AlwaysOnActivitySampler(); ActivityProcessor activityProcessor; - if (openTelemetryBuilder.ProcessingPipeline == null) + if (openTelemetryBuilder.ProcessingPipelines == null || !openTelemetryBuilder.ProcessingPipelines.Any()) { // if there are no pipelines are configured, use noop processor activityProcessor = new NoopActivityProcessor(); } + else if (openTelemetryBuilder.ProcessingPipelines.Count == 1) + { + // if there is only one pipeline - use it's outer processor as a + // single processor on the tracerSdk. + var processorFactory = openTelemetryBuilder.ProcessingPipelines[0]; + activityProcessor = processorFactory.Build(); + } else { - activityProcessor = openTelemetryBuilder.ProcessingPipeline.Build(); + // if there are more pipelines, use processor that will broadcast to all pipelines + var processors = new ActivityProcessor[openTelemetryBuilder.ProcessingPipelines.Count]; + + for (int i = 0; i < openTelemetryBuilder.ProcessingPipelines.Count; i++) + { + processors[i] = openTelemetryBuilder.ProcessingPipelines[i].Build(); + } + + activityProcessor = new BroadcastActivityProcessor(processors); } var activitySource = new ActivitySourceAdapter(sampler, activityProcessor); @@ -96,6 +114,7 @@ public static OpenTelemetrySdk EnableOpenTelemetry(Action }; ActivitySource.AddActivityListener(openTelemetrySDK.listener); + openTelemetrySDK.activityProcessor = activityProcessor; return openTelemetrySDK; } @@ -112,6 +131,11 @@ public void Dispose() } this.instrumentations.Clear(); + + if (this.activityProcessor is IDisposable disposableProcessor) + { + disposableProcessor.Dispose(); + } } internal static ActivityDataRequest ComputeActivityDataRequest( diff --git a/src/OpenTelemetry/Trace/Export/Internal/BroadcastActivityProcessor.cs b/src/OpenTelemetry/Trace/Export/Internal/BroadcastActivityProcessor.cs new file mode 100644 index 00000000000..1cf25a9fc26 --- /dev/null +++ b/src/OpenTelemetry/Trace/Export/Internal/BroadcastActivityProcessor.cs @@ -0,0 +1,105 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Trace.Export.Internal +{ + internal class BroadcastActivityProcessor : ActivityProcessor, IDisposable + { + private readonly IEnumerable processors; + + public BroadcastActivityProcessor(IEnumerable processors) + { + if (processors == null) + { + throw new ArgumentNullException(nameof(processors)); + } + + if (!processors.Any()) + { + throw new ArgumentException($"{nameof(processors)} collection is empty"); + } + + this.processors = processors; + } + + public override void OnEnd(Activity activity) + { + foreach (var processor in this.processors) + { + try + { + processor.OnEnd(activity); + } + catch (Exception e) + { + OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnEnd", e); + } + } + } + + public override void OnStart(Activity activity) + { + foreach (var processor in this.processors) + { + try + { + processor.OnStart(activity); + } + catch (Exception e) + { + OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnStart", e); + } + } + } + + public override Task ShutdownAsync(CancellationToken cancellationToken) + { + var tasks = new List(); + foreach (var processor in this.processors) + { + tasks.Add(processor.ShutdownAsync(cancellationToken)); + } + + return Task.WhenAll(tasks); + } + + public void Dispose() + { + foreach (var processor in this.processors) + { + try + { + if (processor is IDisposable disposable) + { + disposable.Dispose(); + } + } + catch (Exception e) + { + OpenTelemetrySdkEventSource.Log.SpanProcessorException("Dispose", e); + } + } + } + } +} diff --git a/test/OpenTelemetry.Instrumentation.AspNet.Tests.Win/HttpInListenerTests.cs b/test/OpenTelemetry.Instrumentation.AspNet.Tests.Win/HttpInListenerTests.cs index d698c8bfa2f..1b8ffdb60c7 100644 --- a/test/OpenTelemetry.Instrumentation.AspNet.Tests.Win/HttpInListenerTests.cs +++ b/test/OpenTelemetry.Instrumentation.AspNet.Tests.Win/HttpInListenerTests.cs @@ -162,7 +162,7 @@ public void AspNetRequestsAreCollectedSuccessfully( options.TextFormat = textFormat.Object; } }) - .SetProcessorPipeline(p => p.AddProcessor(_ => activityProcessor.Object)))) + .AddProcessorPipeline(p => p.AddProcessor(_ => activityProcessor.Object)))) { activity.Start(); this.fakeAspNetDiagnosticSource.Write( diff --git a/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/BasicTests.cs b/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/BasicTests.cs index 0afe79b1162..71aeb0b594d 100644 --- a/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/BasicTests.cs +++ b/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/BasicTests.cs @@ -61,7 +61,7 @@ public async Task SuccessfulTemplateControllerCallGeneratesASpan() void ConfigureTestServices(IServiceCollection services) { this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry((builder) => builder.AddRequestInstrumentation() - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))); + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))); } // Arrange @@ -100,7 +100,7 @@ public async Task SuccessfulTemplateControllerCallUsesParentContext() builder.ConfigureTestServices(services => { this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry((builder) => builder.AddRequestInstrumentation() - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))); + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))); }))) { using var client = testFactory.CreateClient(); @@ -148,7 +148,7 @@ public async Task CustomTextFormat() { this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry( (builder) => builder.AddRequestInstrumentation((opt) => opt.TextFormat = textFormat.Object) - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))); + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))); }))) { using var client = testFactory.CreateClient(); @@ -181,7 +181,7 @@ void ConfigureTestServices(IServiceCollection services) this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry( (builder) => builder.AddRequestInstrumentation((opt) => opt.RequestFilter = (ctx) => ctx.Request.Path != "/api/values/2") - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))); + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))); } // Arrange diff --git a/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/IncomingRequestsCollectionsIsAccordingToTheSpecTests.cs b/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/IncomingRequestsCollectionsIsAccordingToTheSpecTests.cs index a99f51f9947..389468303f7 100644 --- a/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/IncomingRequestsCollectionsIsAccordingToTheSpecTests.cs +++ b/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/IncomingRequestsCollectionsIsAccordingToTheSpecTests.cs @@ -53,7 +53,7 @@ public async Task SuccessfulTemplateControllerCallGeneratesASpan() { services.AddSingleton(new TestCallbackMiddlewareImpl()); services.AddOpenTelemetrySdk((builder) => builder.AddRequestInstrumentation() - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))); + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))); })) .CreateClient()) { diff --git a/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpClientTests.Basic.netcore31.cs b/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpClientTests.Basic.netcore31.cs index 5a52b28da25..7972337c0fa 100644 --- a/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpClientTests.Basic.netcore31.cs +++ b/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpClientTests.Basic.netcore31.cs @@ -74,7 +74,7 @@ public async Task HttpDependenciesInstrumentationInjectsHeadersAsync() using (OpenTelemetrySdk.EnableOpenTelemetry( (builder) => builder.AddHttpClientDependencyInstrumentation() - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) { using var c = new HttpClient(); await c.SendAsync(request); @@ -124,7 +124,7 @@ public async Task HttpDependenciesInstrumentationInjectsHeadersAsync_CustomForma using (OpenTelemetrySdk.EnableOpenTelemetry( (builder) => builder.AddHttpClientDependencyInstrumentation((opt) => opt.TextFormat = textFormat.Object) - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) { using var c = new HttpClient(); await c.SendAsync(request); @@ -154,7 +154,7 @@ public async Task HttpDependenciesInstrumentation_AddViaFactory_HttpInstrumentat using (OpenTelemetrySdk.EnableOpenTelemetry( (builder) => builder.AddHttpClientDependencyInstrumentation() - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) { using var c = new HttpClient(); await c.GetAsync(this.url); @@ -172,7 +172,7 @@ public async Task HttpDependenciesInstrumentation_AddViaFactory_DependencyInstru using (OpenTelemetrySdk.EnableOpenTelemetry( (builder) => builder.AddHttpClientDependencyInstrumentation() - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) { using var c = new HttpClient(); await c.GetAsync(this.url); @@ -198,7 +198,7 @@ public async Task HttpDependenciesInstrumentationBacksOffIfAlreadyInstrumented() using (OpenTelemetrySdk.EnableOpenTelemetry( (builder) => builder.AddHttpClientDependencyInstrumentation() - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) { using var c = new HttpClient(); await c.SendAsync(request); @@ -218,7 +218,7 @@ public async void HttpDependenciesInstrumentationFiltersOutRequests() (opt) => opt.EventFilter = (activityName, arg1, _) => !(activityName == "System.Net.Http.HttpRequestOut" && arg1 is HttpRequestMessage request && request.RequestUri.OriginalString.Contains(this.url))) - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) { using var c = new HttpClient(); await c.GetAsync(this.url); @@ -234,7 +234,7 @@ public async Task HttpDependenciesInstrumentationFiltersOutRequestsToExporterEnd using (OpenTelemetrySdk.EnableOpenTelemetry( (builder) => builder.AddHttpClientDependencyInstrumentation() - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) { using var c = new HttpClient(); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)); diff --git a/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpClientTests.netcore31.cs b/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpClientTests.netcore31.cs index 23d9b5ad324..a46a94e19ff 100644 --- a/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpClientTests.netcore31.cs +++ b/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpClientTests.netcore31.cs @@ -57,7 +57,7 @@ public async Task HttpOutCallsAreCollectedSuccessfullyAsync(HttpTestData.HttpOut using (OpenTelemetrySdk.EnableOpenTelemetry( (builder) => builder.AddHttpClientDependencyInstrumentation((opt) => opt.SetHttpFlavor = tc.SetHttpFlavor) - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) { try { diff --git a/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpWebRequestTests.Basic.netfx.cs b/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpWebRequestTests.Basic.netfx.cs index 9a9ff0ff11a..cf89f21066f 100644 --- a/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpWebRequestTests.Basic.netfx.cs +++ b/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpWebRequestTests.Basic.netfx.cs @@ -65,7 +65,7 @@ public async Task HttpDependenciesInstrumentationInjectsHeadersAsync() var activityProcessor = new Mock(); using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b => { - b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object)); + b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object)); b.AddHttpWebRequestDependencyInstrumentation(); }); @@ -113,7 +113,7 @@ public async Task HttpDependenciesInstrumentationInjectsHeadersAsync_CustomForma var activityProcessor = new Mock(); using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b => { - b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object)); + b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object)); b.AddHttpWebRequestDependencyInstrumentation(); }); @@ -153,7 +153,7 @@ public async Task HttpDependenciesInstrumentationBacksOffIfAlreadyInstrumented() var activityProcessor = new Mock(); using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b => { - b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object)); + b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object)); b.AddHttpWebRequestDependencyInstrumentation(); }); diff --git a/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpWebRequestTests.netfx.cs b/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpWebRequestTests.netfx.cs index b372f9a2a8b..237b7045809 100644 --- a/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpWebRequestTests.netfx.cs +++ b/test/OpenTelemetry.Instrumentation.Dependencies.Tests/HttpWebRequestTests.netfx.cs @@ -50,7 +50,7 @@ public void HttpOutCallsAreCollectedSuccessfullyAsync(HttpTestData.HttpOutTestCa var activityProcessor = new Mock(); using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b => { - b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object)); + b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object)); b.AddHttpWebRequestDependencyInstrumentation(); }); diff --git a/test/OpenTelemetry.Instrumentation.Dependencies.Tests/SqlClientTests.cs b/test/OpenTelemetry.Instrumentation.Dependencies.Tests/SqlClientTests.cs index 2d47e123b31..4f4b51b28c8 100644 --- a/test/OpenTelemetry.Instrumentation.Dependencies.Tests/SqlClientTests.cs +++ b/test/OpenTelemetry.Instrumentation.Dependencies.Tests/SqlClientTests.cs @@ -71,7 +71,7 @@ public void SqlClientCallsAreCollectedSuccessfully( opt.CaptureTextCommandContent = captureTextCommandContent; opt.CaptureStoredProcedureCommandName = captureStoredProcedureCommandName; }) - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) { var operationId = Guid.NewGuid(); var sqlConnection = new SqlConnection(TestConnectionString); @@ -160,7 +160,7 @@ public void SqlClientErrorsAreCollectedSuccessfully(string beforeCommand, string using (OpenTelemetrySdk.EnableOpenTelemetry( (builder) => builder.AddSqlClientDependencyInstrumentation() - .SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) + .AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)))) { var operationId = Guid.NewGuid(); var sqlConnection = new SqlConnection(TestConnectionString); diff --git a/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestActivityExporter.cs b/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestActivityExporter.cs new file mode 100644 index 00000000000..54feea0d9f6 --- /dev/null +++ b/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestActivityExporter.cs @@ -0,0 +1,62 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry.Trace.Export; + +namespace OpenTelemetry.Testing.Export +{ + public class TestActivityExporter : ActivityExporter + { + private readonly ConcurrentQueue spanDataList = new ConcurrentQueue(); + private readonly Action> onExport; + + public TestActivityExporter(Action> onExport) + { + this.onExport = onExport; + } + + public Activity[] ExportedSpans => this.spanDataList.ToArray(); + + public bool WasShutDown { get; private set; } = false; + + public override Task ExportAsync(IEnumerable data, CancellationToken cancellationToken) + { + this.onExport?.Invoke(data); + + foreach (var s in data) + { + this.spanDataList.Enqueue(s); + } + + return Task.FromResult(ExportResult.Success); + } + + public override Task ShutdownAsync(CancellationToken cancellationToken) + { + this.WasShutDown = true; +#if NET452 + return Task.FromResult(0); +#else + return Task.CompletedTask; +#endif + } + } +} diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Config/ActivityProcessorPipelineTests.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Config/ActivityProcessorPipelineTests.cs new file mode 100644 index 00000000000..42dd2f0bede --- /dev/null +++ b/test/OpenTelemetry.Tests/Implementation/Trace/Config/ActivityProcessorPipelineTests.cs @@ -0,0 +1,259 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry.Testing.Export; +using OpenTelemetry.Trace.Configuration; +using OpenTelemetry.Trace.Export; +using Xunit; + +namespace OpenTelemetry.Tests.Impl.Trace.Config +{ + public class ActivityProcessorPipelineTests + { + [Fact] + public void PipelineBuilder_BadArgs() + { + Assert.Throws(() => new ActivityProcessorPipelineBuilder().AddProcessor(null)); + Assert.Throws(() => new ActivityProcessorPipelineBuilder().SetExporter(null)); + Assert.Throws(() => new ActivityProcessorPipelineBuilder().SetExportingProcessor(null)); + } + + [Fact] + public void PipelineBuilder_Defaults() + { + var builder = new ActivityProcessorPipelineBuilder(); + Assert.Null(builder.Exporter); + Assert.Null(builder.Processors); + + var processor = builder.Build(); + + Assert.Null(builder.Exporter); + Assert.Single(builder.Processors); + Assert.IsType(builder.Processors[0]); + Assert.Same(processor, builder.Processors[0]); + } + + [Fact] + public void PipelineBuilder_AddExporter() + { + var builder = new ActivityProcessorPipelineBuilder(); + + var exporter = new TestActivityExporter(null); + builder.SetExporter(exporter); + + Assert.Same(exporter, builder.Exporter); + + var processor = builder.Build(); + + Assert.Single(builder.Processors); + Assert.IsType(builder.Processors.Single()); + Assert.Same(processor, builder.Processors[0]); + } + + [Fact] + public void PipelineBuilder_AddExporterAndExportingProcessor() + { + var builder = new ActivityProcessorPipelineBuilder(); + + var exporter = new TestActivityExporter(null); + builder.SetExporter(exporter); + + bool processorFactoryCalled = false; + builder.SetExportingProcessor(e => + { + processorFactoryCalled = true; + return new SimpleActivityProcessor(e); + }); + + var processor = builder.Build(); + + Assert.Single(builder.Processors); + Assert.True(processorFactoryCalled); + Assert.IsType(builder.Processors.Single()); + Assert.Same(processor, builder.Processors[0]); + } + + [Fact] + public void PipelineBuilder_AddExportingProcessor() + { + var builder = new ActivityProcessorPipelineBuilder(); + + bool processorFactoryCalled = false; + var processor = new TestProcessor(); + builder.SetExportingProcessor(e => + { + processorFactoryCalled = true; + Assert.Null(e); + return processor; + }); + + Assert.Same(processor, builder.Build()); + + Assert.Single(builder.Processors); + Assert.True(processorFactoryCalled); + Assert.Same(processor, builder.Processors.Single()); + } + + [Fact] + public void PipelineBuilder_AddProcessor() + { + var builder = new ActivityProcessorPipelineBuilder(); + + bool processorFactoryCalled = false; + var processor = new TestProcessor(); + builder.AddProcessor(e => + { + processorFactoryCalled = true; + return processor; + }); + + Assert.Same(processor, builder.Build()); + + Assert.Single(builder.Processors); + Assert.True(processorFactoryCalled); + Assert.Same(processor, builder.Processors.Single()); + } + + [Fact] + public void PipelineBuilder_AddProcessorChain() + { + var builder = new ActivityProcessorPipelineBuilder(); + + bool processorFactory1Called = false; + bool processorFactory2Called = false; + bool processorFactory3Called = false; + + builder + .AddProcessor(next => + { + processorFactory1Called = true; + Assert.NotNull(next); + return new TestProcessor(next, "1"); + }) + .AddProcessor(next => + { + processorFactory2Called = true; + Assert.NotNull(next); + return new TestProcessor(next, "2"); + }) + .AddProcessor(next => + { + processorFactory3Called = true; + Assert.Null(next); + return new TestProcessor(next, "3"); + }); + + var firstProcessor = (TestProcessor)builder.Build(); + + Assert.Equal(3, builder.Processors.Count); + Assert.True(processorFactory1Called); + Assert.True(processorFactory2Called); + Assert.True(processorFactory3Called); + + Assert.Equal("1", firstProcessor.Name); + + var secondProcessor = (TestProcessor)firstProcessor.Next; + Assert.Equal("2", secondProcessor.Name); + var thirdProcessor = (TestProcessor)secondProcessor.Next; + Assert.Equal("3", thirdProcessor.Name); + } + + [Fact] + public void PipelineBuilder_AddProcessorChainWithExporter() + { + var builder = new ActivityProcessorPipelineBuilder(); + + bool processorFactory1Called = false; + bool processorFactory2Called = false; + bool exportingFactory3Called = false; + + builder + .AddProcessor(next => + { + processorFactory1Called = true; + Assert.NotNull(next); + return new TestProcessor(next, "1"); + }) + .AddProcessor(next => + { + processorFactory2Called = true; + Assert.NotNull(next); + return new TestProcessor(next, "2"); + }) + .SetExportingProcessor(exporter => + { + exportingFactory3Called = true; + Assert.NotNull(exporter); + return new SimpleActivityProcessor(exporter); + }) + .SetExporter(new TestActivityExporter(null)); + + var firstProcessor = (TestProcessor)builder.Build(); + + Assert.Equal(3, builder.Processors.Count); + Assert.True(processorFactory1Called); + Assert.True(processorFactory2Called); + Assert.True(exportingFactory3Called); + + Assert.Equal("1", firstProcessor.Name); + + var secondProcessor = (TestProcessor)firstProcessor.Next; + Assert.Equal("2", secondProcessor.Name); + var thirdProcessor = secondProcessor.Next; + Assert.IsType(thirdProcessor); + } + + private class TestProcessor : ActivityProcessor + { + public readonly ActivityProcessor Next; + public readonly string Name; + + public TestProcessor() + { + this.Name = null; + this.Name = null; + } + + public TestProcessor(ActivityProcessor next, string name) + { + this.Next = next; + this.Name = name; + } + + public override void OnStart(Activity span) + { + } + + public override void OnEnd(Activity span) + { + } + + public override Task ShutdownAsync(CancellationToken cancellationToken) + { +#if NET452 + return Task.FromResult(0); +#else + return Task.CompletedTask; +#endif + } + } + } +} diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Config/BroadcastActivityProcessorTests.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Config/BroadcastActivityProcessorTests.cs new file mode 100644 index 00000000000..0d284082a19 --- /dev/null +++ b/test/OpenTelemetry.Tests/Implementation/Trace/Config/BroadcastActivityProcessorTests.cs @@ -0,0 +1,196 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry.Trace; +using OpenTelemetry.Trace.Configuration; +using OpenTelemetry.Trace.Export; +using OpenTelemetry.Trace.Export.Internal; +using Xunit; + +namespace OpenTelemetry.Tests.Impl.Trace.Config +{ + public class BroadcastActivityProcessorTests + { + [Fact] + public void BroadcastProcessor_BadArgs() + { + Assert.Throws(() => new BroadcastActivityProcessor(null)); + Assert.Throws(() => new BroadcastActivityProcessor(new SimpleActivityProcessor[0])); + } + + [Fact] + public void BroadcastProcessor_CallsAllProcessorSequentially() + { + bool start1Called = false; + bool start2Called = false; + bool end1Called = false; + bool end2Called = false; + var processor1 = new TestProcessor( + ss => + { + start1Called = true; + Assert.False(start2Called); + Assert.False(end1Called); + Assert.False(end2Called); + }, se => + { + end1Called = true; + Assert.True(start1Called); + Assert.True(start2Called); + Assert.False(end2Called); + }); + var processor2 = new TestProcessor( + ss => + { + start2Called = true; + Assert.True(start1Called); + Assert.False(end1Called); + Assert.False(end2Called); + }, se => + { + end2Called = true; + Assert.True(start1Called); + Assert.True(start2Called); + Assert.True(end1Called); + }); + + var broadcastProcessor = new BroadcastActivityProcessor(new[] { processor1, processor2 }); + + var activity = new Activity("somename"); + broadcastProcessor.OnStart(activity); + Assert.True(start1Called); + Assert.True(start2Called); + + broadcastProcessor.OnEnd(activity); + Assert.True(end1Called); + Assert.True(end2Called); + } + + [Fact] + public void BroadcastProcessor_OneProcessorThrows() + { + bool start1Called = false; + bool start2Called = false; + bool end1Called = false; + bool end2Called = false; + var processor1 = new TestProcessor( + ss => + { + start1Called = true; + Assert.False(start2Called); + Assert.False(end1Called); + Assert.False(end2Called); + + throw new Exception("Start exception"); + }, se => + { + end1Called = true; + Assert.True(start1Called); + Assert.True(start2Called); + Assert.False(end2Called); + throw new Exception("End exception"); + }); + + var processor2 = new TestProcessor( + ss => + { + start2Called = true; + Assert.True(start1Called); + Assert.False(end1Called); + Assert.False(end2Called); + }, se => + { + end2Called = true; + Assert.True(start1Called); + Assert.True(start2Called); + Assert.True(end1Called); + }); + + var broadcastProcessor = new BroadcastActivityProcessor(new[] { processor1, processor2 }); + + var activity = new Activity("somename"); + broadcastProcessor.OnStart(activity); + Assert.True(start1Called); + Assert.True(start2Called); + + broadcastProcessor.OnEnd(activity); + Assert.True(end1Called); + Assert.True(end2Called); + } + + [Fact] + public void BroadcastProcessor_ShutsDownAll() + { + var processor1 = new TestProcessor(null, null); + var processor2 = new TestProcessor(null, null); + + var broadcastProcessor = new BroadcastActivityProcessor(new[] { processor1, processor2 }); + + broadcastProcessor.ShutdownAsync(default); + Assert.True(processor1.ShutdownCalled); + Assert.True(processor2.ShutdownCalled); + + broadcastProcessor.Dispose(); + Assert.True(processor1.DisposedCalled); + Assert.True(processor2.DisposedCalled); + } + + private class TestProcessor : ActivityProcessor, IDisposable + { + private readonly Action onStart; + private readonly Action onEnd; + + public TestProcessor(Action onStart, Action onEnd) + { + this.onStart = onStart; + this.onEnd = onEnd; + } + + public bool ShutdownCalled { get; private set; } = false; + + public bool DisposedCalled { get; private set; } = false; + + public override void OnStart(Activity span) + { + this.onStart?.Invoke(span); + } + + public override void OnEnd(Activity span) + { + this.onEnd?.Invoke(span); + } + + public override Task ShutdownAsync(CancellationToken cancellationToken) + { + this.ShutdownCalled = true; +#if NET452 + return Task.FromResult(0); +#else + return Task.CompletedTask; +#endif + } + + public void Dispose() + { + this.DisposedCalled = true; + } + } + } +} diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleActivityProcessorTest.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleActivityProcessorTest.cs new file mode 100644 index 00000000000..cea63f1affb --- /dev/null +++ b/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleActivityProcessorTest.cs @@ -0,0 +1,176 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry.Testing.Export; +using OpenTelemetry.Trace.Configuration; +using OpenTelemetry.Trace.Samplers; +using Xunit; + +namespace OpenTelemetry.Trace.Export.Test +{ + public class SimpleActivityProcessorTest : IDisposable + { + private const string SpanName1 = "MySpanName/1"; + private const string SpanName2 = "MySpanName/2"; + private const string ActivitySourceName = "defaultactivitysource"; + + private TestActivityExporter activityExporter; + private OpenTelemetrySdk openTelemetry; + private ActivitySource activitySource; + + public SimpleActivityProcessorTest() + { + this.activityExporter = new TestActivityExporter(null); + this.openTelemetry = OpenTelemetrySdk.EnableOpenTelemetry(b => b + .AddActivitySource(ActivitySourceName) + .AddProcessorPipeline(p => p + .SetExporter(this.activityExporter) + .SetExportingProcessor(e => new SimpleActivityProcessor(e))) + .SetSampler(new AlwaysOnActivitySampler())); + this.activitySource = new ActivitySource(ActivitySourceName); + } + + [Fact] + public void ThrowsOnNullExporter() + { + Assert.Throws(() => new SimpleActivityProcessor(null)); + } + + [Fact] + public void ThrowsInExporter() + { + this.activityExporter = new TestActivityExporter(_ => throw new ArgumentException("123")); + this.openTelemetry = OpenTelemetrySdk.EnableOpenTelemetry(b => b + .AddActivitySource("cijo") + .AddProcessorPipeline(p => p + .SetExporter(this.activityExporter) + .SetExportingProcessor(e => new SimpleActivityProcessor(e)))); + + ActivitySource source = new ActivitySource("cijo"); + var activity = source.StartActivity("somename"); + + // does not throw + activity.Stop(); + } + + [Fact] + public void ProcessorDoesNotBlockOnExporter() + { + this.activityExporter = new TestActivityExporter(async _ => await Task.Delay(500)); + this.openTelemetry = OpenTelemetrySdk.EnableOpenTelemetry(b => b + .AddActivitySource("cijo") + .AddProcessorPipeline(p => p + .SetExporter(this.activityExporter) + .SetExportingProcessor(e => new SimpleActivityProcessor(e)))); + + ActivitySource source = new ActivitySource("cijo"); + var activity = source.StartActivity("somename"); + + // does not block + var sw = Stopwatch.StartNew(); + activity.Stop(); + sw.Stop(); + + Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100)); + + var exported = this.WaitForSpans(this.activityExporter, 1, TimeSpan.FromMilliseconds(600)); + + Assert.Single(exported); + } + + [Fact] + public async Task ShutdownTwice() + { + var activityProcessor = new SimpleActivityProcessor(new TestActivityExporter(null)); + + await activityProcessor.ShutdownAsync(CancellationToken.None).ConfigureAwait(false); + + // does not throw + await activityProcessor.ShutdownAsync(CancellationToken.None).ConfigureAwait(false); + } + + [Fact] + public void ExportDifferentSampledSpans() + { + var span1 = this.CreateSampledEndedSpan(SpanName1); + var span2 = this.CreateSampledEndedSpan(SpanName2); + + var exported = this.WaitForSpans(this.activityExporter, 2, TimeSpan.FromMilliseconds(100)); + Assert.Equal(2, exported.Length); + Assert.Contains(span1, exported); + Assert.Contains(span2, exported); + } + + [Fact(Skip = "Reenable once AlwaysParentActivitySampler is added")] + public void ExportNotSampledSpans() + { + var span1 = this.CreateNotSampledEndedSpan(SpanName1); + var span2 = this.CreateSampledEndedSpan(SpanName2); + + // Spans are recorded and exported in the same order as they are ended, we test that a non + // sampled span is not exported by creating and ending a sampled span after a non sampled span + // and checking that the first exported span is the sampled span (the non sampled did not get + // exported). + + var exported = this.WaitForSpans(this.activityExporter, 1, TimeSpan.FromMilliseconds(100)); + + // Need to check this because otherwise the variable span1 is unused, other option is to not + // have a span1 variable. + Assert.Single(exported); + Assert.Contains(span2, exported); + } + + public void Dispose() + { + this.activityExporter.ShutdownAsync(CancellationToken.None); + Activity.Current = null; + } + + private Activity CreateSampledEndedSpan(string spanName) + { + var context = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded); + + var activity = this.activitySource.StartActivity(spanName, ActivityKind.Internal, context); + activity.Stop(); + return activity; + } + + private Activity CreateNotSampledEndedSpan(string spanName) + { + var context = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.None); + var activity = this.activitySource.StartActivity(spanName, ActivityKind.Internal, context); + activity.Stop(); + return activity; + } + + private Activity[] WaitForSpans(TestActivityExporter exporter, int spanCount, TimeSpan timeout) + { + Assert.True( + SpinWait.SpinUntil( + () => + { + Thread.Sleep(0); + return exporter.ExportedSpans.Length >= spanCount; + }, timeout + TimeSpan.FromMilliseconds(20))); + + return exporter.ExportedSpans; + } + } +}