diff --git a/src/core/Akka.Streams.Tests/Dsl/TickSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/TickSourceSpec.cs index dc847d6ce85..7e83a0318f2 100644 --- a/src/core/Akka.Streams.Tests/Dsl/TickSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/TickSourceSpec.cs @@ -7,13 +7,13 @@ using System; using System.Linq; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.TestKit; using Akka.TestKit.Xunit2.Attributes; using FluentAssertions; using Xunit; -// ReSharper disable InvokeAsExtensionMethod namespace Akka.Streams.Tests.Dsl { @@ -28,49 +28,59 @@ public TickSourceSpec() } [Fact] - public void A_Flow_based_on_a_tick_publisher_must_produce_ticks() + public async Task A_Flow_based_on_a_tick_publisher_must_produce_ticks() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var c = this.CreateManualSubscriberProbe(); Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick") .To(Sink.FromSubscriber(c)) .Run(Materializer); - var sub = c.ExpectSubscription(); + + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(2); - c.ExpectNext("tick"); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); - c.ExpectNext("tick"); + await c.ExpectNextAsync("tick"); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + await c.ExpectNextAsync("tick"); + sub.Cancel(); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); }, Materializer); } [Fact] - public void A_Flow_based_on_a_tick_publisher_must_drop_ticks_when_not_requested() + public async Task A_Flow_based_on_a_tick_publisher_must_drop_ticks_when_not_requested() { - var c = this.CreateManualSubscriberProbe(); - Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick") - .To(Sink.FromSubscriber(c)) - .Run(Materializer); - var sub = c.ExpectSubscription(); - sub.Request(2); - c.ExpectNext("tick"); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); - c.ExpectNext("tick"); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(1400)); - sub.Request(2); - c.ExpectNext("tick"); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); - c.ExpectNext("tick"); - sub.Cancel(); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + await this.AssertAllStagesStoppedAsync(async () => + { + var c = this.CreateManualSubscriberProbe(); + Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick") + .To(Sink.FromSubscriber(c)) + .Run(Materializer); + + var sub = await c.ExpectSubscriptionAsync(); + + sub.Request(2); + await c.ExpectNextAsync("tick"); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + await c.ExpectNextAsync("tick"); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(1400)); + + sub.Request(2); + await c.ExpectNextAsync("tick"); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + await c.ExpectNextAsync("tick"); + + sub.Cancel(); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + }, Materializer); } [Fact] - public void A_Flow_based_on_a_tick_publisher_must_reject_multiple_subscribers_but_keep_the_firs() + public async Task A_Flow_based_on_a_tick_publisher_must_reject_multiple_subscribers_but_keep_the_first() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var p = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick") .RunWith(Sink.AsPublisher(false), Materializer); @@ -78,21 +88,24 @@ public void A_Flow_based_on_a_tick_publisher_must_reject_multiple_subscribers_bu var c2 = this.CreateManualSubscriberProbe(); p.Subscribe(c1); p.Subscribe(c2); - var sub1 = c1.ExpectSubscription(); - c2.ExpectSubscriptionAndError(); + + var sub1 = await c1.ExpectSubscriptionAsync(); + await c2.ExpectSubscriptionAndErrorAsync(); + sub1.Request(1); - c1.ExpectNext("tick"); - c1.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + await c1.ExpectNextAsync("tick"); + await c1.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + sub1.Request(2); - c1.ExpectNext("tick"); + await c1.ExpectNextAsync("tick"); sub1.Cancel(); }, Materializer); } [LocalFact(SkipLocal = "Racy. See https://github.com/akkadotnet/akka.net/pull/4424#issuecomment-632284459")] - public void A_Flow_based_on_a_tick_publisher_must_be_usable_with_zip_for_a_simple_form_of_rate_limiting() + public async Task A_Flow_based_on_a_tick_publisher_must_be_usable_with_zip_for_a_simple_form_of_rate_limiting() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var c = this.CreateManualSubscriberProbe(); RunnableGraph.FromGraph(GraphDsl.Create(b => @@ -106,67 +119,74 @@ public void A_Flow_based_on_a_tick_publisher_must_be_usable_with_zip_for_a_simpl .To(Sink.FromSubscriber(c)); return ClosedShape.Instance; })).Run(Materializer); - var sub = c.ExpectSubscription(); + + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(1000); - c.ExpectNext(1); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); - c.ExpectNext(2); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + await c.ExpectNextAsync(1); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + await c.ExpectNextAsync(2); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); sub.Cancel(); }, Materializer); } [Fact] - public void A_Flow_based_on_a_tick_publisher_must_be_possible_to_cancel() + public async Task A_Flow_based_on_a_tick_publisher_must_be_possible_to_cancel() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var c = this.CreateManualSubscriberProbe(); var tickSource = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick"); var cancelable = tickSource.To(Sink.FromSubscriber(c)).Run(Materializer); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(2); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(600)); - c.ExpectNext("tick"); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); - c.ExpectNext("tick"); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(600)); + await c.ExpectNextAsync("tick"); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + await c.ExpectNextAsync("tick"); + cancelable.Cancel(); - AwaitCondition(() => cancelable.IsCancellationRequested); + await AwaitConditionAsync(async () => cancelable.IsCancellationRequested); + sub.Request(3); - c.ExpectComplete(); + await c.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Flow_based_on_a_tick_publisher_must_have_IsCancelled_mirror_the_cancellation_state() + public async Task A_Flow_based_on_a_tick_publisher_must_have_IsCancelled_mirror_the_cancellation_state() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var c = this.CreateManualSubscriberProbe(); var tickSource = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(500), "tick"); var cancelable = tickSource.To(Sink.FromSubscriber(c)).Run(Materializer); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(2); - c.ExpectNext("tick"); + await c.ExpectNextAsync("tick"); cancelable.IsCancellationRequested.Should().BeFalse(); cancelable.Cancel(); cancelable.IsCancellationRequested.Should().BeTrue(); - c.ExpectComplete(); + await c.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Flow_based_on_a_tick_publisher_must_support_being_cancelled_immediately_after_its_materialization() + public async Task A_Flow_based_on_a_tick_publisher_must_support_being_cancelled_immediately_after_its_materialization() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var c = this.CreateManualSubscriberProbe(); var tickSource = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(500), "tick"); var cancelable = tickSource.To(Sink.FromSubscriber(c)).Run(Materializer); cancelable.Cancel(); - var sub = c.ExpectSubscription(); + + var sub = await c.ExpectSubscriptionAsync(); sub.Request(2); - c.ExpectComplete(); + await c.ExpectCompleteAsync(); }, Materializer); } }