Skip to content

Commit

Permalink
Add await performance tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ocoanet committed Feb 24, 2024
1 parent d610e62 commit ceb4ee1
Show file tree
Hide file tree
Showing 6 changed files with 755 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using Disruptor.Util;
using HdrHistogram;

namespace Disruptor.PerfTests.Latency.OneWay;

public class OneWayAwaitLatencyTest_ManualResetValueTaskSourceCore : ILatencyTest, IExternalTest
{
private const long _iterations = 1_000_000;
private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10);

private readonly ManualResetEventSlim _completed = new();
private readonly Waiter[] _waiters;

public OneWayAwaitLatencyTest_ManualResetValueTaskSourceCore()
{
_waiters = Enumerable.Range(0, 100).Select(x => new Waiter()).ToArray();
}

public int RequiredProcessorCount => 2;

public void Run(Stopwatch stopwatch, HistogramBase histogram)
{
_completed.Reset();

foreach (var waiter in _waiters)
{
waiter.Initialize(histogram);
waiter.Start();
}

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

stopwatch.Start();

for (int i = 0; i < _iterations; i++)
{
var now = Stopwatch.GetTimestamp();
while (now < next)
{
Thread.Yield();
now = Stopwatch.GetTimestamp();
}

var waiter = _waiters[i % _waiters.Length];
waiter.Notify(now);

next = now + pause;
}

foreach (var waiter in _waiters)
{
waiter.Notify(-1);
}

foreach (var waiter in _waiters)
{
waiter.Task.Wait();
}

stopwatch.Stop();
}

private class Waiter : IValueTaskSource<bool>
{
private ManualResetValueTaskSourceCore<bool> _valueTaskSourceCore = new() { RunContinuationsAsynchronously = true };
private HistogramBase _histogram;
private long _startTimestamp;

public Task Task { get; private set; }

public void Initialize(HistogramBase histogram)
{
_histogram = histogram;
_valueTaskSourceCore.Reset();
Volatile.Write(ref _startTimestamp, 0);
}

public void Notify(long timestamp)
{
while (Volatile.Read(ref _startTimestamp) != 0)
{
Thread.Sleep(0);
}

Volatile.Write(ref _startTimestamp, timestamp);
_valueTaskSourceCore.SetResult(true);
}

public void Start()
{
Task = RunImpl();

async Task RunImpl()
{
while (true)
{
await new ValueTask<bool>(this, _valueTaskSourceCore.Version);

_valueTaskSourceCore.Reset();

if (_startTimestamp == -1)
break;

var timestamp = Stopwatch.GetTimestamp();
var duration = timestamp - _startTimestamp;

_histogram.RecordValue(StopwatchUtil.ToNanoseconds(duration));

Volatile.Write(ref _startTimestamp, 0);
}
}
}

public bool GetResult(short token)
{
return _valueTaskSourceCore.GetResult(token);
}

public ValueTaskSourceStatus GetStatus(short token)
{
return _valueTaskSourceCore.GetStatus(token);
}

public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
_valueTaskSourceCore.OnCompleted(continuation, state, token, flags);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Disruptor.Util;
using HdrHistogram;

namespace Disruptor.PerfTests.Latency.OneWay;

public class OneWayAwaitLatencyTest_QueueUserWorkItem : ILatencyTest, IExternalTest
{
private const long _iterations = 1_000_000;
private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10);

private readonly ManualResetEventSlim _completed = new();
private readonly Waiter[] _waiters;

public OneWayAwaitLatencyTest_QueueUserWorkItem()
{
_waiters = Enumerable.Range(0, 100).Select(x => new Waiter(_completed)).ToArray();
}

public int RequiredProcessorCount => 2;

public void Run(Stopwatch stopwatch, HistogramBase histogram)
{
_completed.Reset();

foreach (var waiter in _waiters)
{
waiter.Initialize(histogram);
}

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

stopwatch.Start();

for (var i = 0; i < _iterations; i++)
{
var now = Stopwatch.GetTimestamp();
while (now < next)
{
Thread.Yield();
now = Stopwatch.GetTimestamp();
}

var waiter = _waiters[i % _waiters.Length];
waiter.Notify(now);

next = now + pause;
}

// Ensure last iteration was processed
var lastWaiter = _waiters[(_iterations - 1) % _waiters.Length];
lastWaiter.Notify(-1);

_completed.Wait();

stopwatch.Stop();
}

private class Waiter : IThreadPoolWorkItem
{
private readonly ManualResetEventSlim _completed;
private HistogramBase _histogram;
private long _startTimestamp;

public Waiter(ManualResetEventSlim completed)
{
_completed = completed;
}

public void Initialize(HistogramBase histogram)
{
_histogram = histogram;
}

public void Notify(long timestamp)
{
while (Volatile.Read(ref _startTimestamp) != 0)
{
Thread.Sleep(0);
}

Volatile.Write(ref _startTimestamp, timestamp);
ThreadPool.UnsafeQueueUserWorkItem(this, true);
}

public void Execute()
{
if (_startTimestamp == -1)
{
_completed.Set();
return;
}

var timestamp = Stopwatch.GetTimestamp();
var duration = timestamp - _startTimestamp;

_histogram.RecordValue(StopwatchUtil.ToNanoseconds(duration));

Volatile.Write(ref _startTimestamp, 0);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor.Util;
using HdrHistogram;

namespace Disruptor.PerfTests.Latency.OneWay;

public class OneWayAwaitLatencyTest_TaskCompletionSource : ILatencyTest, IExternalTest
{
private const long _iterations = 1_000_000;
private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10);

private readonly ManualResetEventSlim _completed = new();
private readonly Waiter[] _waiters;

public OneWayAwaitLatencyTest_TaskCompletionSource()
{
_waiters = Enumerable.Range(0, 100).Select(x => new Waiter()).ToArray();
}

public int RequiredProcessorCount => 2;

public void Run(Stopwatch stopwatch, HistogramBase histogram)
{
_completed.Reset();

foreach (var waiter in _waiters)
{
waiter.Initialize(histogram);
waiter.Start();
}

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

stopwatch.Start();

for (int i = 0; i < _iterations; i++)
{
var now = Stopwatch.GetTimestamp();
while (now < next)
{
Thread.Yield();
now = Stopwatch.GetTimestamp();
}

var waiter = _waiters[i % _waiters.Length];
waiter.Notify(now);

next = now + pause;
}

foreach (var waiter in _waiters)
{
waiter.Notify(-1);
}

foreach (var waiter in _waiters)
{
waiter.Task.Wait();
}

stopwatch.Stop();
}

private class Waiter
{
private TaskCompletionSource<bool> _taskCompletionSource;
private HistogramBase _histogram;
private long _startTimestamp;

public Task Task { get; private set; }

public void Initialize(HistogramBase histogram)
{
_histogram = histogram;
_taskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
Volatile.Write(ref _startTimestamp, 0);
}

public void Notify(long timestamp)
{
while (Volatile.Read(ref _startTimestamp) != 0)
{
Thread.Sleep(0);
}

Volatile.Write(ref _startTimestamp, timestamp);
_taskCompletionSource.SetResult(true);
}

public void Start()
{
Task = RunImpl();

async Task RunImpl()
{
while (true)
{
await _taskCompletionSource.Task.ConfigureAwait(false);

if (_startTimestamp == -1)
break;

var timestamp = Stopwatch.GetTimestamp();
var duration = timestamp - _startTimestamp;

_histogram.RecordValue(StopwatchUtil.ToNanoseconds(duration));

_taskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
Volatile.Write(ref _startTimestamp, 0);
}
}
}
}
}
Loading

0 comments on commit ceb4ee1

Please sign in to comment.