From 2057baefe97cf0867a4637676e10f562334de71c Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 12 Apr 2024 09:09:37 -0500 Subject: [PATCH] Akka.TestKit: deleted `IAsyncQueue`; replaced with `System.Threading.Channel` (#7157) * deleted `IAsyncQueue`; replaced with System.Threading.Channel * added reproduction for #7145 * fixed issues with `ExpectNoMsgAsync` and `ExpectNoMsg` * fixed issues with `TryPeekAsync` * added API approvals * removed contention spec * harden spec * fix warning --------- Co-authored-by: Gregorius Soedharmo --- ...APISpec.ApproveTestKit.DotNet.verified.txt | 127 ------ ...oreAPISpec.ApproveTestKit.Net.verified.txt | 127 ------ .../Dsl/FlowSelectAsyncSpec.cs | 21 +- .../Dsl/FlowSelectAsyncUnorderedSpec.cs | 20 +- src/core/Akka.TestKit.Tests/Bugfix7145Spec.cs | 55 +++ .../Internal/AsyncPeekableCollection.cs | 398 ------------------ src/core/Akka.TestKit/Internal/AsyncQueue.cs | 274 ------------ .../BlockingCollectionTestActorQueue.cs | 67 --- .../Akka.TestKit/Internal/BlockingQueue.cs | 288 ------------- .../Akka.TestKit/Internal/ITestActorQueue.cs | 43 -- src/core/Akka.TestKit/Internal/ITestQueue.cs | 168 -------- .../Internal/InternalTestActor.cs | 32 +- src/core/Akka.TestKit/TestKitBase.cs | 15 +- src/core/Akka.TestKit/TestKitBase_Receive.cs | 58 ++- ...HashedWheelTimerSchedulerContentionSpec.cs | 142 ------- .../Actor/Scheduler/SchedulerShutdownSpec.cs | 4 +- 16 files changed, 153 insertions(+), 1686 deletions(-) create mode 100644 src/core/Akka.TestKit.Tests/Bugfix7145Spec.cs delete mode 100644 src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs delete mode 100644 src/core/Akka.TestKit/Internal/AsyncQueue.cs delete mode 100644 src/core/Akka.TestKit/Internal/BlockingCollectionTestActorQueue.cs delete mode 100644 src/core/Akka.TestKit/Internal/BlockingQueue.cs delete mode 100644 src/core/Akka.TestKit/Internal/ITestActorQueue.cs delete mode 100644 src/core/Akka.TestKit/Internal/ITestQueue.cs delete mode 100644 src/core/Akka.Tests/Actor/Scheduler/HashedWheelTimerSchedulerContentionSpec.cs diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt index b42887e3c70..55cff536c32 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.DotNet.verified.txt @@ -622,84 +622,6 @@ namespace Akka.TestKit.Extensions } namespace Akka.TestKit.Internal { - public class AsyncQueue : Akka.TestKit.Internal.ITestQueue - where T : class - { - public AsyncQueue() { } - public int Count { get; } - public void Enqueue(T item) { } - public System.Threading.Tasks.ValueTask EnqueueAsync(T item) { } - public T Peek(System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask PeekAsync(System.Threading.CancellationToken cancellationToken) { } - public T Take(System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask TakeAsync(System.Threading.CancellationToken cancellationToken) { } - public System.Collections.Generic.List ToList() { } - public bool TryEnqueue(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public bool TryPeek(out T item) { } - public bool TryPeek(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryPeekAsync(System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryPeekAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public bool TryTake(out T item, System.Threading.CancellationToken cancellationToken = null) { } - public bool TryTake(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryTakeAsync(System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryTakeAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - } - public class BlockingCollectionTestActorQueue : Akka.TestKit.Internal.ITestActorQueueProducer, Akka.TestKit.Internal.ITestActorQueue - { - public BlockingCollectionTestActorQueue(Akka.TestKit.Internal.ITestQueue queue) { } - public void Enqueue(T item) { } - public System.Collections.Generic.IEnumerable GetAll() { } - public System.Collections.Generic.List ToList() { } - } - public class BlockingQueue : Akka.TestKit.Internal.ITestQueue - { - public BlockingQueue() { } - public int Count { get; } - [System.ObsoleteAttribute("This method will be removed from the public API in the future")] - public void AddFirst(T item) { } - public void Enqueue(T item) { } - public System.Threading.Tasks.ValueTask EnqueueAsync(T item) { } - public T Peek(System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask PeekAsync(System.Threading.CancellationToken cancellationToken) { } - public T Take(System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask TakeAsync(System.Threading.CancellationToken cancellationToken) { } - public System.Collections.Generic.List ToList() { } - public bool TryEnqueue(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public bool TryPeek(out T item) { } - public bool TryPeek(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryPeekAsync(System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryPeekAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public bool TryTake(out T item, System.Threading.CancellationToken cancellationToken = null) { } - public bool TryTake(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryTakeAsync(System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryTakeAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - } public class CustomEventFilter : Akka.TestKit.Internal.EventFilterBase { public CustomEventFilter(System.Predicate predicate) { } @@ -731,50 +653,6 @@ namespace Akka.TestKit.Internal public override string ToString() { } } public delegate void EventMatched(Akka.TestKit.Internal.EventFilterBase eventFilter, Akka.Event.LogEvent logEvent); - public interface ITestActorQueueProducer - { - void Enqueue(T item); - } - public interface ITestActorQueue : Akka.TestKit.Internal.ITestActorQueueProducer - { - System.Collections.Generic.IEnumerable GetAll(); - [System.ObsoleteAttribute("This method will be removed in the future")] - System.Collections.Generic.List ToList(); - } - public interface ITestQueue - { - int Count { get; } - void Enqueue(T item); - System.Threading.Tasks.ValueTask EnqueueAsync(T item); - T Peek(System.Threading.CancellationToken cancellationToken); - System.Threading.Tasks.ValueTask PeekAsync(System.Threading.CancellationToken cancellationToken); - T Take(System.Threading.CancellationToken cancellationToken); - System.Threading.Tasks.ValueTask TakeAsync(System.Threading.CancellationToken cancellationToken); - [System.ObsoleteAttribute("This method will be removed in the future")] - System.Collections.Generic.List ToList(); - bool TryEnqueue(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - System.Threading.Tasks.ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - bool TryPeek(out T item); - bool TryPeek(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - System.Threading.Tasks.ValueTask> TryPeekAsync(System.Threading.CancellationToken cancellationToken); - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - System.Threading.Tasks.ValueTask> TryPeekAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - bool TryTake(out T item, System.Threading.CancellationToken cancellationToken = null); - bool TryTake(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - System.Threading.Tasks.ValueTask> TryTakeAsync(System.Threading.CancellationToken cancellationToken); - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - System.Threading.Tasks.ValueTask> TryTakeAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - } public class InfoFilter : Akka.TestKit.Internal.EventFilterBase { public InfoFilter(Akka.TestKit.Internal.StringMatcher.IStringMatcher messageMatcher = null, Akka.TestKit.Internal.StringMatcher.IStringMatcher sourceMatcher = null) { } @@ -831,11 +709,6 @@ namespace Akka.TestKit.Internal public virtual void HandleEvent(Akka.TestKit.Internal.EventFilterBase eventFilter, Akka.Event.LogEvent logEvent) { } } } - public class InternalTestActor : Akka.Actor.UntypedActor - { - public InternalTestActor(Akka.TestKit.Internal.ITestActorQueue queue) { } - protected override void OnReceive(object message) { } - } public class InternalTestActorRef : Akka.Actor.LocalActorRef { public object UnderlyingActor { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt index 8322c89ddeb..681fff75319 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveTestKit.Net.verified.txt @@ -622,84 +622,6 @@ namespace Akka.TestKit.Extensions } namespace Akka.TestKit.Internal { - public class AsyncQueue : Akka.TestKit.Internal.ITestQueue - where T : class - { - public AsyncQueue() { } - public int Count { get; } - public void Enqueue(T item) { } - public System.Threading.Tasks.ValueTask EnqueueAsync(T item) { } - public T Peek(System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask PeekAsync(System.Threading.CancellationToken cancellationToken) { } - public T Take(System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask TakeAsync(System.Threading.CancellationToken cancellationToken) { } - public System.Collections.Generic.List ToList() { } - public bool TryEnqueue(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public bool TryPeek(out T item) { } - public bool TryPeek(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryPeekAsync(System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryPeekAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public bool TryTake(out T item, System.Threading.CancellationToken cancellationToken = null) { } - public bool TryTake(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryTakeAsync(System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryTakeAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - } - public class BlockingCollectionTestActorQueue : Akka.TestKit.Internal.ITestActorQueueProducer, Akka.TestKit.Internal.ITestActorQueue - { - public BlockingCollectionTestActorQueue(Akka.TestKit.Internal.ITestQueue queue) { } - public void Enqueue(T item) { } - public System.Collections.Generic.IEnumerable GetAll() { } - public System.Collections.Generic.List ToList() { } - } - public class BlockingQueue : Akka.TestKit.Internal.ITestQueue - { - public BlockingQueue() { } - public int Count { get; } - [System.ObsoleteAttribute("This method will be removed from the public API in the future")] - public void AddFirst(T item) { } - public void Enqueue(T item) { } - public System.Threading.Tasks.ValueTask EnqueueAsync(T item) { } - public T Peek(System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask PeekAsync(System.Threading.CancellationToken cancellationToken) { } - public T Take(System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask TakeAsync(System.Threading.CancellationToken cancellationToken) { } - public System.Collections.Generic.List ToList() { } - public bool TryEnqueue(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public System.Threading.Tasks.ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public bool TryPeek(out T item) { } - public bool TryPeek(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryPeekAsync(System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryPeekAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - public bool TryTake(out T item, System.Threading.CancellationToken cancellationToken = null) { } - public bool TryTake(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryTakeAsync(System.Threading.CancellationToken cancellationToken) { } - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - public System.Threading.Tasks.ValueTask> TryTakeAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) { } - } public class CustomEventFilter : Akka.TestKit.Internal.EventFilterBase { public CustomEventFilter(System.Predicate predicate) { } @@ -731,50 +653,6 @@ namespace Akka.TestKit.Internal public override string ToString() { } } public delegate void EventMatched(Akka.TestKit.Internal.EventFilterBase eventFilter, Akka.Event.LogEvent logEvent); - public interface ITestActorQueueProducer - { - void Enqueue(T item); - } - public interface ITestActorQueue : Akka.TestKit.Internal.ITestActorQueueProducer - { - System.Collections.Generic.IEnumerable GetAll(); - [System.ObsoleteAttribute("This method will be removed in the future")] - System.Collections.Generic.List ToList(); - } - public interface ITestQueue - { - int Count { get; } - void Enqueue(T item); - System.Threading.Tasks.ValueTask EnqueueAsync(T item); - T Peek(System.Threading.CancellationToken cancellationToken); - System.Threading.Tasks.ValueTask PeekAsync(System.Threading.CancellationToken cancellationToken); - T Take(System.Threading.CancellationToken cancellationToken); - System.Threading.Tasks.ValueTask TakeAsync(System.Threading.CancellationToken cancellationToken); - [System.ObsoleteAttribute("This method will be removed in the future")] - System.Collections.Generic.List ToList(); - bool TryEnqueue(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - System.Threading.Tasks.ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - bool TryPeek(out T item); - bool TryPeek(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - System.Threading.Tasks.ValueTask> TryPeekAsync(System.Threading.CancellationToken cancellationToken); - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - System.Threading.Tasks.ValueTask> TryPeekAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - bool TryTake(out T item, System.Threading.CancellationToken cancellationToken = null); - bool TryTake(out T item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - System.Threading.Tasks.ValueTask> TryTakeAsync(System.Threading.CancellationToken cancellationToken); - [return: System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { - "success", - "item"})] - System.Threading.Tasks.ValueTask> TryTakeAsync(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken); - } public class InfoFilter : Akka.TestKit.Internal.EventFilterBase { public InfoFilter(Akka.TestKit.Internal.StringMatcher.IStringMatcher messageMatcher = null, Akka.TestKit.Internal.StringMatcher.IStringMatcher sourceMatcher = null) { } @@ -831,11 +709,6 @@ namespace Akka.TestKit.Internal public virtual void HandleEvent(Akka.TestKit.Internal.EventFilterBase eventFilter, Akka.Event.LogEvent logEvent) { } } } - public class InternalTestActor : Akka.Actor.UntypedActor - { - public InternalTestActor(Akka.TestKit.Internal.ITestActorQueue queue) { } - protected override void OnReceive(object message) { } - } public class InternalTestActorRef : Akka.Actor.LocalActorRef { public object UnderlyingActor { get; } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs index 5bd7e42d746..353b6f6877d 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Akka.Actor; using Akka.Streams.Dsl; @@ -406,10 +407,10 @@ await this.AssertAllStagesStoppedAsync(async() => { const int parallelism = 8; var counter = new AtomicCounter(); - var queue = new BlockingQueue<(TaskCompletionSource, long)>(); + var queue = Channel.CreateUnbounded<(TaskCompletionSource, long)>(); var cancellation = new CancellationTokenSource(); #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => + Task.Run(async () => { var delay = 500; // 50000 nanoseconds var count = 0; @@ -418,7 +419,7 @@ await this.AssertAllStagesStoppedAsync(async() => { try { - var t = queue.Take(cancellation.Token); + var t = await queue.Reader.ReadAsync(cancellation.Token); var promise = t.Item1; var enqueued = t.Item2; var wakeup = enqueued + delay; @@ -435,22 +436,26 @@ await this.AssertAllStagesStoppedAsync(async() => }, cancellation.Token); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Func> deferred = () => + Task Deferred() { var promise = new TaskCompletionSource(); if (counter.IncrementAndGet() > parallelism) promise.SetException(new Exception("parallelism exceeded")); else - - queue.Enqueue((promise, DateTime.Now.Ticks)); + { + var wrote = queue.Writer.TryWrite((promise, DateTime.Now.Ticks)); + if (!wrote) + promise.SetException(new Exception("Failed to write to queue")); + } + return promise.Task; - }; + } try { const int n = 10000; var task = Source.From(Enumerable.Range(1, n)) - .SelectAsync(parallelism, _ => deferred()) + .SelectAsync(parallelism, _ => Deferred()) .RunAggregate(0, (c, _) => c + 1, Materializer); var complete = await task.ShouldCompleteWithin(3.Seconds()); diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs index fb2fdcdc350..3ba82f0da66 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Akka.Actor; using Akka.Streams.Dsl; @@ -335,10 +336,10 @@ public async Task A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_tha await this.AssertAllStagesStoppedAsync(() => { const int parallelism = 8; var counter = new AtomicCounter(); - var queue = new BlockingQueue<(TaskCompletionSource, long)>(); + var queue = Channel.CreateUnbounded<(TaskCompletionSource, long)>(); var cancellation = new CancellationTokenSource(); - Task.Run(() => + Task.Run(async () => { var delay = 500; // 50000 nanoseconds var count = 0; @@ -347,7 +348,7 @@ await this.AssertAllStagesStoppedAsync(() => { { try { - var t = queue.Take(cancellation.Token); + var t = await queue.Reader.ReadAsync(cancellation.Token); var promise = t.Item1; var enqueued = t.Item2; var wakeup = enqueued + delay; @@ -363,21 +364,26 @@ await this.AssertAllStagesStoppedAsync(() => { } }, cancellation.Token); - Func> deferred = () => + Task Deferred() { var promise = new TaskCompletionSource(); if (counter.IncrementAndGet() > parallelism) promise.SetException(new Exception("parallelism exceeded")); else - queue.Enqueue((promise, DateTime.Now.Ticks)); + { + var write = queue.Writer.TryWrite((promise, DateTime.Now.Ticks)); + if (!write) + promise.SetException(new Exception("Failed to write to queue")); + } + return promise.Task; - }; + } try { const int n = 10000; var task = Source.From(Enumerable.Range(1, n)) - .SelectAsyncUnordered(parallelism, _ => deferred()) + .SelectAsyncUnordered(parallelism, _ => Deferred()) .RunAggregate(0, (c, _) => c + 1, Materializer); task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); diff --git a/src/core/Akka.TestKit.Tests/Bugfix7145Spec.cs b/src/core/Akka.TestKit.Tests/Bugfix7145Spec.cs new file mode 100644 index 00000000000..7fb2855fcb1 --- /dev/null +++ b/src/core/Akka.TestKit.Tests/Bugfix7145Spec.cs @@ -0,0 +1,55 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Threading.Tasks; +using Akka.Actor; +using FluentAssertions; +using Xunit; + +namespace Akka.TestKit.Tests; + +public class Bugfix7145Spec : AkkaSpec +{ + // generate a test actor that will receive a message inside a ReceiveAsync - it should then await briefly on a Task.Delay inside that ReceiveAsync and then send two different messages back to the Sender + private class BuggyActor : ReceiveActor + { + public BuggyActor() + { + ReceiveAsync(async s => + { + await Task.Delay(100); + Sender.Tell(s + "1"); + Sender.Tell(s + "2"); + }); + } + } + + [Fact] + public async Task Should_not_deadlock_when_using_ReceiveAsync() + { + var actor = Sys.ActorOf(Props.Create(() => new BuggyActor())); + var probe = CreateTestProbe(); + actor.Tell("hello", probe); + var response1 = await probe.ExpectMsgAsync(); + var response2 = await probe.ExpectMsgAsync(); + response1.Should().Be("hello1"); + response2.Should().Be("hello2"); + } + + // generate a test case where we set ConfigureAwait(false) on the ExpectMsgAsync calls inside the test method + [Fact] + public async Task Should_not_deadlock_when_using_ReceiveAsync_with_ConfigureAwait_false() + { + var actor = Sys.ActorOf(Props.Create(() => new BuggyActor())); + var probe = CreateTestProbe(); + actor.Tell("hello", probe); + var response1 = await probe.ExpectMsgAsync().ConfigureAwait(false); + var response2 = await probe.ExpectMsgAsync().ConfigureAwait(false); + response1.Should().Be("hello1"); + response2.Should().Be("hello2"); + } +} \ No newline at end of file diff --git a/src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs b/src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs deleted file mode 100644 index d4cd154b31b..00000000000 --- a/src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs +++ /dev/null @@ -1,398 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2023 Lightbend Inc. -// Copyright (C) 2013-2023 .NET Foundation -// -//----------------------------------------------------------------------- - -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; -using Nito.AsyncEx; -using Nito.AsyncEx.Synchronous; - -#nullable enable -namespace Akka.TestKit.Internal -{ - /// - /// An async-compatible producer/consumer collection. - /// - /// The type of elements contained in the collection. - [DebuggerDisplay("Count = {_collection.Count}, MaxCount = {_maxCount}")] - [DebuggerTypeProxy(typeof(AsyncPeekableCollection<>.DebugView))] - internal sealed class AsyncPeekableCollection where T: class - { - /// - /// The underlying collection. - /// - private readonly IPeekableProducerConsumerCollection _collection; - - /// - /// The maximum number of elements allowed in the collection. - /// - private readonly int _maxCount; - - /// - /// The mutual-exclusion lock protecting the collection. - /// - private readonly AsyncLock _mutex; - - /// - /// A condition variable that is signalled when the collection is completed or not full. - /// - private readonly AsyncConditionVariable _completedOrNotFull; - - /// - /// A condition variable that is signalled when the collection is completed or not empty. - /// - private readonly AsyncConditionVariable _completedOrNotEmpty; - - /// - /// Whether the collection has been marked completed for adding. - /// - private bool _completed; - - /// - /// Creates a new async-compatible producer/consumer collection wrapping the specified collection - /// and with a maximum element count. - /// - /// The collection to wrap. - /// The maximum element count. This must be greater than zero. - public AsyncPeekableCollection(IPeekableProducerConsumerCollection collection, int maxCount) - { - //collection ??= new ConcurrentQueue(); - if (maxCount <= 0) - throw new ArgumentOutOfRangeException(nameof(maxCount), "The maximum count must be greater than zero."); - if (maxCount < collection.Count) - throw new ArgumentException("The maximum count cannot be less than the number of elements in the collection.", nameof(maxCount)); - _collection = collection; - _maxCount = maxCount; - _mutex = new AsyncLock(); - _completedOrNotFull = new AsyncConditionVariable(_mutex); - _completedOrNotEmpty = new AsyncConditionVariable(_mutex); - } - - /// - /// Creates a new async-compatible producer/consumer collection wrapping the specified collection. - /// - /// The collection to wrap. - public AsyncPeekableCollection(IPeekableProducerConsumerCollection collection) - : this(collection, int.MaxValue) - { - } - - /// - /// Whether the collection is empty. - /// - private bool Empty => _collection.Count == 0; - - /// - /// Whether the collection is full. - /// - private bool Full => _collection.Count == _maxCount; - - public int Count => _collection.Count; - - /// - /// Synchronously marks the producer/consumer collection as complete for adding. - /// - public void CompleteAdding() - { - using (_mutex.Lock()) - { - _completed = true; - _completedOrNotEmpty.NotifyAll(); - _completedOrNotFull.NotifyAll(); - } - } - - /// - /// Attempts to add an item. - /// - /// The item to add. - /// A cancellation token that can be used to abort the add operation. - /// Whether to run this method synchronously. - internal async Task DoAddAsync(T item, CancellationToken cancellationToken, bool sync) - { - using (sync ? _mutex.Lock() : await _mutex.LockAsync()) - { - // Wait for the collection to be not full. - while (Full && !_completed) - { - if (sync) - _completedOrNotFull.Wait(cancellationToken); - else - await _completedOrNotFull.WaitAsync(cancellationToken); - } - - // If the queue has been marked complete, then abort. - if (_completed) - throw new InvalidOperationException("Add failed; the producer/consumer collection has completed adding."); - - if (!_collection.TryAdd(item)) - throw new InvalidOperationException("Add failed; the add to the underlying collection failed."); - - _completedOrNotEmpty.Notify(); - } - } - - /// - /// Adds an item to the producer/consumer collection. Throws - /// if the producer/consumer collection has completed adding or if the item was rejected - /// by the underlying collection. - /// - /// The item to add. - /// A cancellation token that can be used to abort the add operation. - public Task AddAsync(T item, CancellationToken cancellationToken) - => DoAddAsync(item, cancellationToken, sync: false); - - /// - /// Adds an item to the producer/consumer collection. Throws - /// if the producer/consumer collection has completed adding or if the item was rejected - /// by the underlying collection. This method may block the calling thread. - /// - /// The item to add. - /// A cancellation token that can be used to abort the add operation. - public void Add(T item, CancellationToken cancellationToken) - => DoAddAsync(item, cancellationToken, sync: true).WaitAndUnwrapException(CancellationToken.None); - - /// - /// Adds an item to the producer/consumer collection. Throws - /// if the producer/consumer collection has completed adding or if the item was rejected - /// by the underlying collection. - /// - /// The item to add. - public Task AddAsync(T item) => AddAsync(item, CancellationToken.None); - - /// - /// Adds an item to the producer/consumer collection. Throws if the producer/consumer collection has completed adding or if the item was rejected by the underlying collection. This method may block the calling thread. - /// - /// The item to add. - public void Add(T item) => Add(item, CancellationToken.None); - - /// - /// Waits until an item is available to take. Returns false if the producer/consumer collection has completed adding and there are no more items. - /// - /// A cancellation token that can be used to abort the wait. - /// Whether to run this method synchronously. - private async Task DoOutputAvailableAsync(CancellationToken cancellationToken, bool sync) - { - using (sync ? _mutex.Lock() : await _mutex.LockAsync()) - { - while (Empty && !_completed) - { - if (sync) - _completedOrNotEmpty.Wait(cancellationToken); - else - await _completedOrNotEmpty.WaitAsync(cancellationToken); - } - return !Empty; - } - } - - /// - /// Asynchronously waits until an item is available to take. Returns false if the producer/consumer collection has completed adding and there are no more items. - /// - /// A cancellation token that can be used to abort the asynchronous wait. - public Task OutputAvailableAsync(CancellationToken cancellationToken) => DoOutputAvailableAsync(cancellationToken, sync: false); - - /// - /// Asynchronously waits until an item is available to take. Returns false if the producer/consumer collection has completed adding and there are no more items. - /// - public Task OutputAvailableAsync() => OutputAvailableAsync(CancellationToken.None); - - /// - /// Synchronously waits until an item is available to take. Returns false if the producer/consumer collection has completed adding and there are no more items. - /// - /// A cancellation token that can be used to abort the wait. - public bool OutputAvailable(CancellationToken cancellationToken) => DoOutputAvailableAsync(cancellationToken, sync: true).WaitAndUnwrapException(); - - /// - /// Synchronously waits until an item is available to take. Returns false if the producer/consumer collection has completed adding and there are no more items. - /// - public bool OutputAvailable() => OutputAvailable(CancellationToken.None); - - /// - /// Provides a (synchronous) consuming enumerable for items in the producer/consumer collection. - /// - /// A cancellation token that can be used to abort the synchronous enumeration. - public IEnumerable GetConsumingEnumerable(CancellationToken cancellationToken) - { - while (true) - { - T item; - try - { - item = Take(cancellationToken); - } - catch (InvalidOperationException) - { - yield break; - } - yield return item; - } - } - - /// - /// Provides a (synchronous) consuming enumerable for items in the producer/consumer queue. - /// - public IEnumerable GetConsumingEnumerable() - { - return GetConsumingEnumerable(CancellationToken.None); - } - - /// - /// Attempts to take an item. - /// - /// A cancellation token that can be used to abort the take operation. - /// Whether to run this method synchronously. - /// The collection has been marked complete for adding and is empty. - private async Task DoTakeAsync(CancellationToken cancellationToken, bool sync) - { - using (sync ? _mutex.Lock() : await _mutex.LockAsync()) - { - while (Empty && !_completed) - { - if (sync) - _completedOrNotEmpty.Wait(cancellationToken); - else - await _completedOrNotEmpty.WaitAsync(cancellationToken); - } - - if (_completed && Empty) - throw new InvalidOperationException("Take failed; the producer/consumer collection has completed adding and is empty."); - - if (!_collection.TryTake(out T item)) - throw new InvalidOperationException("Take failed; the take from the underlying collection failed."); - - _completedOrNotFull.Notify(); - return item; - } - } - - /// - /// Takes an item from the producer/consumer collection. Returns the item. - /// Throws if the producer/consumer collection has completed adding - /// and is empty, or if the take from the underlying collection failed. - /// - /// A cancellation token that can be used to abort the take operation. - public Task TakeAsync(CancellationToken cancellationToken) => DoTakeAsync(cancellationToken, sync: false); - - /// - /// Takes an item from the producer/consumer collection. Returns the item. - /// Throws if the producer/consumer collection has completed adding - /// and is empty, or if the take from the underlying collection failed. - /// - public Task TakeAsync() => TakeAsync(CancellationToken.None); - - /// - /// Takes an item from the producer/consumer collection. Returns the item. - /// Throws if the producer/consumer collection has completed adding - /// and is empty, or if the take from the underlying collection failed. This method may block the calling thread. - /// - /// A cancellation token that can be used to abort the take operation. - public T Take(CancellationToken cancellationToken) => DoTakeAsync(cancellationToken, sync: true).WaitAndUnwrapException(); - - /// - /// Takes an item from the producer/consumer collection. Returns the item. - /// Throws if the producer/consumer collection has completed adding - /// and is empty, or if the take from the underlying collection failed. This method may block the calling thread. - /// - public T Take() => Take(CancellationToken.None); - - /// - /// Takes an item without waiting from the producer/consumer collection. Returns the item. - /// Throws if the producer/consumer collection has completed adding - /// and is empty, or if the take from the underlying collection failed. This method may block the calling thread. - /// - public bool TryTake(out T item) => _collection.TryTake(out item); - - /// - /// Attempts to peek an item. - /// - /// A cancellation token that can be used to abort the take operation. - /// Whether to run this method synchronously. - /// The collection has been marked complete for adding and is empty. - private async Task DoPeekAsync(CancellationToken cancellationToken, bool sync) - { - using (sync ? _mutex.Lock() : await _mutex.LockAsync()) - { - while (Empty && !_completed) - { - if (sync) - _completedOrNotEmpty.Wait(cancellationToken); - else - await _completedOrNotEmpty.WaitAsync(cancellationToken); - } - - if (_completed && Empty) - throw new InvalidOperationException("Peek failed; the producer/consumer collection has completed adding and is empty."); - - if (!_collection.TryPeek(out var item)) - throw new InvalidOperationException("Peek failed; the take from the underlying collection failed."); - - _completedOrNotFull.Notify(); - return item; - } - } - - /// - /// Takes an item from the producer/consumer collection. Returns the item. - /// Throws if the producer/consumer collection has completed adding - /// and is empty, or if the take from the underlying collection failed. - /// - /// A cancellation token that can be used to abort the take operation. - public Task PeekAsync(CancellationToken cancellationToken) => DoPeekAsync(cancellationToken, sync: false); - - /// - /// Takes an item from the producer/consumer collection. Returns the item. - /// Throws if the producer/consumer collection has completed adding - /// and is empty, or if the take from the underlying collection failed. - /// - public Task PeekAsync() => PeekAsync(CancellationToken.None); - - /// - /// Takes an item from the producer/consumer collection. Returns the item. - /// Throws if the producer/consumer collection has completed adding - /// and is empty, or if the take from the underlying collection failed. This method may block the calling thread. - /// - /// A cancellation token that can be used to abort the take operation. - public T Peek(CancellationToken cancellationToken) => DoPeekAsync(cancellationToken, sync: true).WaitAndUnwrapException(); - - /// - /// Takes an item from the producer/consumer collection. Returns the item. - /// Throws if the producer/consumer collection has completed adding - /// and is empty, or if the take from the underlying collection failed. This method may block the calling thread. - /// - public T Peek() => Peek(CancellationToken.None); - - /// - /// Takes an item without waiting from the producer/consumer collection. Returns the item. - /// Throws if the producer/consumer collection has completed adding - /// and is empty, or if the take from the underlying collection failed. This method may block the calling thread. - /// - public bool TryPeek(out T item) => _collection.TryPeek(out item); - - [DebuggerNonUserCode] - internal sealed class DebugView - { - private readonly AsyncPeekableCollection _peekableCollection; - - public DebugView(AsyncPeekableCollection peekableCollection) - { - _peekableCollection = peekableCollection; - } - - [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] - public T[] Items => _peekableCollection._collection.ToArray(); - } - } - - internal interface IPeekableProducerConsumerCollection:IProducerConsumerCollection - { - bool TryPeek(out T item); - } -} diff --git a/src/core/Akka.TestKit/Internal/AsyncQueue.cs b/src/core/Akka.TestKit/Internal/AsyncQueue.cs deleted file mode 100644 index b63a8c94491..00000000000 --- a/src/core/Akka.TestKit/Internal/AsyncQueue.cs +++ /dev/null @@ -1,274 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2023 Lightbend Inc. -// Copyright (C) 2013-2023 .NET Foundation -// -//----------------------------------------------------------------------- - -using System; -using System.Collections; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Nito.AsyncEx.Synchronous; - -namespace Akka.TestKit.Internal -{ - public class AsyncQueue: ITestQueue where T: class - { - private readonly AsyncPeekableCollection _collection = new(new QueueCollection()); - - public int Count => _collection.Count; - - public void Enqueue(T item) => EnqueueAsync(item).AsTask().WaitAndUnwrapException(); - - public ValueTask EnqueueAsync(T item) => new(_collection.AddAsync(item)); - - public bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancellationToken) - { - var task = TryEnqueueAsync(item, millisecondsTimeout, cancellationToken); - task.AsTask().Wait(cancellationToken); - return task.Result; - } - - public async ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, CancellationToken cancellationToken) - { - using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) - { - cts.CancelAfter(millisecondsTimeout); - try - { - await _collection.AddAsync(item, cts.Token); - return true; - } - catch - { - return false; - } - } - } - - public bool TryTake(out T item, CancellationToken cancellationToken = default) - { - if (cancellationToken.IsCancellationRequested) - { - item = default; - return false; - } - - try - { - // TryRead returns immediately - return _collection.TryTake(out item); - } - catch - { - item = default; - return false; - } - } - - public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken) - { - try - { - var task = TryTakeAsync(millisecondsTimeout, cancellationToken).AsTask(); - task.Wait(cancellationToken); - item = task.Result.item; - return task.Result.success; - } - catch - { - item = default; - return false; - } - } - - public async ValueTask<(bool success, T item)> TryTakeAsync(CancellationToken cancellationToken) - { - try - { - var result = await _collection.TakeAsync(cancellationToken); - return (true, result); - } - catch - { - return (false, default); - } - } - - public async ValueTask<(bool success, T item)> TryTakeAsync(int millisecondsTimeout, CancellationToken cancellationToken) - { - using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) - { - cts.CancelAfter(millisecondsTimeout); - return await TryTakeAsync(cts.Token); - } - } - - public T Take(CancellationToken cancellationToken) - { - if(!_collection.TryTake(out var item)) - throw new InvalidOperationException("Failed to dequeue item from the queue."); - return item; - } - - public ValueTask TakeAsync(CancellationToken cancellationToken) => new(_collection.TakeAsync(cancellationToken)); - - public bool TryPeek(out T item) => _collection.TryPeek(out item); - - public bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cancellationToken) - { - try - { - var task = TryPeekAsync(millisecondsTimeout, cancellationToken).AsTask(); - task.Wait(cancellationToken); - item = task.Result.item; - return task.Result.success; - } - catch - { - item = default; - return false; - } - } - - public async ValueTask<(bool success, T item)> TryPeekAsync(CancellationToken cancellationToken) - { - try - { - var result = await _collection.PeekAsync(cancellationToken); - return (true, result); - } - catch - { - return (false, default); - } - } - - public async ValueTask<(bool success, T item)> TryPeekAsync(int millisecondsTimeout, CancellationToken cancellationToken) - { - using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) - { - cts.CancelAfter(millisecondsTimeout); - return await TryPeekAsync(cts.Token); - } - } - - public T Peek(CancellationToken cancellationToken) - { - if (cancellationToken.IsCancellationRequested) - throw new OperationCanceledException("Peek operation canceled"); - - if(!_collection.TryPeek(out var item)) - throw new InvalidOperationException("Failed to peek item from the queue."); - return item; - } - - public ValueTask PeekAsync(CancellationToken cancellationToken) => new(_collection.PeekAsync(cancellationToken)); - - public List ToList() - { - throw new System.NotImplementedException(); - } - - private class QueueCollection : IPeekableProducerConsumerCollection - { - private readonly Queue _queue = new(); - - public int Count { - get - { - lock (SyncRoot) - { - return _queue.Count; - } - } - } - - public bool TryAdd(T item) - { - lock (SyncRoot) - { - _queue.Enqueue(item); - return true; - } - } - - public bool TryTake(out T item) - { - lock(SyncRoot) - { - if(_queue.Count == 0) - { - item = null; - return false; - } - - item = _queue.Dequeue(); - return true; - } - } - - public bool TryPeek(out T item) - { - lock(SyncRoot) - { - if(_queue.Count == 0) - { - item = null; - return false; - } - - item = _queue.Peek(); - return true; - } - } - - public void CopyTo(T[] array, int index) - { - lock(SyncRoot) - { - _queue.CopyTo(array, index); - } - } - - - public void CopyTo(Array array, int index) - { - lock(SyncRoot) - { - ((ICollection)_queue).CopyTo(array, index); - } - } - - public T[] ToArray() - { - lock(SyncRoot) - { - return _queue.ToArray(); - } - } - - - public IEnumerator GetEnumerator() - { - lock(SyncRoot) - { - //We must create a copy - return new List(_queue).GetEnumerator(); - } - } - - IEnumerator IEnumerable.GetEnumerator() - { - return GetEnumerator(); - } - - public object SyncRoot { get; } = new(); - - public bool IsSynchronized => true; - } - } - -} diff --git a/src/core/Akka.TestKit/Internal/BlockingCollectionTestActorQueue.cs b/src/core/Akka.TestKit/Internal/BlockingCollectionTestActorQueue.cs deleted file mode 100644 index 6fc3989fd55..00000000000 --- a/src/core/Akka.TestKit/Internal/BlockingCollectionTestActorQueue.cs +++ /dev/null @@ -1,67 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2023 Lightbend Inc. -// Copyright (C) 2013-2023 .NET Foundation -// -//----------------------------------------------------------------------- - -using System; -using System.Collections.Generic; - -namespace Akka.TestKit.Internal -{ - /// - /// This class represents an implementation of - /// that uses a as its backing store. - /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. - /// - /// The type of item to store. - public class BlockingCollectionTestActorQueue : ITestActorQueue - { - private readonly ITestQueue _queue; - - /// - /// Initializes a new instance of the class. - /// - /// The queue to use as the backing store. - public BlockingCollectionTestActorQueue(ITestQueue queue) - { - _queue = queue; - } - - /// - /// Adds the specified item to the end of the queue. - /// - /// The item to add to the queue. - public void Enqueue(T item) - { - _queue.Enqueue(item); - } - - /// - /// Return an for the items inside the collection. - /// - /// A for the items - public List ToList() - { - return _queue.ToList(); - } - - /// - /// - /// Retrieves all items from the queue. - /// - /// - /// This will remove all items from the queue. - /// - /// - /// An enumeration of all items removed from the queue. - public IEnumerable GetAll() - { - while(_queue.TryTake(out var item)) - { - yield return item; - } - } - } -} diff --git a/src/core/Akka.TestKit/Internal/BlockingQueue.cs b/src/core/Akka.TestKit/Internal/BlockingQueue.cs deleted file mode 100644 index 0f81844931b..00000000000 --- a/src/core/Akka.TestKit/Internal/BlockingQueue.cs +++ /dev/null @@ -1,288 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2023 Lightbend Inc. -// Copyright (C) 2013-2023 .NET Foundation -// -//----------------------------------------------------------------------- - -using System; -using System.Collections; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -namespace Akka.TestKit.Internal -{ - /// - /// This class represents a queue with the same characteristics of a . - /// The queue can enqueue items at either the front (FIFO) or the end (LIFO) of the collection. - /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. - /// - /// The type of item to store. - public class BlockingQueue : ITestQueue - { - private readonly BlockingCollection _collection = new(new QueueWithAddFirst()); - - public int Count { get { return _collection.Count; } } - - public void Enqueue(T item) - { - if (!_collection.TryAdd(new Positioned(item))) - throw new InvalidOperationException("Failed to enqueue item into the queue."); - } - - public ValueTask EnqueueAsync(T item) - { - Enqueue(item); - return new ValueTask(); - } - - [Obsolete("This method will be removed from the public API in the future")] - public void AddFirst(T item) - { - if(!_collection.TryAdd(new Positioned(item, first:true))) - throw new InvalidOperationException("Failed to enqueue item into the head of the queue."); - } - - public bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancellationToken) - { - return _collection.TryAdd(new Positioned(item), millisecondsTimeout, cancellationToken); - } - - public ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, CancellationToken cancellationToken) - { - return new ValueTask(TryEnqueue(item, millisecondsTimeout, cancellationToken)); - } - - public bool TryTake(out T item, CancellationToken cancellationToken = default) - { - if(_collection.TryTake(out var p, 0, cancellationToken)) - { - item = p.Value; - return true; - } - item = default; - return false; - } - - public ValueTask<(bool success, T item)> TryTakeAsync(CancellationToken cancellationToken) - { - var result = TryTake(out var item); - return new ValueTask<(bool success, T item)>((result, item)); - } - - public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken) - { - if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken)) - { - item = p.Value; - return true; - } - item = default; - return false; - } - - public ValueTask<(bool success, T item)> TryTakeAsync(int millisecondsTimeout, CancellationToken cancellationToken) - { - var result = TryTake(out var item, millisecondsTimeout, cancellationToken); - return new ValueTask<(bool success, T item)>((result, item)); - } - - public T Take(CancellationToken cancellationToken) - { - var p = _collection.Take(cancellationToken); - return p.Value; - } - - public ValueTask TakeAsync(CancellationToken cancellationToken) - { - return new ValueTask(_collection.Take(cancellationToken).Value); - } - - #region Peek methods - - public bool TryPeek(out T item) - { - if(_collection.TryTake(out var p)) - { - item = p.Value; -#pragma warning disable CS0618 - AddFirst(item); -#pragma warning restore CS0618 - return true; - } - item = default; - return false; - } - - public ValueTask<(bool success, T item)> TryPeekAsync(CancellationToken cancellationToken) - { - if(_collection.TryTake(out var p)) - { - var item = p.Value; -#pragma warning disable CS0618 - AddFirst(item); -#pragma warning restore CS0618 - return new ValueTask<(bool success, T item)>((true, item)); - } - return new ValueTask<(bool success, T item)>((false, default)); - } - - public bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cancellationToken) - { - if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken)) - { - item = p.Value; -#pragma warning disable CS0618 - AddFirst(item); -#pragma warning restore CS0618 - return true; - } - item = default; - return false; - } - - public ValueTask<(bool success, T item)> TryPeekAsync(int millisecondsTimeout, CancellationToken cancellationToken) - { - if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken)) - { - var item = p.Value; -#pragma warning disable CS0618 - AddFirst(item); -#pragma warning restore CS0618 - return new ValueTask<(bool success, T item)>((true, item)); - } - return new ValueTask<(bool success, T item)>((false, default)); - } - - public T Peek(CancellationToken cancellationToken) - { - var p = _collection.Take(cancellationToken); -#pragma warning disable CS0618 - AddFirst(p.Value); -#pragma warning restore CS0618 - return p.Value; - } - - public ValueTask PeekAsync(CancellationToken cancellationToken) - { - var val = _collection.Take(cancellationToken).Value; -#pragma warning disable CS0618 - AddFirst(val); -#pragma warning restore CS0618 - return new ValueTask(val); - } - #endregion - - public List ToList() - { - var positionArray = _collection.ToArray(); - return positionArray.Select(positioned => positioned.Value).ToList(); - } - - - private class Positioned - { - private readonly T _value; - private readonly bool _first; - - public Positioned(T value, bool first = false) - { - _value = value; - _first = first; - } - - public T Value { get { return _value; } } - public bool First { get { return _first; } } - } - - private class QueueWithAddFirst : IProducerConsumerCollection - { - private readonly LinkedList _list = new(); - - public int Count { - get - { - lock (SyncRoot) - { - return _list.Count; - } - } - } - - public bool TryAdd(Positioned item) - { - lock (SyncRoot) - { - if(item.First) - _list.AddFirst(item); - else - _list.AddLast(item); - return true; - } - } - - public bool TryTake(out Positioned item) - { - lock(SyncRoot) - { - if(_list.Count == 0) - { - item = null; - return false; - } - - item = _list.First.Value; - _list.RemoveFirst(); - return true; - } - } - - public void CopyTo(Positioned[] array, int index) - { - lock(SyncRoot) - { - _list.CopyTo(array, index); - } - } - - - public void CopyTo(Array array, int index) - { - lock(SyncRoot) - { - ((ICollection)_list).CopyTo(array, index); - } - } - - public Positioned[] ToArray() - { - lock(SyncRoot) - { - return _list.ToArray(); - } - } - - - public IEnumerator GetEnumerator() - { - lock(SyncRoot) - { - //We must create a copy - return new List(_list).GetEnumerator(); - } - } - - IEnumerator IEnumerable.GetEnumerator() - { - return GetEnumerator(); - } - - public object SyncRoot { get; } = new(); - - public bool IsSynchronized => true; - } - } -} diff --git a/src/core/Akka.TestKit/Internal/ITestActorQueue.cs b/src/core/Akka.TestKit/Internal/ITestActorQueue.cs deleted file mode 100644 index 2bb6be581b8..00000000000 --- a/src/core/Akka.TestKit/Internal/ITestActorQueue.cs +++ /dev/null @@ -1,43 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2023 Lightbend Inc. -// Copyright (C) 2013-2023 .NET Foundation -// -//----------------------------------------------------------------------- - -using System; -using System.Collections.Generic; - -namespace Akka.TestKit.Internal -{ - /// - /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. - /// - /// TBD - public interface ITestActorQueueProducer - { - /// Adds the specified item to the queue. - /// The item. - void Enqueue(T item); - } - - /// - /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. - /// - /// TBD - public interface ITestActorQueue : ITestActorQueueProducer - { - /// - /// Copies all the items from the instance into a new - /// - /// TBD - [Obsolete("This method will be removed in the future")] - List ToList(); - - /// - /// Get all messages. - /// - /// TBD - IEnumerable GetAll(); - } -} diff --git a/src/core/Akka.TestKit/Internal/ITestQueue.cs b/src/core/Akka.TestKit/Internal/ITestQueue.cs deleted file mode 100644 index 5cd61967d16..00000000000 --- a/src/core/Akka.TestKit/Internal/ITestQueue.cs +++ /dev/null @@ -1,168 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2023 Lightbend Inc. -// Copyright (C) 2013-2023 .NET Foundation -// -//----------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace Akka.TestKit.Internal -{ - public interface ITestQueue - { - /// - /// The number of items that are currently in the queue. - /// - int Count { get; } - - /// - /// Adds the specified item to the end of the queue. - /// - /// The item to add to the queue. - void Enqueue(T item); - - /// - /// Adds the specified item to the end of the queue. - /// - /// The item to add to the queue. - ValueTask EnqueueAsync(T item); - - /// - /// Tries to add the specified item to the end of the queue within the specified time period. - /// A token can be provided to cancel the operation if needed. - /// - /// The item to add to the queue. - /// The number of milliseconds to wait for the add to complete. - /// The cancellation token that can be used to cancel the operation. - /// true if the add completed within the specified timeout; otherwise, false. - bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancellationToken); - - /// - /// Tries to add the specified item to the end of the queue within the specified time period. - /// A token can be provided to cancel the operation if needed. - /// - /// The item to add to the queue. - /// The number of milliseconds to wait for the add to complete. - /// The cancellation token that can be used to cancel the operation. - /// true if the add completed within the specified timeout; otherwise, false. - ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, CancellationToken cancellationToken); - - /// - /// Tries to remove the specified item from the queue. - /// - /// The item to remove from the queue. - /// The cancellation token that can be used to cancel the operation. - /// true if the item was removed; otherwise, false. - bool TryTake(out T item, CancellationToken cancellationToken = default); - - /// - /// Tries to remove the specified item from the queue within the specified time period. - /// A token can be provided to cancel the operation if needed. - /// - /// The item to remove from the queue. - /// The number of milliseconds to wait for the remove to complete. - /// The cancellation token that can be used to cancel the operation. - /// true if the remove completed within the specified timeout; otherwise, false. - bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken); - - /// - /// Tries to remove the specified item from the queue. - /// - /// The cancellation token that can be used to cancel the operation. - /// a tuple of bool and T, true if the item was removed; otherwise, false. - ValueTask<(bool success, T item)> TryTakeAsync(CancellationToken cancellationToken); - - /// - /// Tries to remove the specified item from the queue within the specified time period. - /// A token can be provided to cancel the operation if needed. - /// - /// The number of milliseconds to wait for the remove to complete. - /// The cancellation token that can be used to cancel the operation. - /// a tuple of bool and T, true if the remove completed within the specified timeout; otherwise, false. - ValueTask<(bool success, T item)> TryTakeAsync(int millisecondsTimeout, CancellationToken cancellationToken); - - /// - /// Removes an item from the collection. - /// - /// The cancellation token that can be used to cancel the operation. - /// - /// This exception is thrown when the operation is canceled. - /// - /// The item removed from the collection. - T Take(CancellationToken cancellationToken); - - /// - /// Removes an item from the collection. - /// - /// The cancellation token that can be used to cancel the operation. - /// - /// This exception is thrown when the operation is canceled. - /// - /// The item removed from the collection. - ValueTask TakeAsync(CancellationToken cancellationToken); - - /// - /// Tries to peek the specified item from the queue. - /// - /// The item to remove from the queue. - /// true if the item was removed; otherwise, false. - bool TryPeek(out T item); - - /// - /// Tries to peek the specified item from the queue within the specified time period. - /// A token can be provided to cancel the operation if needed. - /// - /// The item to remove from the queue. - /// The number of milliseconds to wait for the remove to complete. - /// The cancellation token that can be used to cancel the operation. - /// true if the remove completed within the specified timeout; otherwise, false. - bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cancellationToken); - - /// - /// Tries to peek the specified item from the queue. - /// - /// The cancellation token that can be used to cancel the operation. - /// a tuple of bool and T, true if the item was removed; otherwise, false. - ValueTask<(bool success, T item)> TryPeekAsync(CancellationToken cancellationToken); - - /// - /// Tries to peek the specified item from the queue within the specified time period. - /// A token can be provided to cancel the operation if needed. - /// - /// The number of milliseconds to wait for the remove to complete. - /// The cancellation token that can be used to cancel the operation. - /// a tuple of bool and T, true if the remove completed within the specified timeout; otherwise, false. - ValueTask<(bool success, T item)> TryPeekAsync(int millisecondsTimeout, CancellationToken cancellationToken); - - /// - /// Peek an item from the collection. - /// - /// The cancellation token that can be used to cancel the operation. - /// - /// This exception is thrown when the operation is canceled. - /// - /// The item removed from the collection. - T Peek(CancellationToken cancellationToken); - - /// - /// Peek an item from the collection. - /// - /// The cancellation token that can be used to cancel the operation. - /// - /// This exception is thrown when the operation is canceled. - /// - /// The item removed from the collection. - ValueTask PeekAsync(CancellationToken cancellationToken); - - /// - /// Copies the items from the instance into a new . - /// - /// A containing copies of the elements of the collection - [Obsolete("This method will be removed in the future")] - List ToList(); - } -} diff --git a/src/core/Akka.TestKit/Internal/InternalTestActor.cs b/src/core/Akka.TestKit/Internal/InternalTestActor.cs index 1d1d929e630..4a0ec25c7b9 100644 --- a/src/core/Akka.TestKit/Internal/InternalTestActor.cs +++ b/src/core/Akka.TestKit/Internal/InternalTestActor.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Concurrent; +using System.Threading.Channels; using Akka.Actor; using Akka.Event; @@ -16,18 +17,18 @@ namespace Akka.TestKit.Internal /// An actor that enqueues received messages to a . /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. /// - public class InternalTestActor : UntypedActor + internal sealed class InternalTestActor : UntypedActor { - private readonly ITestActorQueue _queue; - private TestKit.TestActor.Ignore _ignore; + private readonly ChannelWriter _queue; + private TestActor.Ignore _ignore; private AutoPilot _autoPilot; - private DelegatingSupervisorStrategy _supervisorStrategy = new(); + private readonly DelegatingSupervisorStrategy _supervisorStrategy = new(); /// /// TBD /// /// TBD - public InternalTestActor(ITestActorQueue queue) + public InternalTestActor(ChannelWriter queue) { _queue = queue; } @@ -41,12 +42,12 @@ protected override void OnReceive(object message) { try { - global::System.Diagnostics.Debug.WriteLine("TestActor received " + message); + System.Diagnostics.Debug.WriteLine("TestActor received " + message); } catch (FormatException) { if (message is LogEvent { Message: LogMessage msg }) - global::System.Diagnostics.Debug.WriteLine( + System.Diagnostics.Debug.WriteLine( $"TestActor received a malformed formatted message. Template:[{msg.Format}], args:[{string.Join(",", msg.Unformatted())}]"); else throw; @@ -75,7 +76,11 @@ protected override void OnReceive(object message) { _supervisorStrategy.Update(actor, spawn._supervisorStrategy.Value); } - _queue.Enqueue(new RealMessageEnvelope(actor, Self)); + var wrote = _queue.TryWrite(new RealMessageEnvelope(actor, Self)); + if (!wrote) + { + throw new InvalidOperationException("Failed to write to internal TestActor queue"); + } return; } } @@ -87,8 +92,15 @@ protected override void OnReceive(object message) if(newAutoPilot is not KeepRunning) _autoPilot = newAutoPilot; } - if(_ignore == null || !_ignore(message)) - _queue.Enqueue(new RealMessageEnvelope(message, actorRef)); + + if (_ignore != null && _ignore(message)) return; + { + var wrote = _queue.TryWrite(new RealMessageEnvelope(message, actorRef)); + if (!wrote) + { + throw new InvalidOperationException("Failed to write to internal TestActor queue"); + } + } } } } diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index c639719304b..a79f3cbf6be 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -9,6 +9,7 @@ using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Akka.Actor; using Akka.Actor.Internal; @@ -36,7 +37,7 @@ public TestState() public ActorSystem System { get; set; } public TestKitSettings TestKitSettings { get; set; } - public ITestQueue Queue { get; set; } + public Channel Queue { get; set; } public MessageEnvelope LastMessage { get; set; } public IActorRef TestActor { get; set; } public TimeSpan? End { get; set; } @@ -113,9 +114,7 @@ protected TestKitBase(ITestKitAssertions assertions, Config config, string actor protected TestKitBase(ITestKitAssertions assertions, ActorSystem system, ActorSystemSetup config, string actorSystemName, string testActorName) { - if(assertions == null) throw new ArgumentNullException(nameof(assertions), "The supplied assertions must not be null."); - - _assertions = assertions; + _assertions = assertions ?? throw new ArgumentNullException(nameof(assertions), "The supplied assertions must not be null."); InitializeTest(system, config, actorSystemName, testActorName); } @@ -160,7 +159,7 @@ protected virtual void InitializeTest(ActorSystem system, ActorSystemSetup confi system.RegisterExtension(new TestKitAssertionsExtension(_assertions)); _testState.TestKitSettings = TestKitExtension.For(_testState.System); - _testState.Queue = new AsyncQueue(); + _testState.Queue = Channel.CreateUnbounded(); _testState.Log = Logging.GetLogger(system, GetType()); _testState.EventFilterFactory = new EventFilterFactory(this); @@ -203,7 +202,7 @@ private static void WaitUntilTestActorIsReady(IActorRef testActor) { while (stopwatch.Elapsed < deadline) { - ready = !(testActor is IRepointableRef repRef) || repRef.IsStarted; + ready = testActor is not IRepointableRef repRef || repRef.IsStarted; if (ready) break; Thread.Sleep(10); } @@ -308,7 +307,7 @@ public EventFilterFactory CreateEventFilter(ActorSystem system) /// public bool HasMessages { - get { return _testState.Queue.Count > 0; } + get { return _testState.Queue.Reader.Count > 0; } } /// @@ -701,7 +700,7 @@ public IActorRef CreateTestActor(string name) private IActorRef CreateTestActor(ActorSystem system, string name) { - var testActorProps = Props.Create(() => new InternalTestActor(new BlockingCollectionTestActorQueue(_testState.Queue))) + var testActorProps = Props.Create(() => new InternalTestActor(_testState.Queue)) .WithDispatcher("akka.test.test-actor.dispatcher"); var testActor = system.AsInstanceOf().SystemActorOf(testActorProps, name); return testActor; diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 71c0e3b6197..feb1d369bc2 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -253,20 +253,33 @@ public bool TryReceiveOne( if (maxDuration.IsZero()) { ConditionalLog(shouldLog, "Trying to receive message from TestActor queue. Will not wait."); - var didTake = _testState.Queue.TryTake(out var item, cancellationToken); + var didTake = _testState.Queue.Reader.TryRead(out var item); take = (didTake, item); } else if (maxDuration.IsPositiveFinite()) { ConditionalLog(shouldLog, "Trying to receive message from TestActor queue within {0}", maxDuration); - take = await _testState.Queue.TryTakeAsync((int)maxDuration.TotalMilliseconds, cancellationToken) - ; + var delayTask = Task.Delay(maxDuration, cancellationToken); + var readTask = _testState.Queue.Reader.WaitToReadAsync(cancellationToken).AsTask(); + var completedTask = await Task.WhenAny(readTask, delayTask); + + if (completedTask == readTask && readTask.Result) + { + // Data is available within the timeout. + var didTake = _testState.Queue.Reader.TryRead(out var item); + take = (didTake, item); + } + else + { + // Timeout occurred before data was available. + take = (false, null); + } } else if (maxDuration == Timeout.InfiniteTimeSpan) { Log.Warning("Trying to receive message from TestActor queue with infinite timeout! Will wait indefinitely!"); - take = await _testState.Queue.TryTakeAsync(-1, cancellationToken) - ; + var readItem = await _testState.Queue.Reader.ReadAsync(cancellationToken); + take = (true, readItem); } else { @@ -276,8 +289,7 @@ public bool TryReceiveOne( } _testState.LastWasNoMsg = false; - if (take.env == null) - take.env = NullMessageEnvelope.Instance; + take.env ??= NullMessageEnvelope.Instance; _testState.LastMessage = take.env; @@ -383,20 +395,34 @@ public bool TryPeekOne(out MessageEnvelope envelope, TimeSpan? max, Cancellation if (maxDuration.IsZero()) { ConditionalLog(shouldLog, "Trying to peek message from TestActor queue. Will not wait."); - var didPeek = _testState.Queue.TryPeek(out var item); + var didPeek = _testState.Queue.Reader.TryPeek(out var item); peek = (didPeek, item); } else if (maxDuration.IsPositiveFinite()) { ConditionalLog(shouldLog, "Trying to peek message from TestActor queue within {0}", maxDuration); - peek = await _testState.Queue.TryPeekAsync((int)maxDuration.TotalMilliseconds, cancellationToken) - ; + var delayTask = Task.Delay(maxDuration, cancellationToken); + var readTask = _testState.Queue.Reader.WaitToReadAsync(cancellationToken).AsTask(); + var completedTask = await Task.WhenAny(readTask, delayTask); + + if (completedTask == readTask && readTask.Result) + { + // Data is available within the timeout. + var didTake = _testState.Queue.Reader.TryPeek(out var item); + peek = (didTake, item); + } + else + { + // Timeout occurred before data was available. + peek = (false, default); + } } else if (maxDuration == Timeout.InfiniteTimeSpan) { Log.Warning("Trying to peek message from TestActor queue with infinite timeout! Will wait indefinitely!"); - peek = await _testState.Queue.TryPeekAsync(-1, cancellationToken) - ; + await _testState.Queue.Reader.WaitToReadAsync(cancellationToken); + var didPeek = _testState.Queue.Reader.TryPeek(out var item); + peek = (didPeek, item); } else { @@ -554,8 +580,7 @@ public async IAsyncEnumerable ReceiveWhileAsync( cancellationToken.ThrowIfCancellationRequested(); // Peek the message on the front of the queue - var (success, envelope) = await TryPeekOneAsync((stop - Now).Min(idleValue), cancellationToken) - ; + var (success, envelope) = await TryPeekOneAsync((stop - Now).Min(idleValue), cancellationToken); if (!success) { _testState.LastMessage = msg; @@ -568,8 +593,7 @@ public async IAsyncEnumerable ReceiveWhileAsync( if (result != null) { // This should happen immediately (zero timespan). Something is wrong if this fails. - var (receiveSuccess, receiveEnvelope) = await InternalTryReceiveOneAsync(TimeSpan.Zero, true, cancellationToken) - ; + var (receiveSuccess, receiveEnvelope) = await InternalTryReceiveOneAsync(TimeSpan.Zero, true, cancellationToken); if (!receiveSuccess) throw new InvalidOperationException("[RACY] Could not dequeue an item from test queue."); @@ -697,7 +721,7 @@ public async IAsyncEnumerable ReceiveWhileAsync( private void PopPeekedEnvelope(MessageEnvelope envelope) { // This should happen immediately (zero timespan). Something is wrong if this fails. - var success = _testState.Queue.TryTake(out var receivedEnvelope); + var success = _testState.Queue.Reader.TryRead(out var receivedEnvelope); if (!success) throw new InvalidOperationException("[RACY] Could not dequeue an item from test queue."); diff --git a/src/core/Akka.Tests/Actor/Scheduler/HashedWheelTimerSchedulerContentionSpec.cs b/src/core/Akka.Tests/Actor/Scheduler/HashedWheelTimerSchedulerContentionSpec.cs deleted file mode 100644 index c28688187c2..00000000000 --- a/src/core/Akka.Tests/Actor/Scheduler/HashedWheelTimerSchedulerContentionSpec.cs +++ /dev/null @@ -1,142 +0,0 @@ -// ----------------------------------------------------------------------- -// -// Copyright (C) 2009-2024 Lightbend Inc. -// Copyright (C) 2013-2024 .NET Foundation -// -// ----------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Threading; -using Akka.Actor; -using Akka.Event; -using FluentAssertions; -using FluentAssertions.Extensions; -using Xunit; -using Xunit.Abstractions; - -namespace Akka.Tests.Actor.Scheduler; - -public class HashedWheelTimerSchedulerContentionSpec: TestKit.Xunit2.TestKit -{ - private const int TotalActor = 5000; - private const int TotalThreads = 10; - private const int ActorsPerThread = TotalActor / TotalThreads; - - public HashedWheelTimerSchedulerContentionSpec(ITestOutputHelper output) : base("{}", output) - { - } - - [Fact] - public void SchedulerContentionTest() - { - var collector = CreateTestProbe(); - foreach (var i in Enumerable.Range(0, TotalActor)) - { - Sys.ActorOf(Props.Create(() => new DoStuffActor(TestActor, collector)), i.ToString()); - } - - Within(10.Seconds(), () => - { - for (var x = 0; x < TotalActor; x++) - { - ExpectMsg(); - } - }); - - object? received = null; - do - { - received = collector.ReceiveOne(TimeSpan.Zero); - if (received is long value) - { - value.Should().BeLessThan(200, "Scheduler should not experience resource contention"); - } - } while (received is not null); - - } - - [Fact] - public void SchedulerContentionThreadedTest() - { - var collector = CreateTestProbe(); - var threads = new List(); - - foreach (var j in Enumerable.Range(0, TotalThreads)) - { - threads.Add(new Thread(() => RunThread(j))); - } - - foreach (var thread in threads) - { - thread.Start(); - } - - foreach (var thread in threads) - { - thread.Join(); - } - - Within(10.Seconds(), () => - { - for (var x = 0; x < TotalActor; x++) - { - ExpectMsg(); - } - }); - - object? received = null; - do - { - received = collector.ReceiveOne(TimeSpan.Zero); - if (received is long value) - { - value.Should().BeLessThan(200, "Scheduler should not experience resource contention"); - } - } while (received is not null); - - return; - - void RunThread(int n) - { - n *= ActorsPerThread; - for (var i = 0; i < ActorsPerThread; i++) - { - Sys.ActorOf(Props.Create(() => new DoStuffActor(TestActor, collector)), (n + i).ToString()); - } - } - } - - public class DoStuffActor : ReceiveActor, IWithTimers - { - private readonly IActorRef _collector; - public ITimerScheduler Timers { get; set; } - - public DoStuffActor(IActorRef probe, IActorRef collector) - { - _collector = collector; - - Receive(d => - { - Context.Stop(Self); - probe.Tell(d); - }); - } - - protected override void PreStart() - { - base.PreStart(); - var sw = Stopwatch.StartNew(); - Timers.StartSingleTimer("Test", Done.Instance, TimeSpan.FromSeconds(3)); - sw.Stop(); - - if (sw.ElapsedMilliseconds > 0) - { - Context.GetLogger().Info($"{sw.ElapsedMilliseconds}"); - _collector.Tell(sw.ElapsedMilliseconds); - } - } - } -} \ No newline at end of file diff --git a/src/core/Akka.Tests/Actor/Scheduler/SchedulerShutdownSpec.cs b/src/core/Akka.Tests/Actor/Scheduler/SchedulerShutdownSpec.cs index 4bb7806b652..773ced30442 100644 --- a/src/core/Akka.Tests/Actor/Scheduler/SchedulerShutdownSpec.cs +++ b/src/core/Akka.Tests/Actor/Scheduler/SchedulerShutdownSpec.cs @@ -160,12 +160,12 @@ public async Task ActorSystem_default_scheduler_must_never_accept_more_work_afte var receiver = sys.ActorOf(Props.Create(() => new MyScheduledActor())); sys.Scheduler.ScheduleTellOnce(0, receiver, "set", ActorRefs.NoSender); await Task.Delay(50); // let the scheduler run - var received = await receiver.Ask("get", TimeSpan.FromMilliseconds(100)); + var received = await receiver.Ask("get", TimeSpan.FromMilliseconds(TimeSpan.FromSeconds(3))); Assert.True(received); var terminated = await sys.Terminate().AwaitWithTimeout(TimeSpan.FromSeconds(5)); if (!terminated) - Assert.True(false, "Expected ActorSystem to terminate within 5s. Took longer."); + Assert.Fail("Expected ActorSystem to terminate within 5s. Took longer."); Assert.Throws(() => {