Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.TestKit: deleted IAsyncQueue; replaced with System.Threading.Channel<T> #7157

Merged

Conversation

Aaronontheweb
Copy link
Member

@Aaronontheweb Aaronontheweb commented Apr 11, 2024

Changes

Fixes #7145 - this should resolve some of the "async continuation problems" reported there. Going to add a reproduction for that case specifically, but moreover this change allows us to delete 6 files worth of old shared state concurrent programming code, which is always a good thing.

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

@Aaronontheweb Aaronontheweb added api-change akka-testkit Akka.NET Testkit issues labels Apr 11, 2024
@Aaronontheweb
Copy link
Member Author

Running locally and seeing a bunch of test failures from InternalTryReceiveOneAsync - which makes me think my CancellationToken usage is wrong there.

@Aaronontheweb
Copy link
Member Author

Looks like it's the ExpectNoMsgAsync implementation that's blowing up.


namespace Akka.TestKit.Tests;

public class Bugfix7145Spec : AkkaSpec
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This spec was able to reproduce the deadlock reported in #7145 on dev - changes so far allows these specs to pass.

Copy link
Member Author

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Detailed my changes

@@ -622,84 +622,6 @@ namespace Akka.TestKit.Extensions
}
namespace Akka.TestKit.Internal
{
public class AsyncQueue<T> : Akka.TestKit.Internal.ITestQueue<T>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted this type

@@ -831,11 +709,6 @@ namespace Akka.TestKit.Internal
public virtual void HandleEvent(Akka.TestKit.Internal.EventFilterBase eventFilter, Akka.Event.LogEvent logEvent) { }
}
}
public class InternalTestActor : Akka.Actor.UntypedActor
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the InternalTestActor actually internal, instead of public for some forsaken reason.

@@ -406,10 +407,10 @@ public async Task A_Flow_with_SelectAsync_must_not_run_more_futures_than_configu
{
const int parallelism = 8;
var counter = new AtomicCounter();
var queue = new BlockingQueue<(TaskCompletionSource<int>, long)>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to replace call to non-longer-existing-API

@@ -335,10 +336,10 @@ public async Task A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_tha
await this.AssertAllStagesStoppedAsync(() => {
const int parallelism = 8;
var counter = new AtomicCounter();
var queue = new BlockingQueue<(TaskCompletionSource<int>, long)>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to replace call to no-longer-existing-API

{
private readonly ITestActorQueue<MessageEnvelope> _queue;
private TestKit.TestActor.Ignore _ignore;
private readonly ChannelWriter<MessageEnvelope> _queue;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced ITestActorQueue<T> with ChannelWriter<T>

@@ -36,7 +37,7 @@ public TestState()

public ActorSystem System { get; set; }
public TestKitSettings TestKitSettings { get; set; }
public ITestQueue<MessageEnvelope> Queue { get; set; }
public Channel<MessageEnvelope> Queue { get; set; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace TestState.Queue to use a Channel<MessageEnvelope> instead of the old ITestQueue

;
var delayTask = Task.Delay(maxDuration, cancellationToken);
var readTask = _testState.Queue.Reader.WaitToReadAsync(cancellationToken).AsTask();
var completedTask = await Task.WhenAny(readTask, delayTask);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have to do this instead of playing around with CancellationTokens expiring, because otherwise those work via throwing exceptions. This allows us to gracefully determine if we've hit the next point because stuff was ready or because time expired.

peek = await _testState.Queue.TryPeekAsync((int)maxDuration.TotalMilliseconds, cancellationToken)
;
var delayTask = Task.Delay(maxDuration, cancellationToken);
var readTask = _testState.Queue.Reader.WaitToReadAsync(cancellationToken).AsTask();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have to do this instead of playing around with CancellationTokens expiring, because otherwise those work via throwing exceptions. This allows us to gracefully determine if we've hit the next point because stuff was ready or because time expired.

@Aaronontheweb Aaronontheweb marked this pull request as ready for review April 11, 2024 22:16
Copy link
Contributor

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall, left a few possible improvement notes

take = (didTake, item);
}
else if (maxDuration.IsPositiveFinite())
{
ConditionalLog(shouldLog, "Trying to receive message from TestActor queue within {0}", maxDuration);
take = await _testState.Queue.TryTakeAsync((int)maxDuration.TotalMilliseconds, cancellationToken)
;
var delayTask = Task.Delay(maxDuration, cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since maxDuration is infinite, it is possible for the Delay Task to survive until the end of the test process, causing a memory leak. For this to be leak safe, we'd need to use a linked CancellationTokenSource, use that token, and Cancel and Dispose of it with a try..catch

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxDuration cannot be infinite here - that is explicitly checked by IsPositiveFinite. The TimeSpan.Ticks value is -1 when it's infinite.

src/core/Akka.TestKit/TestKitBase_Receive.cs Show resolved Hide resolved
src/core/Akka.TestKit/TestKitBase_Receive.cs Show resolved Hide resolved
src/core/Akka.TestKit/TestKitBase_Receive.cs Show resolved Hide resolved
@Arkatufus Arkatufus enabled auto-merge (squash) April 12, 2024 14:08
@Aaronontheweb Aaronontheweb disabled auto-merge April 12, 2024 14:09
@Aaronontheweb Aaronontheweb merged commit 2057bae into akkadotnet:dev Apr 12, 2024
2 of 11 checks passed
@Aaronontheweb Aaronontheweb deleted the fix-7145-testkit-AsyncQueue branch April 12, 2024 14:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
akka-testkit Akka.NET Testkit issues api-change
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ExpectMsgAsync deadlocks with actor that has async receive handler and testprobe.Tell
2 participants