Skip to content

Commit

Permalink
Refactor exporter - step 6 (#1094)
Browse files Browse the repository at this point in the history
* implement BatchExportActivityProcessor

* fix typo

* wrap comments

* no need to stop a Stopwatch

* fix nit

* add thread name

* adopt zero-alloc enumerator

* avoid calling exporter with zero item

* better naming

* clean up

* fix the missing exportTrigger reset

* shutdown drain till sentry

* simplify the flow

* simplify the code

* periodic polling to avoid dead lock
  • Loading branch information
reyang authored Aug 19, 2020
1 parent a30634c commit bf694a0
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 34 deletions.
21 changes: 3 additions & 18 deletions docs/trace/building-your-own-exporter/MyExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@
// </copyright>

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry;
using OpenTelemetry.Trace;

internal class MyExporter : ActivityExporter
internal class MyExporter : ActivityExporterSync
{
public override Task<ExportResult> ExportAsync(
IEnumerable<Activity> batch, CancellationToken cancellationToken)
public override ExportResultSync Export(in Batch<Activity> batch)
{
// Exporter code which can generate further
// telemetry should do so inside SuppressInstrumentation
Expand All @@ -38,17 +34,6 @@ public override Task<ExportResult> ExportAsync(
Console.WriteLine($"{activity.DisplayName}");
}

return Task.FromResult(ExportResult.Success);
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
{
Console.WriteLine($"MyExporter.ShutdownAsync");
return Task.CompletedTask;
}

protected override void Dispose(bool disposing)
{
Console.WriteLine($"MyExporter.Dispose");
return ExportResultSync.Success;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public static TracerProviderBuilder AddMyExporter(this TracerProviderBuilder bui
throw new ArgumentNullException(nameof(builder));
}

return builder.AddProcessor(new SimpleActivityProcessor(new MyExporter()));
return builder.AddProcessor(new BatchExportActivityProcessor(new MyExporter()));
}
}
159 changes: 148 additions & 11 deletions src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ namespace OpenTelemetry.Trace
public class BatchExportActivityProcessor : ActivityProcessor
{
private readonly ActivityExporterSync exporter;
private readonly CircularBuffer<Activity> queue;
private readonly TimeSpan scheduledDelay;
private readonly TimeSpan exporterTimeout;
private readonly CircularBuffer<Activity> circularBuffer;
private readonly int scheduledDelayMillis;
private readonly int exporterTimeoutMillis;
private readonly int maxExportBatchSize;
private readonly Thread exporterThread;
private readonly AutoResetEvent exportTrigger = new AutoResetEvent(false);
private readonly ManualResetEvent dataExportedNotification = new ManualResetEvent(false);
private readonly ManualResetEvent shutdownTrigger = new ManualResetEvent(false);
private long shutdownDrainTarget = long.MaxValue;
private bool disposed;
private long droppedCount = 0;

Expand Down Expand Up @@ -71,10 +76,16 @@ public BatchExportActivityProcessor(
}

this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
this.queue = new CircularBuffer<Activity>(maxQueueSize);
this.scheduledDelay = TimeSpan.FromMilliseconds(scheduledDelayMillis);
this.exporterTimeout = TimeSpan.FromMilliseconds(exporterTimeoutMillis);
this.circularBuffer = new CircularBuffer<Activity>(maxQueueSize);
this.scheduledDelayMillis = scheduledDelayMillis;
this.exporterTimeoutMillis = exporterTimeoutMillis;
this.maxExportBatchSize = maxExportBatchSize;
this.exporterThread = new Thread(new ThreadStart(this.ExporterProc))
{
IsBackground = true,
Name = $"OpenTelemetry-{nameof(BatchExportActivityProcessor)}-{exporter.GetType().Name}",
};
this.exporterThread.Start();
}

/// <summary>
Expand All @@ -95,7 +106,7 @@ internal long ReceivedCount
{
get
{
return this.queue.AddedCount + this.DroppedCount;
return this.circularBuffer.AddedCount + this.DroppedCount;
}
}

Expand All @@ -106,18 +117,18 @@ internal long ProcessedCount
{
get
{
return this.queue.RemovedCount;
return this.circularBuffer.RemovedCount;
}
}

/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
if (this.queue.TryAdd(activity, maxSpinCount: 50000))
if (this.circularBuffer.TryAdd(activity, maxSpinCount: 50000))
{
if (this.queue.Count >= this.maxExportBatchSize)
if (this.circularBuffer.Count >= this.maxExportBatchSize)
{
// TODO: signal the exporter
this.exportTrigger.Set();
}

return; // enqueue succeeded
Expand All @@ -127,6 +138,78 @@ public override void OnEnd(Activity activity)
Interlocked.Increment(ref this.droppedCount);
}

/// <summary>
/// Flushes the <see cref="Activity"/> currently in the queue, blocks
/// the current thread until flush completed, shutdown signaled or
/// timed out.
/// </summary>
/// <param name="timeoutMillis">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush completed; otherwise, <c>false</c>.
/// </returns>
public bool ForceFlush(int timeoutMillis = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
}

var tail = this.circularBuffer.RemovedCount;
var head = this.circularBuffer.AddedCount;

if (head == tail)
{
return true; // nothing to flush
}

this.exportTrigger.Set();

if (timeoutMillis == 0)
{
return false;
}

var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger };

var sw = Stopwatch.StartNew();

// There is a chance that the export thread finished processing all the data from the queue,
// and signaled before we enter wait here, use polling to prevent being blocked indefinitely.
const int pollingMillis = 1000;

while (true)
{
if (timeoutMillis == Timeout.Infinite)
{
WaitHandle.WaitAny(triggers, pollingMillis);
}
else
{
var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return this.circularBuffer.RemovedCount >= head;
}

WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMillis));
}

if (this.circularBuffer.RemovedCount >= head)
{
return true;
}

if (this.shutdownDrainTarget != long.MaxValue)
{
return false;
}
}
}

/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ForceFlushAsync(CancellationToken cancellationToken)
Expand All @@ -135,6 +218,30 @@ public override Task ForceFlushAsync(CancellationToken cancellationToken)
throw new NotImplementedException();
}

/// <summary>
/// Attempt to drain the queue and shutdown the exporter, blocks the
/// current thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMillis">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public void Shutdown(int timeoutMillis = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
}

this.shutdownDrainTarget = this.circularBuffer.AddedCount;
this.shutdownTrigger.Set();

if (timeoutMillis != 0)
{
this.exporterThread.Join(timeoutMillis);
}
}

/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ShutdownAsync(CancellationToken cancellationToken)
Expand All @@ -153,6 +260,9 @@ protected override void Dispose(bool disposing)

if (disposing && !this.disposed)
{
// TODO: Dispose/Shutdown flow needs to be redesigned, currently it is convoluted.
this.Shutdown(this.exporterTimeoutMillis);

try
{
this.exporter.Dispose();
Expand All @@ -165,5 +275,32 @@ protected override void Dispose(bool disposing)
this.disposed = true;
}
}

private void ExporterProc()
{
var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger };

while (true)
{
// only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously
if (this.circularBuffer.Count < this.maxExportBatchSize)
{
WaitHandle.WaitAny(triggers, this.scheduledDelayMillis);
}

if (this.circularBuffer.Count > 0)
{
this.exporter.Export(new Batch<Activity>(this.circularBuffer, this.maxExportBatchSize));

this.dataExportedNotification.Set();
this.dataExportedNotification.Reset();
}

if (this.circularBuffer.RemovedCount >= this.shutdownDrainTarget)
{
break;
}
}
}
}
}
4 changes: 0 additions & 4 deletions src/OpenTelemetry/Trace/SimpleExportActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@
// limitations under the License.
// </copyright>

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Trace
{
Expand Down

0 comments on commit bf694a0

Please sign in to comment.