Skip to content

Commit

Permalink
Extract ITestQueue<T> interface from BlockingQueue<T> (#5665)
Browse files Browse the repository at this point in the history
Co-authored-by: Gregorius Soedharmo <[email protected]>
  • Loading branch information
Arkatufus and Greg-Petabridge authored Feb 17, 2022
1 parent 22315d7 commit ce274b0
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,26 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;

namespace Akka.TestKit.Internal
{
/// <summary>
/// This class represents an implementation of <see cref="ITestActorQueue{T}"/>
/// that uses a <see cref="BlockingQueue{T}"/> as its backing store.
/// that uses a <see cref="ITestQueue{T}"/> as its backing store.
/// <remarks>Note! Part of internal API. Breaking changes may occur without notice. Use at own risk.</remarks>
/// </summary>
/// <typeparam name="T">The type of item to store.</typeparam>
public class BlockingCollectionTestActorQueue<T> : ITestActorQueue<T>
{
private readonly BlockingQueue<T> _queue;
private readonly ITestQueue<T> _queue;

/// <summary>
/// Initializes a new instance of the <see cref="BlockingCollectionTestActorQueue{T}"/> class.
/// </summary>
/// <param name="queue">The queue to use as the backing store.</param>
public BlockingCollectionTestActorQueue(BlockingQueue<T> queue)
public BlockingCollectionTestActorQueue(ITestQueue<T> queue)
{
_queue = queue;
}
Expand Down
141 changes: 75 additions & 66 deletions src/core/Akka.TestKit/Internal/BlockingQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.TestKit.Internal
{
Expand All @@ -20,54 +21,46 @@ namespace Akka.TestKit.Internal
/// <remarks>Note! Part of internal API. Breaking changes may occur without notice. Use at own risk.</remarks>
/// </summary>
/// <typeparam name="T">The type of item to store.</typeparam>
public class BlockingQueue<T>
public class BlockingQueue<T> : ITestQueue<T>
{
private readonly BlockingCollection<Positioned> _collection = new BlockingCollection<Positioned>(new QueueWithAddFirst());

/// <summary>
/// The number of items that are currently in the queue.
/// </summary>
/// <inheritdoc cref="Count"/>
public int Count { get { return _collection.Count; } }

/// <summary>
/// Adds the specified item to the end of the queue.
/// </summary>
/// <param name="item">The item to add to the queue.</param>
/// <inheritdoc cref="Enqueue"/>
public void Enqueue(T item)
{
if (!_collection.TryAdd(new Positioned(item)))
throw new InvalidOperationException("Failed to enqueue item into the queue.");
}

/// <summary>
/// Adds the specified item to the front of the queue.
/// </summary>
/// <param name="item">The item to add to the queue.</param>
/// <inheritdoc cref="EnqueueAsync"/>
public async ValueTask EnqueueAsync(T item)
{
Enqueue(item);
}

[Obsolete("This method will be removed from the public API in the future")]
public void AddFirst(T item)
{
if(!_collection.TryAdd(new Positioned(item, first:true)))
throw new InvalidOperationException("Failed to enqueue item into the head of the queue.");
}

/// <summary>
/// Tries to add the specified item to the end of the queue within the specified time period.
/// A token can be provided to cancel the operation if needed.
/// </summary>
/// <param name="item">The item to add to the queue.</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait for the add to complete.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the operation.</param>
/// <returns><c>true</c> if the add completed within the specified timeout; otherwise, <c>false</c>.</returns>
/// <inheritdoc cref="TryEnqueue"/>
public bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
return _collection.TryAdd(new Positioned(item), millisecondsTimeout, cancellationToken);
}

/// <summary>
/// Tries to remove the specified item from the queue.
/// </summary>
/// <param name="item">The item to remove from the queue.</param>
/// <returns><c>true</c> if the item was removed; otherwise, <c>false</c>.</returns>
/// <inheritdoc cref="TryEnqueueAsync"/>
public async ValueTask<bool> TryEnqueueAsync(T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
return TryEnqueue(item, millisecondsTimeout, cancellationToken);
}

/// <inheritdoc cref="TryTake(out T)"/>
public bool TryTake(out T item)
{
if(_collection.TryTake(out var p))
Expand All @@ -79,14 +72,14 @@ public bool TryTake(out T item)
return false;
}

/// <summary>
/// Tries to remove the specified item from the queue within the specified time period.
/// A token can be provided to cancel the operation if needed.
/// </summary>
/// <param name="item">The item to remove from the queue.</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait for the remove to complete.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the operation.</param>
/// <returns><c>true</c> if the remove completed within the specified timeout; otherwise, <c>false</c>.</returns>
/// <inheritdoc cref="TryTakeAsync()"/>
public async ValueTask<(bool success, T item)> TryTakeAsync()
{
var result = TryTake(out var item);
return (result, item);
}

/// <inheritdoc cref="TryTake(out T, int, CancellationToken)"/>
public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken))
Expand All @@ -98,27 +91,29 @@ public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cance
return false;
}

/// <summary>
/// Removes an item from the collection.
/// </summary>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the operation.</param>
/// <exception cref="OperationCanceledException">
/// This exception is thrown when the operation is canceled.
/// </exception>
/// <returns>The item removed from the collection.</returns>
/// <inheritdoc cref="TryTakeAsync(int, CancellationToken)"/>
public async ValueTask<(bool success, T item)> TryTakeAsync(int millisecondsTimeout, CancellationToken cancellationToken)
{
var result = TryTake(out var item, millisecondsTimeout, cancellationToken);
return (result, item);
}

/// <inheritdoc cref="Take"/>
public T Take(CancellationToken cancellationToken)
{
var p = _collection.Take(cancellationToken);
return p.Value;
}

/// <inheritdoc cref="TakeAsync"/>
public async ValueTask<T> TakeAsync(CancellationToken cancellationToken)
{
return _collection.Take(cancellationToken).Value;
}

#region Peek methods

/// <summary>
/// Tries to remove the specified item from the queue.
/// </summary>
/// <param name="item">The item to remove from the queue.</param>
/// <returns><c>true</c> if the item was removed; otherwise, <c>false</c>.</returns>
/// <inheritdoc cref="TryPeek(out T)"/>
public bool TryPeek(out T item)
{
if(_collection.TryTake(out var p))
Expand All @@ -131,14 +126,19 @@ public bool TryPeek(out T item)
return false;
}

/// <summary>
/// Tries to remove the specified item from the queue within the specified time period.
/// A token can be provided to cancel the operation if needed.
/// </summary>
/// <param name="item">The item to remove from the queue.</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait for the remove to complete.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the operation.</param>
/// <returns><c>true</c> if the remove completed within the specified timeout; otherwise, <c>false</c>.</returns>
/// <inheritdoc cref="TryPeekAsync()"/>
public async ValueTask<(bool success, T item)> TryPeekAsync()
{
if(_collection.TryTake(out var p))
{
var item = p.Value;
AddFirst(item);
return (true, item);
}
return (false, default);
}

/// <inheritdoc cref="TryPeek(out T, int, CancellationToken)"/>
public bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken))
Expand All @@ -151,27 +151,36 @@ public bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cance
return false;
}

/// <summary>
/// Removes an item from the collection.
/// </summary>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the operation.</param>
/// <exception cref="OperationCanceledException">
/// This exception is thrown when the operation is canceled.
/// </exception>
/// <returns>The item removed from the collection.</returns>
/// <inheritdoc cref="TryPeekAsync(int, CancellationToken)"/>
public async ValueTask<(bool success, T item)> TryPeekAsync(int millisecondsTimeout, CancellationToken cancellationToken)
{
if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken))
{
var item = p.Value;
AddFirst(item);
return (true, item);
}
return (false, default);
}

/// <inheritdoc cref="Peek"/>
public T Peek(CancellationToken cancellationToken)
{
var p = _collection.Take(cancellationToken);
AddFirst(p.Value);
return p.Value;
}

/// <inheritdoc cref="PeekAsync"/>
public async ValueTask<T> PeekAsync(CancellationToken cancellationToken)
{
var val = _collection.Take(cancellationToken).Value;
AddFirst(val);
return val;
}
#endregion

/// <summary>
/// Copies the items from the <see cref="BlockingQueue{T}"/> instance into a new <see cref="List{T}"/>.
/// </summary>
/// <returns>A <see cref="List{T}"/> containing copies of the elements of the collection</returns>
/// <inheritdoc cref="ToList"/>
public List<T> ToList()
{
var positionArray = _collection.ToArray();
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.TestKit/Internal/ITestActorQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;

namespace Akka.TestKit.Internal
Expand All @@ -30,6 +31,7 @@ public interface ITestActorQueue<T> : ITestActorQueueProducer<T>
/// Copies all the items from the <see cref="ITestActorQueue{T}"/> instance into a new <see cref="List{T}"/>
/// </summary>
/// <returns>TBD</returns>
[Obsolete("This method will be removed in the future")]
List<T> ToList();

/// <summary>
Expand Down
Loading

0 comments on commit ce274b0

Please sign in to comment.