diff --git a/src/core/Akka.TestKit/Internal/BlockingCollectionTestActorQueue.cs b/src/core/Akka.TestKit/Internal/BlockingCollectionTestActorQueue.cs
index 0f00549c7b9..ac34bad6abb 100644
--- a/src/core/Akka.TestKit/Internal/BlockingCollectionTestActorQueue.cs
+++ b/src/core/Akka.TestKit/Internal/BlockingCollectionTestActorQueue.cs
@@ -5,25 +5,26 @@
//
//-----------------------------------------------------------------------
+using System;
using System.Collections.Generic;
namespace Akka.TestKit.Internal
{
///
/// This class represents an implementation of
- /// that uses a as its backing store.
+ /// that uses a as its backing store.
/// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk.
///
/// The type of item to store.
public class BlockingCollectionTestActorQueue : ITestActorQueue
{
- private readonly BlockingQueue _queue;
+ private readonly ITestQueue _queue;
///
/// Initializes a new instance of the class.
///
/// The queue to use as the backing store.
- public BlockingCollectionTestActorQueue(BlockingQueue queue)
+ public BlockingCollectionTestActorQueue(ITestQueue queue)
{
_queue = queue;
}
diff --git a/src/core/Akka.TestKit/Internal/BlockingQueue.cs b/src/core/Akka.TestKit/Internal/BlockingQueue.cs
index d48c2fe8192..d775ae6cccb 100644
--- a/src/core/Akka.TestKit/Internal/BlockingQueue.cs
+++ b/src/core/Akka.TestKit/Internal/BlockingQueue.cs
@@ -11,6 +11,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
+using System.Threading.Tasks;
namespace Akka.TestKit.Internal
{
@@ -20,29 +21,26 @@ namespace Akka.TestKit.Internal
/// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk.
///
/// The type of item to store.
- public class BlockingQueue
+ public class BlockingQueue : ITestQueue
{
private readonly BlockingCollection _collection = new BlockingCollection(new QueueWithAddFirst());
- ///
- /// The number of items that are currently in the queue.
- ///
+ ///
public int Count { get { return _collection.Count; } }
- ///
- /// Adds the specified item to the end of the queue.
- ///
- /// The item to add to the queue.
+ ///
public void Enqueue(T item)
{
if (!_collection.TryAdd(new Positioned(item)))
throw new InvalidOperationException("Failed to enqueue item into the queue.");
}
- ///
- /// Adds the specified item to the front of the queue.
- ///
- /// The item to add to the queue.
+ ///
+ public async ValueTask EnqueueAsync(T item)
+ {
+ Enqueue(item);
+ }
+
[Obsolete("This method will be removed from the public API in the future")]
public void AddFirst(T item)
{
@@ -50,24 +48,19 @@ public void AddFirst(T item)
throw new InvalidOperationException("Failed to enqueue item into the head of the queue.");
}
- ///
- /// Tries to add the specified item to the end of the queue within the specified time period.
- /// A token can be provided to cancel the operation if needed.
- ///
- /// The item to add to the queue.
- /// The number of milliseconds to wait for the add to complete.
- /// The cancellation token that can be used to cancel the operation.
- /// true if the add completed within the specified timeout; otherwise, false.
+ ///
public bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
return _collection.TryAdd(new Positioned(item), millisecondsTimeout, cancellationToken);
}
- ///
- /// Tries to remove the specified item from the queue.
- ///
- /// The item to remove from the queue.
- /// true if the item was removed; otherwise, false.
+ ///
+ public async ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, CancellationToken cancellationToken)
+ {
+ return TryEnqueue(item, millisecondsTimeout, cancellationToken);
+ }
+
+ ///
public bool TryTake(out T item)
{
if(_collection.TryTake(out var p))
@@ -79,14 +72,14 @@ public bool TryTake(out T item)
return false;
}
- ///
- /// Tries to remove the specified item from the queue within the specified time period.
- /// A token can be provided to cancel the operation if needed.
- ///
- /// The item to remove from the queue.
- /// The number of milliseconds to wait for the remove to complete.
- /// The cancellation token that can be used to cancel the operation.
- /// true if the remove completed within the specified timeout; otherwise, false.
+ ///
+ public async ValueTask<(bool success, T item)> TryTakeAsync()
+ {
+ var result = TryTake(out var item);
+ return (result, item);
+ }
+
+ ///
public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken))
@@ -98,27 +91,29 @@ public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cance
return false;
}
- ///
- /// Removes an item from the collection.
- ///
- /// The cancellation token that can be used to cancel the operation.
- ///
- /// This exception is thrown when the operation is canceled.
- ///
- /// The item removed from the collection.
+ ///
+ public async ValueTask<(bool success, T item)> TryTakeAsync(int millisecondsTimeout, CancellationToken cancellationToken)
+ {
+ var result = TryTake(out var item, millisecondsTimeout, cancellationToken);
+ return (result, item);
+ }
+
+ ///
public T Take(CancellationToken cancellationToken)
{
var p = _collection.Take(cancellationToken);
return p.Value;
}
+ ///
+ public async ValueTask TakeAsync(CancellationToken cancellationToken)
+ {
+ return _collection.Take(cancellationToken).Value;
+ }
+
#region Peek methods
- ///
- /// Tries to remove the specified item from the queue.
- ///
- /// The item to remove from the queue.
- /// true if the item was removed; otherwise, false.
+ ///
public bool TryPeek(out T item)
{
if(_collection.TryTake(out var p))
@@ -131,14 +126,19 @@ public bool TryPeek(out T item)
return false;
}
- ///
- /// Tries to remove the specified item from the queue within the specified time period.
- /// A token can be provided to cancel the operation if needed.
- ///
- /// The item to remove from the queue.
- /// The number of milliseconds to wait for the remove to complete.
- /// The cancellation token that can be used to cancel the operation.
- /// true if the remove completed within the specified timeout; otherwise, false.
+ ///
+ public async ValueTask<(bool success, T item)> TryPeekAsync()
+ {
+ if(_collection.TryTake(out var p))
+ {
+ var item = p.Value;
+ AddFirst(item);
+ return (true, item);
+ }
+ return (false, default);
+ }
+
+ ///
public bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken))
@@ -151,14 +151,19 @@ public bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cance
return false;
}
- ///
- /// Removes an item from the collection.
- ///
- /// The cancellation token that can be used to cancel the operation.
- ///
- /// This exception is thrown when the operation is canceled.
- ///
- /// The item removed from the collection.
+ ///
+ public async ValueTask<(bool success, T item)> TryPeekAsync(int millisecondsTimeout, CancellationToken cancellationToken)
+ {
+ if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken))
+ {
+ var item = p.Value;
+ AddFirst(item);
+ return (true, item);
+ }
+ return (false, default);
+ }
+
+ ///
public T Peek(CancellationToken cancellationToken)
{
var p = _collection.Take(cancellationToken);
@@ -166,12 +171,16 @@ public T Peek(CancellationToken cancellationToken)
return p.Value;
}
+ ///
+ public async ValueTask PeekAsync(CancellationToken cancellationToken)
+ {
+ var val = _collection.Take(cancellationToken).Value;
+ AddFirst(val);
+ return val;
+ }
#endregion
- ///
- /// Copies the items from the instance into a new .
- ///
- /// A containing copies of the elements of the collection
+ ///
public List ToList()
{
var positionArray = _collection.ToArray();
diff --git a/src/core/Akka.TestKit/Internal/ITestActorQueue.cs b/src/core/Akka.TestKit/Internal/ITestActorQueue.cs
index 2327fc82227..9fc3b2dafbc 100644
--- a/src/core/Akka.TestKit/Internal/ITestActorQueue.cs
+++ b/src/core/Akka.TestKit/Internal/ITestActorQueue.cs
@@ -5,6 +5,7 @@
//
//-----------------------------------------------------------------------
+using System;
using System.Collections.Generic;
namespace Akka.TestKit.Internal
@@ -30,6 +31,7 @@ public interface ITestActorQueue : ITestActorQueueProducer
/// Copies all the items from the instance into a new
///
/// TBD
+ [Obsolete("This method will be removed in the future")]
List ToList();
///
diff --git a/src/core/Akka.TestKit/Internal/ITestQueue.cs b/src/core/Akka.TestKit/Internal/ITestQueue.cs
new file mode 100644
index 00000000000..8491f39fb54
--- /dev/null
+++ b/src/core/Akka.TestKit/Internal/ITestQueue.cs
@@ -0,0 +1,165 @@
+// //-----------------------------------------------------------------------
+// //
+// // Copyright (C) 2009-2022 Lightbend Inc.
+// // Copyright (C) 2013-2022 .NET Foundation
+// //
+// //-----------------------------------------------------------------------
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Akka.TestKit.Internal
+{
+ public interface ITestQueue
+ {
+ ///
+ /// The number of items that are currently in the queue.
+ ///
+ int Count { get; }
+
+ ///
+ /// Adds the specified item to the end of the queue.
+ ///
+ /// The item to add to the queue.
+ void Enqueue(T item);
+
+ ///
+ /// Adds the specified item to the end of the queue.
+ ///
+ /// The item to add to the queue.
+ ValueTask EnqueueAsync(T item);
+
+ ///
+ /// Tries to add the specified item to the end of the queue within the specified time period.
+ /// A token can be provided to cancel the operation if needed.
+ ///
+ /// The item to add to the queue.
+ /// The number of milliseconds to wait for the add to complete.
+ /// The cancellation token that can be used to cancel the operation.
+ /// true if the add completed within the specified timeout; otherwise, false.
+ bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancellationToken);
+
+ ///
+ /// Tries to add the specified item to the end of the queue within the specified time period.
+ /// A token can be provided to cancel the operation if needed.
+ ///
+ /// The item to add to the queue.
+ /// The number of milliseconds to wait for the add to complete.
+ /// The cancellation token that can be used to cancel the operation.
+ /// true if the add completed within the specified timeout; otherwise, false.
+ ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, CancellationToken cancellationToken);
+
+ ///
+ /// Tries to remove the specified item from the queue.
+ ///
+ /// The item to remove from the queue.
+ /// true if the item was removed; otherwise, false.
+ bool TryTake(out T item);
+
+ ///
+ /// Tries to remove the specified item from the queue within the specified time period.
+ /// A token can be provided to cancel the operation if needed.
+ ///
+ /// The item to remove from the queue.
+ /// The number of milliseconds to wait for the remove to complete.
+ /// The cancellation token that can be used to cancel the operation.
+ /// true if the remove completed within the specified timeout; otherwise, false.
+ bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken);
+
+ ///
+ /// Tries to remove the specified item from the queue.
+ ///
+ /// a tuple of bool and T, true if the item was removed; otherwise, false.
+ ValueTask<(bool success, T item)> TryTakeAsync();
+
+ ///
+ /// Tries to remove the specified item from the queue within the specified time period.
+ /// A token can be provided to cancel the operation if needed.
+ ///
+ /// The number of milliseconds to wait for the remove to complete.
+ /// The cancellation token that can be used to cancel the operation.
+ /// a tuple of bool and T, true if the remove completed within the specified timeout; otherwise, false.
+ ValueTask<(bool success, T item)> TryTakeAsync(int millisecondsTimeout, CancellationToken cancellationToken);
+
+ ///
+ /// Removes an item from the collection.
+ ///
+ /// The cancellation token that can be used to cancel the operation.
+ ///
+ /// This exception is thrown when the operation is canceled.
+ ///
+ /// The item removed from the collection.
+ T Take(CancellationToken cancellationToken);
+
+ ///
+ /// Removes an item from the collection.
+ ///
+ /// The cancellation token that can be used to cancel the operation.
+ ///
+ /// This exception is thrown when the operation is canceled.
+ ///
+ /// The item removed from the collection.
+ ValueTask TakeAsync(CancellationToken cancellationToken);
+
+ ///
+ /// Tries to peek the specified item from the queue.
+ ///
+ /// The item to remove from the queue.
+ /// true if the item was removed; otherwise, false.
+ bool TryPeek(out T item);
+
+ ///
+ /// Tries to peek the specified item from the queue within the specified time period.
+ /// A token can be provided to cancel the operation if needed.
+ ///
+ /// The item to remove from the queue.
+ /// The number of milliseconds to wait for the remove to complete.
+ /// The cancellation token that can be used to cancel the operation.
+ /// true if the remove completed within the specified timeout; otherwise, false.
+ bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cancellationToken);
+
+ ///
+ /// Tries to peek the specified item from the queue.
+ ///
+ /// a tuple of bool and T, true if the item was removed; otherwise, false.
+ ValueTask<(bool success, T item)> TryPeekAsync();
+
+ ///
+ /// Tries to peek the specified item from the queue within the specified time period.
+ /// A token can be provided to cancel the operation if needed.
+ ///
+ /// The number of milliseconds to wait for the remove to complete.
+ /// The cancellation token that can be used to cancel the operation.
+ /// a tuple of bool and T, true if the remove completed within the specified timeout; otherwise, false.
+ ValueTask<(bool success, T item)> TryPeekAsync(int millisecondsTimeout, CancellationToken cancellationToken);
+
+ ///
+ /// Peek an item from the collection.
+ ///
+ /// The cancellation token that can be used to cancel the operation.
+ ///
+ /// This exception is thrown when the operation is canceled.
+ ///
+ /// The item removed from the collection.
+ T Peek(CancellationToken cancellationToken);
+
+ ///
+ /// Peek an item from the collection.
+ ///
+ /// The cancellation token that can be used to cancel the operation.
+ ///
+ /// This exception is thrown when the operation is canceled.
+ ///
+ /// The item removed from the collection.
+ ValueTask PeekAsync(CancellationToken cancellationToken);
+
+ ///
+ /// Copies the items from the instance into a new .
+ ///
+ /// A containing copies of the elements of the collection
+ [Obsolete("This method will be removed in the future")]
+ List ToList();
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs
index b7a43dd9128..44d11943419 100644
--- a/src/core/Akka.TestKit/TestKitBase.cs
+++ b/src/core/Akka.TestKit/TestKitBase.cs
@@ -34,7 +34,7 @@ public TestState()
public ActorSystem System { get; set; }
public TestKitSettings TestKitSettings { get; set; }
- public BlockingQueue Queue { get; set; }
+ public ITestQueue Queue { get; set; }
public MessageEnvelope LastMessage { get; set; }
public IActorRef TestActor { get; set; }
public TimeSpan? End { get; set; }