Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor exporter - step 6 #1094

Merged
merged 15 commits into from
Aug 19, 2020
21 changes: 3 additions & 18 deletions docs/trace/building-your-own-exporter/MyExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@
// </copyright>

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry;
using OpenTelemetry.Trace;

internal class MyExporter : ActivityExporter
internal class MyExporter : ActivityExporterSync
{
public override Task<ExportResult> ExportAsync(
IEnumerable<Activity> batch, CancellationToken cancellationToken)
public override ExportResultSync Export(in Batch<Activity> batch)
{
// Exporter code which can generate further
// telemetry should do so inside SuppressInstrumentation
Expand All @@ -38,17 +34,6 @@ public override Task<ExportResult> ExportAsync(
Console.WriteLine($"{activity.DisplayName}");
}

return Task.FromResult(ExportResult.Success);
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
{
Console.WriteLine($"MyExporter.ShutdownAsync");
return Task.CompletedTask;
}

protected override void Dispose(bool disposing)
{
Console.WriteLine($"MyExporter.Dispose");
return ExportResultSync.Success;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public static TracerProviderBuilder AddMyExporter(this TracerProviderBuilder bui
throw new ArgumentNullException(nameof(builder));
}

return builder.AddProcessor(new SimpleActivityProcessor(new MyExporter()));
return builder.AddProcessor(new BatchExportActivityProcessor(new MyExporter()));
}
}
173 changes: 162 additions & 11 deletions src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ namespace OpenTelemetry.Trace
public class BatchExportActivityProcessor : ActivityProcessor
{
private readonly ActivityExporterSync exporter;
private readonly CircularBuffer<Activity> queue;
private readonly TimeSpan scheduledDelay;
private readonly TimeSpan exporterTimeout;
private readonly CircularBuffer<Activity> circularBuffer;
private readonly int scheduledDelayMillis;
private readonly int exporterTimeoutMillis;
private readonly int maxExportBatchSize;
private readonly Thread exporterThread;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not a Task?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having an explicit thread gives a lot of benefit:

  1. The exporter thread runaway time would give idea on how much CPU cycles were used by exporting job.
  2. While debugging a live process or a dump, it is easy to see whether the exporting job got stuck or not.
  3. It avoids the accidental context leakage of SuppressInstrumentation.

private readonly AutoResetEvent exportTrigger = new AutoResetEvent(false);
private readonly ManualResetEvent dataExportedNotification = new ManualResetEvent(false);
private readonly ManualResetEvent shutdownTrigger = new ManualResetEvent(false);
private bool disposed;
private long droppedCount = 0;

Expand Down Expand Up @@ -71,10 +75,16 @@ public BatchExportActivityProcessor(
}

this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
this.queue = new CircularBuffer<Activity>(maxQueueSize);
this.scheduledDelay = TimeSpan.FromMilliseconds(scheduledDelayMillis);
this.exporterTimeout = TimeSpan.FromMilliseconds(exporterTimeoutMillis);
this.circularBuffer = new CircularBuffer<Activity>(maxQueueSize);
this.scheduledDelayMillis = scheduledDelayMillis;
this.exporterTimeoutMillis = exporterTimeoutMillis;
this.maxExportBatchSize = maxExportBatchSize;
this.exporterThread = new Thread(new ThreadStart(this.ExporterProc))
{
IsBackground = true,
Name = $"OpenTelemetry-{nameof(BatchExportActivityProcessor)}-{exporter.GetType().Name}",
};
this.exporterThread.Start();
}

/// <summary>
Expand All @@ -95,7 +105,7 @@ internal long ReceivedCount
{
get
{
return this.queue.AddedCount + this.DroppedCount;
return this.circularBuffer.AddedCount + this.DroppedCount;
}
}

Expand All @@ -106,18 +116,18 @@ internal long ProcessedCount
{
get
{
return this.queue.RemovedCount;
return this.circularBuffer.RemovedCount;
}
}

/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
if (this.queue.TryAdd(activity, maxSpinCount: 50000))
if (this.circularBuffer.TryAdd(activity, maxSpinCount: 50000))
{
if (this.queue.Count >= this.maxExportBatchSize)
if (this.circularBuffer.Count >= this.maxExportBatchSize)
{
// TODO: signal the exporter
this.exportTrigger.Set();
}

return; // enqueue succeeded
Expand All @@ -127,6 +137,74 @@ public override void OnEnd(Activity activity)
Interlocked.Increment(ref this.droppedCount);
}

/// <summary>
/// Flushes the <see cref="Activity"/> currently in the queue, blocks
/// the current thread until flush completed, shutdown signaled or
/// timed out.
/// </summary>
/// <param name="timeoutMillis">
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q.: why the preference for an int instead of a TimeSpan? I see TimeSpan as more user friendly but opinions differ :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. It aligns better with the spec.
  2. It allows default parameter to be specified in the signature directly, which results in simpler code.
  3. Debugging a crash dump would be simpler.

None of the above gives any significant benefit though.

/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush completed; otherwise, <c>false</c>.
/// </returns>
public bool ForceFlush(int timeoutMillis = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
}

var tail = this.circularBuffer.RemovedCount;
var head = this.circularBuffer.AddedCount;

if (head == tail)
{
return true; // nothing to flush
}

this.exportTrigger.Set();

if (timeoutMillis == 0)
{
return false;
}

var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger };

var sw = Stopwatch.StartNew();

while (true)
{
if (timeoutMillis == Timeout.Infinite)
{
WaitHandle.WaitAny(triggers);
}
else
{
var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return this.circularBuffer.RemovedCount >= head;
}

WaitHandle.WaitAny(triggers, (int)timeout);
}

if (this.circularBuffer.RemovedCount >= head)
{
return true;
}

if (this.shutdownTrigger.WaitOne(0))
{
return false;
}
}
}

/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ForceFlushAsync(CancellationToken cancellationToken)
Expand All @@ -135,6 +213,49 @@ public override Task ForceFlushAsync(CancellationToken cancellationToken)
throw new NotImplementedException();
}

/// <summary>
/// Attempt to drain the queue and shutdown the exporter, blocks the
reyang marked this conversation as resolved.
Show resolved Hide resolved
/// current thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMillis">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public void Shutdown(int timeoutMillis = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
}

if (timeoutMillis == Timeout.Infinite)
{
this.ForceFlush();
this.shutdownTrigger.Set();
this.exporterThread.Join();
return;
}

if (timeoutMillis == 0)
{
this.shutdownTrigger.Set();
return;
}

var sw = Stopwatch.StartNew();

this.ForceFlush(timeoutMillis);

this.shutdownTrigger.Set();

var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds;

if (timeout > 0)
{
this.exporterThread.Join((int)timeout);
}
}

/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ShutdownAsync(CancellationToken cancellationToken)
Expand All @@ -153,6 +274,9 @@ protected override void Dispose(bool disposing)

if (disposing && !this.disposed)
{
// TODO: Dispose/Shutdown flow needs to be redesigned, currently it is convoluted.
this.Shutdown(this.exporterTimeoutMillis);

try
{
this.exporter.Dispose();
Expand All @@ -165,5 +289,32 @@ protected override void Dispose(bool disposing)
this.disposed = true;
}
}

private void ExporterProc()
{
var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger };

while (true)
{
// only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously
if (this.circularBuffer.Count < this.maxExportBatchSize)
{
WaitHandle.WaitAny(triggers, this.scheduledDelayMillis);
}

if (this.circularBuffer.Count > 0)
{
this.exporter.Export(new Batch<Activity>(this.circularBuffer, this.maxExportBatchSize));

this.dataExportedNotification.Set();
this.dataExportedNotification.Reset();
Comment on lines +295 to +296
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this guaranteed to wake-up one waiting for the signal?

reyang marked this conversation as resolved.
Show resolved Hide resolved
}

if (this.shutdownTrigger.WaitOne(0))
{
break;
}
}
}
}
}
4 changes: 0 additions & 4 deletions src/OpenTelemetry/Trace/SimpleExportActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@
// limitations under the License.
// </copyright>

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Trace
{
Expand Down