diff --git a/src/core/Akka.Streams.TestKit.Tests/TestPublisherSubscriberSpec.cs b/src/core/Akka.Streams.TestKit.Tests/TestPublisherSubscriberSpec.cs index c3f5ff0c8cf..7b18d6d89a7 100644 --- a/src/core/Akka.Streams.TestKit.Tests/TestPublisherSubscriberSpec.cs +++ b/src/core/Akka.Streams.TestKit.Tests/TestPublisherSubscriberSpec.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.TestKit; using FluentAssertions; @@ -24,9 +25,9 @@ public TestPublisherSubscriberSpec(ITestOutputHelper output = null) : base(outpu } [Fact] - public void TestPublisher_and_TestSubscriber_should_have_all_events_accessible_from_manual_probes() + public async Task TestPublisher_and_TestSubscriber_should_have_all_events_accessible_from_manual_probes() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync( async() => { var upstream = this.CreateManualPublisherProbe(); var downstream = this.CreateManualSubscriberProbe(); @@ -34,26 +35,26 @@ public void TestPublisher_and_TestSubscriber_should_have_all_events_accessible_f .RunWith(Sink.AsPublisher(false), Materializer) .Subscribe(downstream); - var upstreamSubscription = upstream.ExpectSubscription(); - object evt = downstream.ExpectEvent(); + var upstreamSubscription = await upstream.ExpectSubscriptionAsync(); + object evt = await downstream.ExpectEventAsync(); evt.Should().BeOfType(); var downstreamSubscription = ((TestSubscriber.OnSubscribe) evt).Subscription; upstreamSubscription.SendNext(1); downstreamSubscription.Request(1); - evt = upstream.ExpectEvent(); + evt = await upstream.ExpectEventAsync(); evt.Should().BeOfType(); ((TestPublisher.RequestMore) evt).NrOfElements.Should().Be(1); - evt = downstream.ExpectEvent(); + evt = await downstream.ExpectEventAsync(); evt.Should().BeOfType>(); ((TestSubscriber.OnNext) evt).Element.Should().Be(1); upstreamSubscription.SendNext(1); downstreamSubscription.Request(1); - downstream.ExpectNext(1); + await downstream.ExpectNextAsync(1).Task; upstreamSubscription.SendComplete(); - evt = downstream.ExpectEvent(); + evt = await downstream.ExpectEventAsync(); evt.Should().BeOfType(); }, Materializer); } @@ -61,18 +62,21 @@ public void TestPublisher_and_TestSubscriber_should_have_all_events_accessible_f // "handle gracefully partial function that is not suitable" does not apply [Fact] - public void TestPublisher_and_TestSubscriber_should_properly_update_PendingRequest_in_ExpectRequest() + public async Task TestPublisher_and_TestSubscriber_should_properly_update_PendingRequest_in_ExpectRequest() { - var upstream = this.CreatePublisherProbe(); - var downstream = this.CreateSubscriberProbe(); + await this.AssertAllStagesStoppedAsync(async () => + { + var upstream = this.CreatePublisherProbe(); + var downstream = this.CreateSubscriberProbe(); - Source.FromPublisher(upstream).RunWith(Sink.FromSubscriber(downstream), Materializer); + Source.FromPublisher(upstream).RunWith(Sink.FromSubscriber(downstream), Materializer); - downstream.ExpectSubscription().Request(10); + (await downstream.ExpectSubscriptionAsync()).Request(10); - upstream.ExpectRequest().Should().Be(10); - upstream.SendNext(1); - downstream.ExpectNext(1); + (await upstream.ExpectRequestAsync()).Should().Be(10); + upstream.SendNext(1); + await downstream.ExpectNextAsync(1).Task; + }, Materializer); } } }