Skip to content

Commit

Permalink
Change Receive Test Methods to Sync over Async (#5678)
Browse files Browse the repository at this point in the history
* Changed to sync over async

* * Change Peek methods to sync over async
* Create Peek `async` mthods

* Change FishForMessage() to sync over async that calls FishForMessageAsync()

* Inherit doc from `FishForMessage`

* Fix .Wait() returns AggregatedException instead of expected exxception

Co-authored-by: Gregorius Soedharmo <[email protected]>
  • Loading branch information
eaba and Greg-Petabridge authored Feb 22, 2022
1 parent 22ec797 commit b9b8849
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 14 deletions.
3 changes: 2 additions & 1 deletion src/core/Akka.TestKit/Internal/AsyncQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx.Synchronous;

namespace Akka.TestKit.Internal
{
Expand All @@ -19,7 +20,7 @@ public class AsyncQueue<T>: ITestQueue<T> where T: class

public int Count => _collection.Count;

public void Enqueue(T item) => EnqueueAsync(item).AsTask().Wait();
public void Enqueue(T item) => EnqueueAsync(item).AsTask().WaitAndUnwrapException();

public ValueTask EnqueueAsync(T item) => new ValueTask(_collection.AddAsync(item));

Expand Down
100 changes: 87 additions & 13 deletions src/core/Akka.TestKit/TestKitBase_Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading;
using System.Threading.Tasks;
using Akka.TestKit.Internal;
using Nito.AsyncEx.Synchronous;

namespace Akka.TestKit
{
Expand Down Expand Up @@ -58,14 +59,22 @@ public T FishForMessage<T>(Predicate<T> isMessage, TimeSpan? max = null, string
/// <param name="allMessages">If null then will be ignored. If not null then will be initially cleared, then filled with all the messages until <paramref name="isMessage"/> returns <c>true</c></param>
/// <returns>Returns the message that <paramref name="isMessage"/> matched</returns>
public T FishForMessage<T>(Predicate<T> isMessage, ArrayList allMessages, TimeSpan? max = null, string hint = "")
{
var task = FishForMessageAsync<T>(isMessage, allMessages, max, hint).AsTask();
task.WaitAndUnwrapException();
return task.Result;
}

/// <inheritdoc cref="FishForMessage{T}(Predicate{T}, ArrayList, TimeSpan?, string)"/>
public async ValueTask<T> FishForMessageAsync<T>(Predicate<T> isMessage, ArrayList allMessages, TimeSpan? max = null, string hint = "")
{
var maxValue = RemainingOrDilated(max);
var end = Now + maxValue;
allMessages?.Clear();
while (true)
{
var left = end - Now;
var msg = ReceiveOne(left);
var msg = await ReceiveOneAsync(left).ConfigureAwait(false);
_assertions.AssertTrue(msg != null, "Timeout ({0}) during fishForMessage{1}", maxValue, string.IsNullOrEmpty(hint) ? "" : ", hint: " + hint);
if (msg is T msg1 && isMessage(msg1))
{
Expand Down Expand Up @@ -138,9 +147,20 @@ public async Task<ArrayList> WaitForRadioSilenceAsync(TimeSpan? max = null, uint
/// <returns>The message if one was received; <c>null</c> otherwise</returns>
public object ReceiveOne(TimeSpan? max = null)
{
MessageEnvelope envelope;
if (TryReceiveOne(out envelope, max, CancellationToken.None))
return envelope.Message;
var task = ReceiveOneAsync(max).AsTask();
task.WaitAndUnwrapException();
var received = task.Result;
return received;
}

/// <inheritdoc cref="ReceiveOne(TimeSpan?)"/>
public async ValueTask<object> ReceiveOneAsync(TimeSpan? max = null)
{
var received = await TryReceiveOneAsync(max, CancellationToken.None);

if (received.success)
return received.envelope.Message;

return null;
}

Expand All @@ -152,9 +172,19 @@ public object ReceiveOne(TimeSpan? max = null)
/// <returns>The message if one was received; <c>null</c> otherwise</returns>
public object ReceiveOne(CancellationToken cancellationToken)
{
MessageEnvelope envelope;
if (TryReceiveOne(out envelope, Timeout.InfiniteTimeSpan, cancellationToken))
return envelope.Message;
var task = ReceiveOneAsync(cancellationToken).AsTask();
task.WaitAndUnwrapException();
var received = task.Result;
return received;
}
/// <inheritdoc cref="ReceiveOne(CancellationToken)"/>
public async ValueTask<object> ReceiveOneAsync(CancellationToken cancellationToken)
{
var received = await TryReceiveOneAsync(Timeout.InfiniteTimeSpan, cancellationToken);

if (received.success)
return received.envelope.Message;

return null;
}

Expand All @@ -177,6 +207,12 @@ public bool TryReceiveOne(out MessageEnvelope envelope, TimeSpan? max = null)
return TryReceiveOne(out envelope, max, CancellationToken.None);
}

/// <inheritdoc cref="TryReceiveOne(out MessageEnvelope, TimeSpan?)"/>
public async ValueTask<(bool success, MessageEnvelope envelope)> TryReceiveOneAsync(TimeSpan? max = null)
{
return await TryReceiveOneAsync(max, CancellationToken.None).ConfigureAwait(false);
}

/// <summary>
/// Receive one message from the internal queue of the TestActor within
/// the specified duration.
Expand All @@ -200,10 +236,16 @@ public bool TryReceiveOne(out MessageEnvelope envelope, TimeSpan? max, Cancellat
return InternalTryReceiveOne(out envelope, max, cancellationToken, true);
}

/// <inheritdoc cref="TryReceiveOne(out MessageEnvelope, TimeSpan?, CancellationToken)"/>
public async ValueTask<(bool success, MessageEnvelope envelope)> TryReceiveOneAsync(TimeSpan? max, CancellationToken cancellationToken)
{
return await InternalTryReceiveOneAsync(max, cancellationToken, true).ConfigureAwait(false);
}

private bool InternalTryReceiveOne(out MessageEnvelope envelope, TimeSpan? max, CancellationToken cancellationToken, bool shouldLog)
{
var task = InternalTryReceiveOneAsync(max, cancellationToken, shouldLog).AsTask();
task.Wait();
task.WaitAndUnwrapException();
var received = task.Result;
envelope = received.envelope;
return received.success;
Expand Down Expand Up @@ -268,8 +310,18 @@ private bool InternalTryReceiveOne(out MessageEnvelope envelope, TimeSpan? max,
/// <returns>The message if one was received; <c>null</c> otherwise</returns>
public object PeekOne(TimeSpan? max = null)
{
if (InternalTryPeekOne(out var envelope, max, CancellationToken.None, true))
return envelope.Message;
var task = PeekOneAsync(max).AsTask();
task.WaitAndUnwrapException();
var peeked = task.Result;
return peeked;
}

/// <inheritdoc cref="PeekOne(TimeSpan?)"/>
public async ValueTask<object> PeekOneAsync(TimeSpan? max = null)
{
var peeked = await TryPeekOneAsync(max, CancellationToken.None);
if (peeked.success)
return peeked.envelope.Message;
return null;
}

Expand All @@ -281,8 +333,18 @@ public object PeekOne(TimeSpan? max = null)
/// <returns>The message if one was received; <c>null</c> otherwise</returns>
public object PeekOne(CancellationToken cancellationToken)
{
if (InternalTryPeekOne(out var envelope, Timeout.InfiniteTimeSpan, cancellationToken, true))
return envelope.Message;
var task = PeekOneAsync(cancellationToken).AsTask();
task.WaitAndUnwrapException();
var peeked = task.Result;
return peeked;
}

/// <inheritdoc cref="PeekOne(CancellationToken)"/>
public async ValueTask<object> PeekOneAsync(CancellationToken cancellationToken)
{
var peeked = await TryPeekOneAsync(Timeout.InfiniteTimeSpan, cancellationToken);
if (peeked.success)
return peeked.envelope.Message;
return null;
}

Expand All @@ -304,6 +366,12 @@ public bool TryPeekOne(out MessageEnvelope envelope, TimeSpan? max = null)
return InternalTryPeekOne(out envelope, max, CancellationToken.None, true);
}

/// <inheritdoc cref="TryPeekOne(out MessageEnvelope, TimeSpan?)"/>
public async ValueTask<(bool success, MessageEnvelope envelope)> TryPeekOneAsync(TimeSpan? max = null)
{
return await InternalTryPeekOneAsync(max, CancellationToken.None, true);
}

/// <summary>
/// Peek one message from the head of the internal queue of the TestActor within
/// the specified duration.
Expand All @@ -327,10 +395,16 @@ public bool TryPeekOne(out MessageEnvelope envelope, TimeSpan? max, Cancellation
return InternalTryPeekOne(out envelope, max, cancellationToken, true);
}

/// <inheritdoc cref="TryPeekOne(out MessageEnvelope, TimeSpan?, CancellationToken)"/>
public async ValueTask<(bool success, MessageEnvelope envelope)> TryPeekOneAsync(TimeSpan? max, CancellationToken cancellationToken)
{
return await InternalTryPeekOneAsync(max, cancellationToken, true);
}

private bool InternalTryPeekOne(out MessageEnvelope envelope, TimeSpan? max, CancellationToken cancellationToken, bool shouldLog)
{
var task = InternalTryPeekOneAsync(max, cancellationToken, shouldLog).AsTask();
task.Wait();
task.WaitAndUnwrapException();
var received = task.Result;
envelope = received.envelope;
return received.success;
Expand Down

0 comments on commit b9b8849

Please sign in to comment.