Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Async TestKit] Convert Akka.Streams.Tests Dsl.TickSourceSpec #6024

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 77 additions & 57 deletions src/core/Akka.Streams.Tests/Dsl/TickSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.TestKit.Xunit2.Attributes;
using FluentAssertions;
using Xunit;
// ReSharper disable InvokeAsExtensionMethod

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -28,71 +28,84 @@ public TickSourceSpec()
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_produce_ticks()
public async Task A_Flow_based_on_a_tick_publisher_must_produce_ticks()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<string>();
Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick")
.To(Sink.FromSubscriber(c))
.Run(Materializer);
var sub = c.ExpectSubscription();

var sub = await c.ExpectSubscriptionAsync();

sub.Request(2);
c.ExpectNext("tick");
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
c.ExpectNext("tick");
await c.ExpectNextAsync("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync("tick");

sub.Cancel();
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
}, Materializer);
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_drop_ticks_when_not_requested()
public async Task A_Flow_based_on_a_tick_publisher_must_drop_ticks_when_not_requested()
{
var c = this.CreateManualSubscriberProbe<string>();
Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick")
.To(Sink.FromSubscriber(c))
.Run(Materializer);
var sub = c.ExpectSubscription();
sub.Request(2);
c.ExpectNext("tick");
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
c.ExpectNext("tick");
c.ExpectNoMsg(TimeSpan.FromMilliseconds(1400));
sub.Request(2);
c.ExpectNext("tick");
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
c.ExpectNext("tick");
sub.Cancel();
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<string>();
Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick")
.To(Sink.FromSubscriber(c))
.Run(Materializer);

var sub = await c.ExpectSubscriptionAsync();

sub.Request(2);
await c.ExpectNextAsync("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(1400));

sub.Request(2);
await c.ExpectNextAsync("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync("tick");

sub.Cancel();
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
}, Materializer);
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_reject_multiple_subscribers_but_keep_the_firs()
public async Task A_Flow_based_on_a_tick_publisher_must_reject_multiple_subscribers_but_keep_the_first()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var p = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick")
.RunWith(Sink.AsPublisher<string>(false), Materializer);
var c1 = this.CreateManualSubscriberProbe<string>();
var c2 = this.CreateManualSubscriberProbe<string>();
p.Subscribe(c1);
p.Subscribe(c2);
var sub1 = c1.ExpectSubscription();
c2.ExpectSubscriptionAndError();

var sub1 = await c1.ExpectSubscriptionAsync();
await c2.ExpectSubscriptionAndErrorAsync();

sub1.Request(1);
c1.ExpectNext("tick");
c1.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
await c1.ExpectNextAsync("tick");
await c1.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));

sub1.Request(2);
c1.ExpectNext("tick");
await c1.ExpectNextAsync("tick");
sub1.Cancel();
}, Materializer);
}

[LocalFact(SkipLocal = "Racy. See https://github.com/akkadotnet/akka.net/pull/4424#issuecomment-632284459")]
public void A_Flow_based_on_a_tick_publisher_must_be_usable_with_zip_for_a_simple_form_of_rate_limiting()
public async Task A_Flow_based_on_a_tick_publisher_must_be_usable_with_zip_for_a_simple_form_of_rate_limiting()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<int>();
RunnableGraph.FromGraph(GraphDsl.Create(b =>
Expand All @@ -106,67 +119,74 @@ public void A_Flow_based_on_a_tick_publisher_must_be_usable_with_zip_for_a_simpl
.To(Sink.FromSubscriber(c));
return ClosedShape.Instance;
})).Run(Materializer);
var sub = c.ExpectSubscription();

var sub = await c.ExpectSubscriptionAsync();

sub.Request(1000);
c.ExpectNext(1);
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
c.ExpectNext(2);
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync(1);
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync(2);
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
sub.Cancel();
}, Materializer);
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_be_possible_to_cancel()
public async Task A_Flow_based_on_a_tick_publisher_must_be_possible_to_cancel()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<string>();
var tickSource = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick");
var cancelable = tickSource.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();

sub.Request(2);
c.ExpectNoMsg(TimeSpan.FromMilliseconds(600));
c.ExpectNext("tick");
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
c.ExpectNext("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(600));
await c.ExpectNextAsync("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync("tick");

cancelable.Cancel();
AwaitCondition(() => cancelable.IsCancellationRequested);
await AwaitConditionAsync(async () => cancelable.IsCancellationRequested);

sub.Request(3);
c.ExpectComplete();
await c.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_have_IsCancelled_mirror_the_cancellation_state()
public async Task A_Flow_based_on_a_tick_publisher_must_have_IsCancelled_mirror_the_cancellation_state()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<string>();
var tickSource = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(500), "tick");
var cancelable = tickSource.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();

sub.Request(2);
c.ExpectNext("tick");
await c.ExpectNextAsync("tick");
cancelable.IsCancellationRequested.Should().BeFalse();
cancelable.Cancel();
cancelable.IsCancellationRequested.Should().BeTrue();
c.ExpectComplete();
await c.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_support_being_cancelled_immediately_after_its_materialization()
public async Task A_Flow_based_on_a_tick_publisher_must_support_being_cancelled_immediately_after_its_materialization()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<string>();
var tickSource = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(500), "tick");
var cancelable = tickSource.To(Sink.FromSubscriber(c)).Run(Materializer);
cancelable.Cancel();
var sub = c.ExpectSubscription();

var sub = await c.ExpectSubscriptionAsync();
sub.Request(2);
c.ExpectComplete();
await c.ExpectCompleteAsync();
}, Materializer);
}
}
Expand Down