Batching activity processor #755

Merged · 16 commits · Jul 7, 2020
Changes from 7 commits
257 changes: 257 additions & 0 deletions src/OpenTelemetry/Trace/Export/BatchingActivityProcessor.cs
@@ -0,0 +1,257 @@
// <copyright file="BatchingActivityProcessor.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Trace.Export
{
/// <summary>
/// Implements a processor that batches activities before calling the exporter.
/// </summary>
public class BatchingActivityProcessor : ActivityProcessor, IDisposable
{
private const int DefaultMaxQueueSize = 2048;
private const int DefaultMaxExportBatchSize = 512;
private static readonly TimeSpan DefaultScheduledDelay = TimeSpan.FromMilliseconds(5000);
private static readonly TimeSpan DefaultExporterTimeout = TimeSpan.FromMilliseconds(30000);
private readonly ConcurrentQueue<Activity> exportQueue;
private readonly int maxQueueSize;
private readonly int maxExportBatchSize;
private readonly TimeSpan scheduledDelay;
private readonly TimeSpan exporterTimeout;
private readonly ActivityExporter exporter;
private readonly List<Activity> batch = new List<Activity>();
private CancellationTokenSource cts;
private volatile int currentQueueSize;
private bool stopping = false;

/// <summary>
/// Initializes a new instance of the <see cref="BatchingActivityProcessor"/> class with default parameters:
/// <list type="bullet">
/// <item>
/// <description>maxQueueSize = 2048,</description>
/// </item>
/// <item>
/// <description>scheduledDelay = 5 sec,</description>
/// </item>
/// <item>
/// <description>exporterTimeout = 30 sec,</description>
/// </item>
/// <item>
/// <description>maxExportBatchSize = 512</description>
/// </item>
/// </list>
/// </summary>
/// <param name="exporter">Exporter instance.</param>
public BatchingActivityProcessor(ActivityExporter exporter)
: this(exporter, DefaultMaxQueueSize, DefaultScheduledDelay, DefaultExporterTimeout, DefaultMaxExportBatchSize)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="BatchingActivityProcessor"/> class with custom settings.
/// </summary>
/// <param name="exporter">Exporter instance.</param>
/// <param name="maxQueueSize">Maximum queue size. After the size is reached, activities are dropped by the processor.</param>
/// <param name="scheduledDelay">The delay between two consecutive exports.</param>
/// <param name="exporterTimeout">Maximum allowed time to export data.</param>
/// <param name="maxExportBatchSize">The maximum batch size of every export. It must be less than or equal to maxQueueSize.</param>
public BatchingActivityProcessor(ActivityExporter exporter, int maxQueueSize, TimeSpan scheduledDelay, TimeSpan exporterTimeout, int maxExportBatchSize)
{
if (maxQueueSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
}

if (maxExportBatchSize <= 0 || maxExportBatchSize > maxQueueSize)
{
throw new ArgumentOutOfRangeException(nameof(maxExportBatchSize));
}

this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
this.maxQueueSize = maxQueueSize;
this.scheduledDelay = scheduledDelay;
this.exporterTimeout = exporterTimeout;
this.maxExportBatchSize = maxExportBatchSize;

this.cts = new CancellationTokenSource();
this.exportQueue = new ConcurrentQueue<Activity>();

// Start a worker task that lasts for the lifetime of the processor.
// A dedicated thread would not help here because exporter tasks run on thread pool threads anyway.
Task.Run(() => this.Worker(this.cts.Token), this.cts.Token);
}

/// <inheritdoc/>
public override void OnStart(Activity activity)
{
}

/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
if (this.stopping)
{
return;
}

// Because of the race condition between checking the size and enqueueing,
// we might end up with slightly more activities than maxQueueSize.
// Tolerate it to avoid extra synchronization.
if (this.currentQueueSize >= this.maxQueueSize)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted();
return;
}

Interlocked.Increment(ref this.currentQueueSize);

this.exportQueue.Enqueue(activity);
}

/// <inheritdoc/>
public override async Task ShutdownAsync(CancellationToken cancellationToken)
{
if (!this.stopping)
{
this.stopping = true;

// This will stop the loop after current batch finishes.
this.cts.Cancel(false);
this.cts.Dispose();
this.cts = null;

// if there are more items, continue until cancellation token allows
while (this.currentQueueSize > 0 && !cancellationToken.IsCancellationRequested)
{
await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false);
}

await this.exporter.ShutdownAsync(cancellationToken);

// There is no point in waiting for the worker task if cancellation happens;
// it is dead already or will die on the next iteration on its own.

// ExportBatchAsync must never throw; we are here either because it was cancelled
// or because there are no items left.
OpenTelemetrySdkEventSource.Log.ShutdownEvent(this.currentQueueSize);
}
}

public void Dispose()
{
this.Dispose(true);
}

protected virtual void Dispose(bool isDisposing)
{
if (!this.stopping)
{
this.ShutdownAsync(CancellationToken.None).ContinueWith(_ => { }).GetAwaiter().GetResult();
Member:
This could introduce an exception, which is against the .NET guideline: an object must not throw an exception if its Dispose method is called multiple times.

Reason: ShutdownAsync is not thread safe; a race condition on this.stopping could allow re-entering the following code with this.cts == null:

this.cts.Cancel(false);
this.cts.Dispose();
this.cts = null;
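
(Illustration only, not part of this PR: one possible way to close that race is to let only the first shutdown caller run the cancellation path by winning an atomic exchange. The shutdownStarted field below is hypothetical; the rest mirrors the method from the PR.)

// Hypothetical guard so concurrent ShutdownAsync/Dispose calls cannot both
// enter the cancellation block and observe this.cts == null.
private int shutdownStarted; // 0 = running, 1 = shutdown requested

public override async Task ShutdownAsync(CancellationToken cancellationToken)
{
    // Only the first caller wins the exchange; later callers return immediately,
    // so this.cts is cancelled and disposed exactly once.
    if (Interlocked.CompareExchange(ref this.shutdownStarted, 1, 0) != 0)
    {
        return;
    }

    this.stopping = true;
    this.cts.Cancel(false);
    this.cts.Dispose();
    this.cts = null;

    // Drain the remaining items until the cancellation token stops us.
    while (this.currentQueueSize > 0 && !cancellationToken.IsCancellationRequested)
    {
        await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false);
    }

    await this.exporter.ShutdownAsync(cancellationToken).ConfigureAwait(false);
    OpenTelemetrySdkEventSource.Log.ShutdownEvent(this.currentQueueSize);
}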

Member:

What's with ContinueWith(_ => { })? Is that to suppress any exception?

Contributor Author:

Yes. This will suppress an exception.
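
(For illustration: a minimal, self-contained snippet showing why the pattern does not throw. Blocking waits on the continuation task, which completes successfully even when the antecedent faults. The demo class name is made up.)

using System;
using System.Threading.Tasks;

class ContinueWithSuppressionDemo
{
    static void Main()
    {
        // A task that has already faulted.
        Task failing = Task.FromException(new InvalidOperationException("boom"));

        // GetResult() is called on the continuation, not on the faulted task,
        // so the exception is swallowed and this line returns normally.
        failing.ContinueWith(_ => { }).GetAwaiter().GetResult();
        Console.WriteLine("No exception was rethrown.");

        // Blocking on the faulted task directly would rethrow:
        // failing.GetAwaiter().GetResult();
    }
}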

Contributor Author:

@reyang @CodeBlanch @cijothomas
We have a similar dispose pattern in SimpleSpanProcessor, SimpleActivityProcessor, and BatchingSpanProcessor. Should we create a new issue to address it, or should we fix it as part of this PR?

Member:

I'd open a new issue and merge this one, so that we can make progress and keep track of fixing the underlying issue.

Contributor Author:

Created an issue #769 to track it.

}

if (isDisposing)
{
if (this.exporter is IDisposable disposableExporter)
{
try
{
disposableExporter.Dispose();
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("Dispose", e);
}
}
}
}

private async Task ExportBatchAsync(CancellationToken cancellationToken)
{
try
{
if (cancellationToken.IsCancellationRequested)
{
return;
}

if (this.exportQueue.TryDequeue(out var nextActivity))
{
Interlocked.Decrement(ref this.currentQueueSize);
this.batch.Add(nextActivity);
}
else
{
// nothing in queue
return;
}

while (this.batch.Count < this.maxExportBatchSize && this.exportQueue.TryDequeue(out nextActivity))
{
Interlocked.Decrement(ref this.currentQueueSize);
this.batch.Add(nextActivity);
}

var result = await this.exporter.ExportAsync(this.batch, cancellationToken).ConfigureAwait(false);
if (result != ExportResult.Success)
{
OpenTelemetrySdkEventSource.Log.ExporterErrorResult(result);

// We do not support retries for now and leave them up to the exporter,
// as only the exporter implementation knows how to retry: which items failed
// and what a reasonable policy is for that exporter.
}
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ExportBatchAsync), ex);
}
finally
{
this.batch.Clear();
}
}

private async Task Worker(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var sw = Stopwatch.StartNew();
using (var exportCancellationTokenSource = new CancellationTokenSource(this.exporterTimeout))
{
await this.ExportBatchAsync(exportCancellationTokenSource.Token).ConfigureAwait(false);
}

if (cancellationToken.IsCancellationRequested)
{
return;
}

var remainingWait = this.scheduledDelay - sw.Elapsed;
if (remainingWait > TimeSpan.Zero)
{
await Task.Delay(remainingWait, cancellationToken).ConfigureAwait(false);
}
}
}
}
}
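
(A hedged usage sketch of the processor added above; MyActivityExporter is a hypothetical ActivityExporter implementation and is not part of this PR.)

using System;
using OpenTelemetry.Trace.Export;

class ProcessorSetupSketch
{
    static void Configure()
    {
        // Hypothetical exporter implementation; any ActivityExporter works here.
        var exporter = new MyActivityExporter();

        // Defaults: maxQueueSize 2048, scheduledDelay 5 s, exporterTimeout 30 s, maxExportBatchSize 512.
        var defaultProcessor = new BatchingActivityProcessor(exporter);

        // Custom settings: smaller queue, faster flush cadence, smaller batches.
        var tunedProcessor = new BatchingActivityProcessor(
            exporter,
            maxQueueSize: 1024,
            scheduledDelay: TimeSpan.FromSeconds(1),
            exporterTimeout: TimeSpan.FromSeconds(10),
            maxExportBatchSize: 128);
    }
}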
@@ -25,25 +25,32 @@ namespace OpenTelemetry.Testing.Export
{
public class TestActivityExporter : ActivityExporter
{
private readonly ConcurrentQueue<Activity> spanDataList = new ConcurrentQueue<Activity>();
private readonly ConcurrentQueue<Activity> activities = new ConcurrentQueue<Activity>();
private readonly Action<IEnumerable<Activity>> onExport;

public TestActivityExporter(Action<IEnumerable<Activity>> onExport)
{
this.onExport = onExport;
}

public Activity[] ExportedSpans => this.spanDataList.ToArray();
public Activity[] ExportedActivities => this.activities.ToArray();

public bool WasShutDown { get; private set; } = false;

public override Task<ExportResult> ExportAsync(IEnumerable<Activity> data, CancellationToken cancellationToken)
{
// Sleep for zero milliseconds to yield the thread so that cancellation triggered by the export timeout can be observed.
Thread.Sleep(0);
if (cancellationToken.IsCancellationRequested)
{
return default;
}

this.onExport?.Invoke(data);

foreach (var s in data)
{
this.spanDataList.Enqueue(s);
this.activities.Enqueue(s);
}

return Task.FromResult(ExportResult.Success);
@@ -1,4 +1,4 @@
// <copyright file="TestExporter.cs" company="OpenTelemetry Authors">
// <copyright file="TestSpanExporter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,12 +22,12 @@

namespace OpenTelemetry.Testing.Export
{
public class TestExporter : SpanExporter
public class TestSpanExporter : SpanExporter
{
private readonly ConcurrentQueue<SpanData> spanDataList = new ConcurrentQueue<SpanData>();
private readonly Action<IEnumerable<SpanData>> onExport;

public TestExporter(Action<IEnumerable<SpanData>> onExport)
public TestSpanExporter(Action<IEnumerable<SpanData>> onExport)
{
this.onExport = onExport;
}
@@ -55,7 +55,7 @@ public void PipelineBuilder_AddExporter()
{
var builder = new SpanProcessorPipelineBuilder();

var exporter = new TestExporter(null);
var exporter = new TestSpanExporter(null);
builder.SetExporter(exporter);

Assert.Same(exporter, builder.Exporter);
@@ -72,7 +72,7 @@ public void PipelineBuilder_AddExporterAndExportingProcessor()
{
var builder = new SpanProcessorPipelineBuilder();

var exporter = new TestExporter(null);
var exporter = new TestSpanExporter(null);
builder.SetExporter(exporter);

bool processorFactoryCalled = false;
@@ -203,7 +203,7 @@ public void PipelineBuilder_AddProcessorChainWithExporter()
Assert.NotNull(exporter);
return new SimpleSpanProcessor(exporter);
})
.SetExporter(new TestExporter(null));
.SetExporter(new TestSpanExporter(null));

var firstProcessor = (TestProcessor)builder.Build();

@@ -56,7 +56,7 @@ public void TracerBuilder_ValidArgs()
bool instrumentationFactoryCalled = true;

var sampler = new ProbabilitySampler(0.1);
var exporter = new TestExporter(_ => { });
var exporter = new TestSpanExporter(_ => { });
var options = new TracerConfiguration(1, 1, 1);

builder
@@ -57,7 +57,7 @@ public void CreateFactory_BuilderWithArgs()
{
var exporterCalledCount = 0;

var testExporter = new TestExporter(spans =>
var testExporter = new TestSpanExporter(spans =>
{
exporterCalledCount++;
Assert.Single(spans);
@@ -124,7 +124,7 @@ public void CreateFactory_BuilderWithMultiplePipelines()
{
var exporterCalledCount = 0;

var testExporter = new TestExporter(spans =>
var testExporter = new TestSpanExporter(spans =>
{
exporterCalledCount++;
Assert.Single(spans);