Skip to content

Commit

Permalink
Fix attempt, AsyncEnumerableSpec was hung (#6049)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored Jul 27, 2022
1 parent e9b643f commit 59cda86
Showing 1 changed file with 89 additions and 92 deletions.
181 changes: 89 additions & 92 deletions src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
using Xunit.Abstractions;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using Akka.TestKit.Extensions;
using Akka.Util;
using FluentAssertions.Extensions;
using static FluentAssertions.FluentActions;

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -42,66 +44,50 @@ public AsyncEnumerableSpec(ITestOutputHelper helper) : base(
[Fact]
public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
{
var input = Enumerable.Range(1, 6).ToList();
await this.AssertAllStagesStoppedAsync(async () =>
{
var input = Enumerable.Range(1, 6).ToList();

var cts = new CancellationTokenSource();
var token = cts.Token;
var cts = new CancellationTokenSource();
var token = cts.Token;

var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
bool caught = false;
try
{
await foreach (var a in asyncEnumerable.WithCancellation(token))
var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
await Awaiting(async () =>
{
cts.Cancel();
}
}
catch (OperationCanceledException e)
{
caught = true;
}

caught.ShouldBeTrue();
await foreach (var a in asyncEnumerable.WithCancellation(token))
{
cts.Cancel();
}
}).Should().ThrowAsync<OperationCanceledException>().ShouldCompleteWithin(3.Seconds());
}, Materializer);
}

[Fact]
public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source()
{
var input = Enumerable.Range(1, 6).ToList();
var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
await foreach (var a in asyncEnumerable)
await this.AssertAllStagesStoppedAsync(async () =>
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}

output.Length.ShouldBe(0, "Did not receive all elements!");
var input = Enumerable.Range(1, 6).ToList();
var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = await asyncEnumerable.ToListAsync().AsTask().ShouldCompleteWithin(3.Seconds());
output.Should().BeEquivalentTo(input, options => options.WithStrictOrdering());
}, Materializer);
}

[Fact]
public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations()
{
var input = Enumerable.Range(1, 6).ToList();
var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
await foreach (var a in asyncEnumerable)
await this.AssertAllStagesStoppedAsync(async () =>
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
var input = Enumerable.Range(1, 6).ToList();
var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);

output.Length.ShouldBe(0, "Did not receive all elements!");
var output = await asyncEnumerable.ToListAsync().AsTask().ShouldCompleteWithin(3.Seconds());
output.Should().BeEquivalentTo(input, options => options.WithStrictOrdering());

output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}

output.Length.ShouldBe(0, "Did not receive all elements in second enumeration!!");
output = await asyncEnumerable.ToListAsync().AsTask().ShouldCompleteWithin(3.Seconds());
output.Should().BeEquivalentTo(input, options => options.WithStrictOrdering());
}, Materializer);
}


Expand All @@ -114,30 +100,31 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()

var a = Task.Run(async () =>
{
await foreach (var notused in task)
await foreach (var _ in task)
{
materializer.Shutdown();
if(!materializer.IsShutdown)
materializer.Shutdown();
}
});
//since we are collapsing the stream inside the read
//we want to send messages so we aren't just waiting forever.
probe.SendNext(1);
probe.SendNext(2);
await probe.SendNextAsync(1);
await probe.SendNextAsync(2);
var thrown = false;
try
{
await a;
await a.ShouldCompleteWithin(3.Seconds());
}
catch (StreamDetachedException e)
catch (StreamDetachedException)
{
thrown = true;
}
catch (AbruptTerminationException e)
catch (AbruptTerminationException)
{
thrown = true;
}

thrown.ShouldBeTrue();
thrown.Should().BeTrue();
}

[Fact]
Expand All @@ -148,86 +135,96 @@ public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumer
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);
materializer.Shutdown();

async Task ShouldThrow()
await Awaiting(async () =>
{
await foreach (var a in task)
{
}
}

await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
}).Should().ThrowAsync<IllegalStateException>().ShouldCompleteWithin(3.Seconds());
}

[Fact]
public async Task
AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
{
IAsyncEnumerable<int> Range() => RangeAsync(0, 0);
var subscriber = this.CreateManualSubscriberProbe<int>();
await this.AssertAllStagesStoppedAsync(async () =>
{
IAsyncEnumerable<int> Range() => RangeAsync(0, 0);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(100);
await subscriber.ExpectCompleteAsync();
var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(100);
await subscriber.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public async Task AsyncEnumerableSource_Must_Process_All_Elements()
{
IAsyncEnumerable<int> Range() => RangeAsync(0, 100);
var subscriber = this.CreateManualSubscriberProbe<int>();
await this.AssertAllStagesStoppedAsync(async () =>
{
IAsyncEnumerable<int> Range() => RangeAsync(0, 100);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(101);
var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(101);

await subscriber.ExpectNextNAsync(Enumerable.Range(0, 100));
await subscriber.ExpectNextNAsync(Enumerable.Range(0, 100));

await subscriber.ExpectCompleteAsync();
await subscriber.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public async Task AsyncEnumerableSource_Must_Process_Source_That_Immediately_Throws()
{
IAsyncEnumerable<int> Range() => ThrowingRangeAsync(0, 100, 50);
var subscriber = this.CreateManualSubscriberProbe<int>();
await this.AssertAllStagesStoppedAsync(async () =>
{
IAsyncEnumerable<int> Range() => ThrowingRangeAsync(0, 100, 50);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(101);
var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(101);

await subscriber.ExpectNextNAsync(Enumerable.Range(0, 50));
await subscriber.ExpectNextNAsync(Enumerable.Range(0, 50));

var exception = await subscriber.ExpectErrorAsync();
var exception = await subscriber.ExpectErrorAsync();

// Exception should be automatically unrolled, this SHOULD NOT be AggregateException
exception.Should().BeOfType<TestException>();
exception.Message.Should().Be("BOOM!");
// Exception should be automatically unrolled, this SHOULD NOT be AggregateException
exception.Should().BeOfType<TestException>();
exception.Message.Should().Be("BOOM!");
}, Materializer);
}

[Fact]
public async Task AsyncEnumerableSource_Must_Cancel_Running_Source_If_Downstream_Completes()
{
var latch = new AtomicBoolean();
IAsyncEnumerable<int> Range() => ProbeableRangeAsync(0, 100, latch);
var subscriber = this.CreateManualSubscriberProbe<int>();
await this.AssertAllStagesStoppedAsync(async () =>
{
var latch = new AtomicBoolean();
IAsyncEnumerable<int> Range() => ProbeableRangeAsync(0, 100, latch);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(50);
await subscriber.ExpectNextNAsync(Enumerable.Range(0, 50));
subscription.Cancel();
var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(50);
await subscriber.ExpectNextNAsync(Enumerable.Range(0, 50));
subscription.Cancel();

// The cancellation token inside the IAsyncEnumerable should be cancelled
await WithinAsync(3.Seconds(), async () => latch.Value);
// The cancellation token inside the IAsyncEnumerable should be cancelled
await WithinAsync(3.Seconds(), async () => latch.Value);
}, Materializer);
}

private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
Expand Down

0 comments on commit 59cda86

Please sign in to comment.