Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Async TestKit] Convert Akka.Streams.Tests to async - FlowFlattenMergeSpec FlowGroupBySpec TimeoutsSpec #5963

Merged
Changes from 41 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
474aba1
Convert Akka.Streams.Tests to async - FlowFlattenMergeSpec FlowGroupB…
Arkatufus May 25, 2022
b65dab1
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Arkatufus May 25, 2022
ec80aea
Fix FlowGroupBySpec
Arkatufus May 25, 2022
fc27a00
Merge branch 'async_testkit/convert_Akka.Streams.Tests_Dsl.FlowFlatte…
Arkatufus May 25, 2022
be0334b
Fix FlowFlattenMergeSpec
Arkatufus May 25, 2022
894d87b
Fix AssertAllStagesStopped
Arkatufus May 25, 2022
5072fc0
Add ShouldThrowWithin Task extension
Arkatufus May 25, 2022
a34de2c
Fix FutureFlattenSourceSpec
Arkatufus May 25, 2022
9f131e0
Revert HubSpec to its original code
Arkatufus May 25, 2022
7c645a6
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Arkatufus May 31, 2022
5e3b8fc
Add ForEachAsync sink
Arkatufus Jun 1, 2022
8933b4e
Remove extraneous async test functions, causes ambiguous function fin…
Arkatufus Jun 1, 2022
b11b147
Fix specs
Arkatufus Jun 1, 2022
564c08d
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Arkatufus Jun 1, 2022
87d4411
Update API verify list
Arkatufus Jun 2, 2022
0caf627
Merge branch 'async_testkit/convert_Akka.Streams.Tests_Dsl.FlowFlatte…
Arkatufus Jun 2, 2022
4d38ceb
Fix XML-Doc
Arkatufus Jun 2, 2022
1316cca
Fix RestartSpec
Arkatufus Jun 2, 2022
36eb3fa
fix stringify
Arkatufus Jun 2, 2022
e8cf58e
Fix FlowDelaySpec tests
Arkatufus Jun 3, 2022
d25b2d1
Rewrite FlowDelaySpec
Arkatufus Jun 3, 2022
9df7d96
Fix HubSpec
Arkatufus Jun 3, 2022
ac08e4b
Fix HubSpec
Arkatufus Jun 3, 2022
076f517
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Arkatufus Jun 3, 2022
2b0bec9
Merge dev
Arkatufus Jun 3, 2022
8eec0ba
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Arkatufus Jun 6, 2022
bb33268
Fix FlowDelaySpec, add epsilon
Arkatufus Jun 6, 2022
5935a6b
Merge branch 'async_testkit/convert_Akka.Streams.Tests_Dsl.FlowFlatte…
Arkatufus Jun 6, 2022
0596382
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Arkatufus Jun 6, 2022
3c60e6d
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Arkatufus Jun 7, 2022
36f2c53
Fix FlowDelaySpec timing problem.
Arkatufus Jun 8, 2022
e53807d
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Arkatufus Jun 10, 2022
c93e162
Add blamw flag to dotnet test
Arkatufus Jun 10, 2022
1277654
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Arkatufus Jun 12, 2022
861ec78
Fix OutputStreamSourceSpec deadlock bug
Arkatufus Jun 16, 2022
d9c69de
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Arkatufus Jun 16, 2022
7e9850d
Fix LastSinkSpec
Arkatufus Jun 16, 2022
32c3d0d
Merge branch 'async_testkit/convert_Akka.Streams.Tests_Dsl.FlowFlatte…
Arkatufus Jun 16, 2022
619d2d3
Fix deadlock caused by WithinAsync and AwaitConditionAsync
Arkatufus Jun 17, 2022
2d50cf9
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Arkatufus Jun 17, 2022
a9f5657
Merge branch 'dev' into async_testkit/convert_Akka.Streams.Tests_Dsl.…
Aaronontheweb Jun 20, 2022
19950ee
Skip persistence performance test for SqLite for now.
Arkatufus Jun 20, 2022
40c17fc
Merge branch 'async_testkit/convert_Akka.Streams.Tests_Dsl.FlowFlatte…
Arkatufus Jun 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions build.fsx
Original file line number Diff line number Diff line change
@@ -250,8 +250,8 @@ Target "RunTests" (fun _ ->
let runSingleProject project =
let arguments =
match (hasTeamCity) with
| true -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetFrameworkVersion outputTests)
| false -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetFrameworkVersion outputTests)
| true -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetFrameworkVersion outputTests)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might as well keep this in here for the time being

| false -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetFrameworkVersion outputTests)

let result = ExecProcess(fun info ->
info.FileName <- "dotnet"
@@ -280,8 +280,8 @@ Target "RunTestsNetCore" (fun _ ->
let runSingleProject project =
let arguments =
match (hasTeamCity) with
| true -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetCoreVersion outputTests)
| false -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetCoreVersion outputTests)
| true -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetCoreVersion outputTests)
| false -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetCoreVersion outputTests)

let result = ExecProcess(fun info ->
info.FileName <- "dotnet"
@@ -310,8 +310,8 @@ Target "RunTestsNet" (fun _ ->
let runSingleProject project =
let arguments =
match (hasTeamCity) with
| true -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetVersion outputTests)
| false -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetVersion outputTests)
| true -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetVersion outputTests)
| false -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetVersion outputTests)

let result = ExecProcess(fun info ->
info.FileName <- "dotnet"
Original file line number Diff line number Diff line change
@@ -143,10 +143,10 @@ public async Task Persistent_Shard_must_recover_from_failing_entity(Props entity
err.Cause.Should().BeOfType<ActorInitializationException>();

// Need to wait for the internal state to reset, else everything we sent will go to dead letter
await AwaitConditionAsync(() =>
await AwaitConditionAsync(async () =>
{
persistentShard.Tell(Shard.GetCurrentShardState.Instance);
var failedState = ExpectMsg<Shard.CurrentShardState>();
var failedState = await ExpectMsgAsync<Shard.CurrentShardState>();
return failedState.EntityIds.Count == 0;
});

Original file line number Diff line number Diff line change
@@ -80,7 +80,7 @@ public async Task DistributedPubSubMediator_should_send_messages_to_dead_letter(

// assert
await EventFilter.DeadLetter<object>().ExpectAsync(1,
() => { mediator.Tell(new Publish("pub-sub", $"hit")); });
async () => { mediator.Tell(new Publish("pub-sub", "hit")); });
}
}

Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ public async Task ClusterSingletonProxy_with_zero_buffering_should_work()

// have to wait for cluster singleton to be ready, otherwise message will be rejected
await AwaitConditionAsync(
() => Cluster.Get(testSystem.Sys).State.Members.Count(m => m.Status == MemberStatus.Up) == 2,
async () => Cluster.Get(testSystem.Sys).State.Members.Count(m => m.Status == MemberStatus.Up) == 2,
TimeSpan.FromSeconds(30));

try
Original file line number Diff line number Diff line change
@@ -1935,6 +1935,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
Original file line number Diff line number Diff line change
@@ -1935,6 +1935,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
Original file line number Diff line number Diff line change
@@ -1935,6 +1935,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
39 changes: 7 additions & 32 deletions src/core/Akka.Cluster.Tests/ClusterLogSpec.cs
Original file line number Diff line number Diff line change
@@ -11,7 +11,9 @@
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.Util.Internal;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;
@@ -50,26 +52,13 @@ protected async Task AwaitUpAsync()
{
await WithinAsync(TimeSpan.FromSeconds(10), async() =>
{
await AwaitConditionAsync(() => ClusterView.IsSingletonCluster);
await AwaitConditionAsync(async () => ClusterView.IsSingletonCluster);
ClusterView.Self.Address.ShouldBe(_selfAddress);
ClusterView.Members.Select(m => m.Address).ShouldBe(new Address[] { _selfAddress });
await AwaitAssertAsync(() => ClusterView.Status.ShouldBe(MemberStatus.Up));
});
}


/// <summary>
/// The expected log info pattern to intercept after a <see cref="Cluster.Join(Address)"/>.
/// </summary>
protected void Join(string expected)
{
Within(TimeSpan.FromSeconds(10), () =>
{
EventFilter
.Info(contains: expected)
.ExpectOne(() => _cluster.Join(_selfAddress));
});
}
/// <summary>
/// The expected log info pattern to intercept after a <see cref="Cluster.Join(Address)"/>.
/// </summary>
@@ -79,21 +68,7 @@ await WithinAsync(TimeSpan.FromSeconds(10), async () =>
{
await EventFilter
.Info(contains: expected)
.ExpectOneAsync(TimeSpan.FromMinutes(1), () => _cluster.Join(_selfAddress));
});
}

/// <summary>
/// The expected log info pattern to intercept after a <see cref="Cluster.Down(Address)"/>.
/// </summary>
/// <param name="expected"></param>
protected void Down(string expected)
{
Within(TimeSpan.FromSeconds(10), () =>
{
EventFilter
.Info(contains: expected)
.ExpectOne(() => _cluster.Down(_selfAddress));
.ExpectOneAsync(async () => _cluster.Join(_selfAddress));
});
}

@@ -107,7 +82,7 @@ await WithinAsync(TimeSpan.FromSeconds(10), async () =>
{
await EventFilter
.Info(contains: expected)
.ExpectOneAsync(() => _cluster.Down(_selfAddress));
.ExpectOneAsync(async () => _cluster.Down(_selfAddress));
});
}
}
@@ -139,9 +114,9 @@ public ClusterLogVerboseDefaultSpec(ITestOutputHelper output)
public async Task A_cluster_must_not_log_verbose_cluster_events_by_default()
{
_cluster.Settings.LogInfoVerbose.ShouldBeFalse();
Intercept<TrueException>(() => Join(upLogMessage));
await JoinAsync(upLogMessage).ShouldThrowWithin<TrueException>(10.Seconds());
await AwaitUpAsync();
Intercept<TrueException>(() => Down(downLogMessage));
await DownAsync(downLogMessage).ShouldThrowWithin<TrueException>(10.Seconds());
}
}

8 changes: 4 additions & 4 deletions src/core/Akka.Cluster.Tests/ClusterSpec.cs
Original file line number Diff line number Diff line change
@@ -79,7 +79,7 @@ public async Task A_cluster_must_initially_become_singleton_cluster_when_joining
ClusterView.Members.Count.Should().Be(0);
_cluster.Join(_selfAddress);
LeaderActions(); // Joining -> Up
await AwaitConditionAsync(() => ClusterView.IsSingletonCluster);
await AwaitConditionAsync(async () => ClusterView.IsSingletonCluster);
ClusterView.Self.Address.Should().Be(_selfAddress);
ClusterView.Members.Select(m => m.Address).ToImmutableHashSet()
.Should().BeEquivalentTo(ImmutableHashSet.Create(_selfAddress));
@@ -258,7 +258,7 @@ public async Task A_cluster_must_cancel_LeaveAsync_task_if_CancellationToken_fir

// Cancelling the first task
cts.Cancel();
await AwaitConditionAsync(() => task1.IsCanceled, null, "Task should be cancelled");
await AwaitConditionAsync(async () => task1.IsCanceled, null, "Task should be cancelled");

await WithinAsync(TimeSpan.FromSeconds(10), async () =>
{
@@ -273,12 +273,12 @@ await WithinAsync(TimeSpan.FromSeconds(10), async () =>
ExpectMsg<ClusterEvent.MemberRemoved>().Member.Address.Should().Be(_selfAddress);

// Second task should complete (not cancelled)
await AwaitConditionAsync(() => task2.IsCompleted && !task2.IsCanceled, null, "Task should be completed, but not cancelled.");
await AwaitConditionAsync(async () => task2.IsCompleted && !task2.IsCanceled, null, "Task should be completed, but not cancelled.");
}, cancellationToken: cts.Token);

// Subsequent LeaveAsync() tasks expected to complete immediately (not cancelled)
var task3 = _cluster.LeaveAsync();
await AwaitConditionAsync(() => task3.IsCompleted && !task3.IsCanceled, null, "Task should be completed, but not cancelled.");
await AwaitConditionAsync(async () => task3.IsCompleted && !task3.IsCanceled, null, "Task should be completed, but not cancelled.");
}

[Fact]
2 changes: 1 addition & 1 deletion src/core/Akka.Remote.Tests/ActorsLeakSpec.cs
Original file line number Diff line number Diff line change
@@ -234,7 +234,7 @@ await EventFilter.Warning(contains: "Association with remote system").ExpectOneA
* Wait for the ReliableDeliverySupervisor to receive its "TooLongIdle" message,
* which will throw a HopelessAssociation wrapped around a TimeoutException.
*/
await EventFilter.Exception<TimeoutException>().ExpectOneAsync(() => { });
await EventFilter.Exception<TimeoutException>().ExpectOneAsync(async () => { });

await AwaitAssertAsync(() =>
{
4 changes: 2 additions & 2 deletions src/core/Akka.Remote.Tests/RemotingSpec.cs
Original file line number Diff line number Diff line change
@@ -521,7 +521,7 @@ public async Task Stash_inbound_connections_until_UID_is_known_for_pending_outbo
(new ActorAssociationEventListener(remoteTransportProbe)));

// Hijack associations through the test transport
await AwaitConditionAsync(() => registry.TransportsReady(rawLocalAddress, rawRemoteAddress));
await AwaitConditionAsync(async () => registry.TransportsReady(rawLocalAddress, rawRemoteAddress));
var testTransport = registry.TransportFor(rawLocalAddress).Value.Item1;
testTransport.WriteBehavior.PushConstant(true);

@@ -601,7 +601,7 @@ public async Task Properly_quarantine_stashed_inbound_connections()
(new ActorAssociationEventListener(remoteTransportProbe)));

// Hijack associations through the test transport
await AwaitConditionAsync(() => registry.TransportsReady(rawLocalAddress, rawRemoteAddress));
await AwaitConditionAsync(async () => registry.TransportsReady(rawLocalAddress, rawRemoteAddress));
var testTransport = registry.TransportFor(rawLocalAddress).Value.Item1;
testTransport.WriteBehavior.PushConstant(true);

22 changes: 11 additions & 11 deletions src/core/Akka.Remote.Tests/Transport/AkkaProtocolSpec.cs
Original file line number Diff line number Diff line change
@@ -166,7 +166,7 @@ public async Task ProtocolStateActor_must_in_inbound_mode_accept_payload_after_A

reader.Tell(TestAssociate(33), TestActor);

await AwaitConditionAsync(() => collaborators.FailureDetector.IsMonitoring, DefaultTimeout);
await AwaitConditionAsync(async () => collaborators.FailureDetector.IsMonitoring, DefaultTimeout);

var wrappedHandle = await ExpectMsgOfAsync(DefaultTimeout, "expected InboundAssociation", o =>
{
@@ -183,7 +183,7 @@ public async Task ProtocolStateActor_must_in_inbound_mode_accept_payload_after_A
Assert.True(collaborators.FailureDetector.IsMonitoring);

// Heartbeat was sent in response to Associate
await AwaitConditionAsync(() => LastActivityIsHeartbeat(collaborators.Registry), DefaultTimeout);
await AwaitConditionAsync(async () => LastActivityIsHeartbeat(collaborators.Registry), DefaultTimeout);

reader.Tell(_testPayload, TestActor);
await ExpectMsgAsync<InboundPayload>(inbound =>
@@ -211,7 +211,7 @@ public async Task ProtocolStateActor_must_in_inbound_mode_disassociate_when_an_u
//this associate will now be ignored
reader.Tell(TestAssociate(33), TestActor);

await AwaitConditionAsync(() =>
await AwaitConditionAsync(async () =>
{
var snapshots = collaborators.Registry.LogSnapshot();
return snapshots.Any(x => x is DisassociateAttempt);
@@ -234,12 +234,12 @@ public async Task ProtocolStateActor_must_in_outbound_mode_delay_readiness_until
codec: _codec,
failureDetector: collaborators.FailureDetector));

await AwaitConditionAsync(() => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);
await AwaitConditionAsync(async () => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);

await AwaitConditionAsync(() => collaborators.FailureDetector.IsMonitoring, DefaultTimeout);
await AwaitConditionAsync(async () => collaborators.FailureDetector.IsMonitoring, DefaultTimeout);

//keeps sending heartbeats
await AwaitConditionAsync(() => LastActivityIsHeartbeat(collaborators.Registry), DefaultTimeout);
await AwaitConditionAsync(async () => LastActivityIsHeartbeat(collaborators.Registry), DefaultTimeout);

Assert.False(statusPromise.Task.IsCompleted);

@@ -275,7 +275,7 @@ public async Task ProtocolStateActor_must_handle_explicit_disassociate_messages(
codec: _codec,
failureDetector: collaborators.FailureDetector));

await AwaitConditionAsync(() => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);
await AwaitConditionAsync(async () => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);

reader.Tell(TestAssociate(33), TestActor);

@@ -323,7 +323,7 @@ public async Task ProtocolStateActor_must_handle_transport_level_disassociations
codec: _codec,
failureDetector: collaborators.FailureDetector));

await AwaitConditionAsync(() => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);
await AwaitConditionAsync(async () => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);

reader.Tell(TestAssociate(33), TestActor);

@@ -371,7 +371,7 @@ public async Task ProtocolStateActor_must_disassociate_when_failure_detector_sig
codec: _codec,
failureDetector: collaborators.FailureDetector));

await AwaitConditionAsync(() => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);
await AwaitConditionAsync(async () => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);

stateActor.Tell(TestAssociate(33), TestActor);

@@ -391,7 +391,7 @@ public async Task ProtocolStateActor_must_disassociate_when_failure_detector_sig
wrappedHandle.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

//wait for one heartbeat
await AwaitConditionAsync(() => LastActivityIsHeartbeat(collaborators.Registry), DefaultTimeout);
await AwaitConditionAsync(async () => LastActivityIsHeartbeat(collaborators.Registry), DefaultTimeout);

collaborators.FailureDetector.SetAvailable(false);

@@ -422,7 +422,7 @@ public async Task ProtocolStateActor_must_handle_correctly_when_the_handler_is_r
codec: _codec,
failureDetector: collaborators.FailureDetector));

await AwaitConditionAsync(() => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);
await AwaitConditionAsync(async () => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);

stateActor.Tell(TestAssociate(33), TestActor);

Loading