Skip to content

Commit

Permalink
[44-74] GraphMergeSpec (#6591)
Browse files Browse the repository at this point in the history
* [44-74] `GraphMergeSpec`

* Changes to `async` TestKit
  • Loading branch information
eaba authored Apr 4, 2023
1 parent 5d68c3a commit d925fd5
Showing 1 changed file with 37 additions and 42 deletions.
79 changes: 37 additions & 42 deletions src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
Expand Down Expand Up @@ -46,10 +47,9 @@ public MergeFixture(GraphDsl.Builder<NotUsed> builder) : base(builder)
}

[Fact]
public void A_Merge_must_work_in_the_happy_case()
public async Task A_Merge_must_work_in_the_happy_case()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
// Different input sizes(4 and 6)
var source1 = Source.From(Enumerable.Range(0, 4));
var source2 = Source.From(Enumerable.Range(4, 6));
Expand All @@ -63,27 +63,27 @@ public void A_Merge_must_work_in_the_happy_case()
var sink = Sink.FromSubscriber(probe);

b.From(source1).To(m1.In(0));
b.From(m1.Out).Via(Flow.Create<int>().Select(x => x*2)).To(m2.In(0));
b.From(m2.Out).Via(Flow.Create<int>().Select(x => x / 2).Select(x=>x+1)).To(sink);
b.From(m1.Out).Via(Flow.Create<int>().Select(x => x * 2)).To(m2.In(0));
b.From(m2.Out).Via(Flow.Create<int>().Select(x => x / 2).Select(x => x + 1)).To(sink);
b.From(source2).To(m1.In(1));
b.From(source3).To(m2.In(1));

return ClosedShape.Instance;
})).Run(Materializer);

var subscription = probe.ExpectSubscription();
var subscription = await probe.ExpectSubscriptionAsync();
var collected = new List<int>();
for (var i = 1; i <= 10; i++)
{
subscription.Request(1);
collected.Add(probe.ExpectNext());
collected.Add(await probe.ExpectNextAsync());
}

collected.Where(i => i <= 4).ShouldOnlyContainInOrder(1, 2, 3, 4);
collected.Where(i => i >= 5).ShouldOnlyContainInOrder(5, 6, 7, 8, 9, 10);

collected.Should().BeEquivalentTo(Enumerable.Range(1, 10).ToArray());
probe.ExpectComplete();
await probe.ExpectCompleteAsync();
}, Materializer);
}

Expand All @@ -109,7 +109,7 @@ public void A_Merge_must_work_with_one_way_merge()
}

[Fact]
public void A_Merge_must_work_with_n_way_merge()
public async Task A_Merge_must_work_with_n_way_merge()
{
var source1 = Source.Single(1);
var source2 = Source.Single(2);
Expand All @@ -135,76 +135,71 @@ public void A_Merge_must_work_with_n_way_merge()
return ClosedShape.Instance;
})).Run(Materializer);

var subscription = probe.ExpectSubscription();
var subscription = await probe.ExpectSubscriptionAsync();

var collected = new List<int>();
for (var i = 1; i <= 5; i++)
{
subscription.Request(1);
collected.Add(probe.ExpectNext());
collected.Add(await probe.ExpectNextAsync());
}

collected.Should().BeEquivalentTo(Enumerable.Range(1, 5));
probe.ExpectComplete();
await probe.ExpectCompleteAsync();
}

[Fact]
public void A_Merge_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
public async Task A_Merge_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var subscriber1 = Setup(CompletedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
var subscription1 = subscriber1.ExpectSubscription();
var subscription1 = await subscriber1.ExpectSubscriptionAsync();
subscription1.Request(4);
subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete();
await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();

var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), CompletedPublisher<int>());
var subscription2 = subscriber2.ExpectSubscription();
var subscription2 = await subscriber2.ExpectSubscriptionAsync();
subscription2.Request(4);
subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete();
await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Merge_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
public async Task A_Merge_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var subscriber1 = Setup(SoonToCompletePublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
var subscription1 = subscriber1.ExpectSubscription();
var subscription1 = await subscriber1.ExpectSubscriptionAsync();
subscription1.Request(4);
subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete();
await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();

var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToCompletePublisher<int>());
var subscription2 = subscriber2.ExpectSubscription();
var subscription2 = await subscriber2.ExpectSubscriptionAsync();
subscription2.Request(4);
subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete();
await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
}, Materializer);
}

[Fact(Skip = "This is nondeterministic, multiple scenarios can happen")]
public void A_Merge_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
public async Task A_Merge_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{

await this.AssertAllStagesStoppedAsync(() => {
return Task.CompletedTask;
}, Materializer);
}

[Fact(Skip = "This is nondeterministic, multiple scenarios can happen")]
public void A_Merge_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
public async Task A_Merge_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{

await this.AssertAllStagesStoppedAsync(() => {
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Merge_must_pass_along_early_cancellation()
public async Task A_Merge_must_pass_along_early_cancellation()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var up1 = this.CreateManualPublisherProbe<int>();
var up2 = this.CreateManualPublisherProbe<int>();
var down = this.CreateManualSubscriberProbe<int>();
Expand All @@ -224,14 +219,14 @@ public void A_Merge_must_pass_along_early_cancellation()
return ClosedShape.Instance;
})).Run(Materializer);

var downstream = down.ExpectSubscription();
var downstream = await down.ExpectSubscriptionAsync();
downstream.Cancel();
up1.Subscribe(t.Item1);
up2.Subscribe(t.Item2);
var upSub1 = up1.ExpectSubscription();
upSub1.ExpectCancellation();
var upSub2 = up2.ExpectSubscription();
upSub2.ExpectCancellation();
var upSub1 = await up1.ExpectSubscriptionAsync();
await upSub1.ExpectCancellationAsync();
var upSub2 = await up2.ExpectSubscriptionAsync();
await upSub2.ExpectCancellationAsync();
}, Materializer);
}
}
Expand Down

0 comments on commit d925fd5

Please sign in to comment.