Skip to content

Commit

Permalink
Add support for multiple pipelines in OpenTelemetryBuilder with Activ…
Browse files Browse the repository at this point in the history
…ity (#735)

* Add support for multiple pipelines in OpenTelemetryBuilder with Activity

* change Setpipeline toAddpipeline

* Dispose activityprocessor
  • Loading branch information
cijothomas authored Jun 18, 2020
1 parent 10f870e commit afd9135
Show file tree
Hide file tree
Showing 19 changed files with 857 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static OpenTelemetryBuilder UseConsoleActivityExporter(this OpenTelemetry
var exporterOptions = new ConsoleActivityExporterOptions();
configure(exporterOptions);
var consoleExporter = new ConsoleActivityExporter(exporterOptions);
return builder.SetProcessorPipeline(pipeline => pipeline.SetExporter(consoleExporter));
return builder.AddProcessorPipeline(pipeline => pipeline.SetExporter(consoleExporter));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public static OpenTelemetryBuilder UseJaegerActivityExporter(this OpenTelemetryB
throw new ArgumentNullException(nameof(configure));
}

return builder.SetProcessorPipeline(pipeline =>
return builder.AddProcessorPipeline(pipeline =>
{
var exporterOptions = new JaegerExporterOptions();
configure(exporterOptions);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// <copyright file="TracerBuilderExtensions.cs" company="OpenTelemetry Authors">
// <copyright file="TracerBuilderExtensions.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -102,7 +102,7 @@ public static OpenTelemetryBuilder UseOpenTelemetryProtocolActivityExporter(this
throw new ArgumentNullException(nameof(configure));
}

return builder.SetProcessorPipeline(pipeline =>
return builder.AddProcessorPipeline(pipeline =>
{
var exporterOptions = new ExporterOptions();
configure(exporterOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ internal ActivityProcessor Build()
}
else if (this.Exporter != null)
{
// TODO: Make this BatchingActivityProcessor once its available.
exportingProcessor = new SimpleActivityProcessor(this.Exporter);
this.Processors.Add(exportingProcessor);
}
Expand Down
11 changes: 8 additions & 3 deletions src/OpenTelemetry/Trace/Configuration/OpenTelemetryBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal OpenTelemetryBuilder()
{
}

internal ActivityProcessorPipelineBuilder ProcessingPipeline { get; private set; }
internal List<ActivityProcessorPipelineBuilder> ProcessingPipelines { get; private set; }

internal List<InstrumentationFactory> InstrumentationFactories { get; private set; }

Expand All @@ -42,16 +42,21 @@ internal OpenTelemetryBuilder()
/// </summary>
/// <param name="configure">Function that configures pipeline.</param>
/// <returns>Returns <see cref="OpenTelemetryBuilder"/> for chaining.</returns>
public OpenTelemetryBuilder SetProcessorPipeline(Action<ActivityProcessorPipelineBuilder> configure)
public OpenTelemetryBuilder AddProcessorPipeline(Action<ActivityProcessorPipelineBuilder> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

if (this.ProcessingPipelines == null)
{
this.ProcessingPipelines = new List<ActivityProcessorPipelineBuilder>();
}

var pipelineBuilder = new ActivityProcessorPipelineBuilder();
configure(pipelineBuilder);
this.ProcessingPipeline = pipelineBuilder;
this.ProcessingPipelines.Add(pipelineBuilder);
return this;
}

Expand Down
28 changes: 26 additions & 2 deletions src/OpenTelemetry/Trace/Configuration/OpenTelemetrySdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using OpenTelemetry.Trace.Export;
using OpenTelemetry.Trace.Export.Internal;
using OpenTelemetry.Trace.Samplers;

namespace OpenTelemetry.Trace.Configuration
{
public class OpenTelemetrySdk : IDisposable
{
private readonly List<object> instrumentations = new List<object>();
private ActivityProcessor activityProcessor;
private ActivityListener listener;

static OpenTelemetrySdk()
Expand Down Expand Up @@ -54,14 +57,29 @@ public static OpenTelemetrySdk EnableOpenTelemetry(Action<OpenTelemetryBuilder>
ActivitySampler sampler = openTelemetryBuilder.Sampler ?? new AlwaysOnActivitySampler();

ActivityProcessor activityProcessor;
if (openTelemetryBuilder.ProcessingPipeline == null)
if (openTelemetryBuilder.ProcessingPipelines == null || !openTelemetryBuilder.ProcessingPipelines.Any())
{
// if there are no pipelines are configured, use noop processor
activityProcessor = new NoopActivityProcessor();
}
else if (openTelemetryBuilder.ProcessingPipelines.Count == 1)
{
// if there is only one pipeline - use it's outer processor as a
// single processor on the tracerSdk.
var processorFactory = openTelemetryBuilder.ProcessingPipelines[0];
activityProcessor = processorFactory.Build();
}
else
{
activityProcessor = openTelemetryBuilder.ProcessingPipeline.Build();
// if there are more pipelines, use processor that will broadcast to all pipelines
var processors = new ActivityProcessor[openTelemetryBuilder.ProcessingPipelines.Count];

for (int i = 0; i < openTelemetryBuilder.ProcessingPipelines.Count; i++)
{
processors[i] = openTelemetryBuilder.ProcessingPipelines[i].Build();
}

activityProcessor = new BroadcastActivityProcessor(processors);
}

var activitySource = new ActivitySourceAdapter(sampler, activityProcessor);
Expand Down Expand Up @@ -96,6 +114,7 @@ public static OpenTelemetrySdk EnableOpenTelemetry(Action<OpenTelemetryBuilder>
};

ActivitySource.AddActivityListener(openTelemetrySDK.listener);
openTelemetrySDK.activityProcessor = activityProcessor;
return openTelemetrySDK;
}

Expand All @@ -112,6 +131,11 @@ public void Dispose()
}

this.instrumentations.Clear();

if (this.activityProcessor is IDisposable disposableProcessor)
{
disposableProcessor.Dispose();
}
}

internal static ActivityDataRequest ComputeActivityDataRequest(
Expand Down
105 changes: 105 additions & 0 deletions src/OpenTelemetry/Trace/Export/Internal/BroadcastActivityProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// <copyright file="BroadcastActivityProcessor.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Trace.Export.Internal
{
internal class BroadcastActivityProcessor : ActivityProcessor, IDisposable
{
private readonly IEnumerable<ActivityProcessor> processors;

public BroadcastActivityProcessor(IEnumerable<ActivityProcessor> processors)
{
if (processors == null)
{
throw new ArgumentNullException(nameof(processors));
}

if (!processors.Any())
{
throw new ArgumentException($"{nameof(processors)} collection is empty");
}

this.processors = processors;
}

public override void OnEnd(Activity activity)
{
foreach (var processor in this.processors)
{
try
{
processor.OnEnd(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnEnd", e);
}
}
}

public override void OnStart(Activity activity)
{
foreach (var processor in this.processors)
{
try
{
processor.OnStart(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnStart", e);
}
}
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
{
var tasks = new List<Task>();
foreach (var processor in this.processors)
{
tasks.Add(processor.ShutdownAsync(cancellationToken));
}

return Task.WhenAll(tasks);
}

public void Dispose()
{
foreach (var processor in this.processors)
{
try
{
if (processor is IDisposable disposable)
{
disposable.Dispose();
}
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("Dispose", e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void AspNetRequestsAreCollectedSuccessfully(
options.TextFormat = textFormat.Object;
}
})
.SetProcessorPipeline(p => p.AddProcessor(_ => activityProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(_ => activityProcessor.Object))))
{
activity.Start();
this.fakeAspNetDiagnosticSource.Write(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public async Task SuccessfulTemplateControllerCallGeneratesASpan()
void ConfigureTestServices(IServiceCollection services)
{
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry((builder) => builder.AddRequestInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
}

// Arrange
Expand Down Expand Up @@ -100,7 +100,7 @@ public async Task SuccessfulTemplateControllerCallUsesParentContext()
builder.ConfigureTestServices(services =>
{
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry((builder) => builder.AddRequestInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
})))
{
using var client = testFactory.CreateClient();
Expand Down Expand Up @@ -148,7 +148,7 @@ public async Task CustomTextFormat()
{
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddRequestInstrumentation((opt) => opt.TextFormat = textFormat.Object)
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
})))
{
using var client = testFactory.CreateClient();
Expand Down Expand Up @@ -181,7 +181,7 @@ void ConfigureTestServices(IServiceCollection services)
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(
(builder) =>
builder.AddRequestInstrumentation((opt) => opt.RequestFilter = (ctx) => ctx.Request.Path != "/api/values/2")
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
}

// Arrange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public async Task SuccessfulTemplateControllerCallGeneratesASpan()
{
services.AddSingleton<CallbackMiddleware.CallbackMiddlewareImpl>(new TestCallbackMiddlewareImpl());
services.AddOpenTelemetrySdk((builder) => builder.AddRequestInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
}))
.CreateClient())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public async Task HttpDependenciesInstrumentationInjectsHeadersAsync()

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.SendAsync(request);
Expand Down Expand Up @@ -124,7 +124,7 @@ public async Task HttpDependenciesInstrumentationInjectsHeadersAsync_CustomForma

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation((opt) => opt.TextFormat = textFormat.Object)
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.SendAsync(request);
Expand Down Expand Up @@ -154,7 +154,7 @@ public async Task HttpDependenciesInstrumentation_AddViaFactory_HttpInstrumentat

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.GetAsync(this.url);
Expand All @@ -172,7 +172,7 @@ public async Task HttpDependenciesInstrumentation_AddViaFactory_DependencyInstru

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.GetAsync(this.url);
Expand All @@ -198,7 +198,7 @@ public async Task HttpDependenciesInstrumentationBacksOffIfAlreadyInstrumented()

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.SendAsync(request);
Expand All @@ -218,7 +218,7 @@ public async void HttpDependenciesInstrumentationFiltersOutRequests()
(opt) => opt.EventFilter = (activityName, arg1, _) => !(activityName == "System.Net.Http.HttpRequestOut" &&
arg1 is HttpRequestMessage request &&
request.RequestUri.OriginalString.Contains(this.url)))
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.GetAsync(this.url);
Expand All @@ -234,7 +234,7 @@ public async Task HttpDependenciesInstrumentationFiltersOutRequestsToExporterEnd

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public async Task HttpOutCallsAreCollectedSuccessfullyAsync(HttpTestData.HttpOut

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation((opt) => opt.SetHttpFlavor = tc.SetHttpFlavor)
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
try
{
Expand Down
Loading

0 comments on commit afd9135

Please sign in to comment.