Skip to content

Commit

Permalink
Convert Akka.Streams.Tests to async - Dsl.LastSinkSpec (#5990)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored Jun 10, 2022
1 parent daa2491 commit d8e62f6
Showing 1 changed file with 43 additions and 31 deletions.
74 changes: 43 additions & 31 deletions src/core/Akka.Streams.Tests/Dsl/LastSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit.Extensions;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static FluentAssertions.FluentActions;

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -26,75 +30,83 @@ public LastSinkSpec(ITestOutputHelper helper):base(helper)
}

[Fact]
public void A_Flow_with_Sink_Last_must_yield_the_last_value()
public async Task A_Flow_with_Sink_Last_must_yield_the_last_value()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var task = Source.From(Enumerable.Range(1,42)).Select(x=>x).RunWith(Sink.Last<int>(), Materializer);
task.Wait(TimeSpan.FromSeconds(1)).Should().BeTrue();
task.Result.Should().Be(42);
var result = await Source.From(Enumerable.Range(1,42)).Select(x=>x)
.RunWith(Sink.Last<int>(), Materializer)
.ShouldCompleteWithin(1.Seconds());
result.Should().Be(42);
}, Materializer);
}

[Fact]
public void A_Flow_with_Sink_Last_must_yield_the_first_error()
public async Task A_Flow_with_Sink_Last_must_yield_the_first_error()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
Source.Failed<int>(new Exception("ex"))
.Invoking(s => s.RunWith(Sink.Last<int>(), Materializer).Wait(TimeSpan.FromSeconds(1)))
.Should().Throw<AggregateException>()
(await Awaiting(() =>
Source.Failed<int>(new Exception("ex"))
.RunWith(Sink.Last<int>(), Materializer))
.Should().ThrowAsync<AggregateException>()
.ShouldCompleteWithin(1.Seconds()))
.WithInnerException<Exception>()
.WithMessage("ex");
}, Materializer);
}

[Fact]
public void A_Flow_with_Sink_Last_must_yield_NoSuchElementException_for_empty_stream()
public async Task A_Flow_with_Sink_Last_must_yield_NoSuchElementException_for_empty_stream()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
Source.Empty<int>()
.Invoking(s => s.RunWith(Sink.Last<int>(), Materializer).Wait(TimeSpan.FromSeconds(1)))
.Should().Throw<AggregateException>()
(await Awaiting(() =>
Source.Empty<int>()
.RunWith(Sink.Last<int>(), Materializer))
.Should().ThrowAsync<AggregateException>()
.ShouldCompleteWithin(1.Seconds()))
.WithInnerException<NoSuchElementException>()
.WithMessage("Last of empty stream");
}, Materializer);
}


[Fact]
public void A_Flow_with_Sink_LastOption_must_yield_the_last_value()
public async Task A_Flow_with_Sink_LastOption_must_yield_the_last_value()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var task = Source.From(Enumerable.Range(1, 42)).Select(x => x).RunWith(Sink.LastOrDefault<int>(), Materializer);
task.Wait(TimeSpan.FromSeconds(1)).Should().BeTrue();
task.Result.Should().Be(42);
var result = await Source.From(Enumerable.Range(1, 42)).Select(x => x)
.RunWith(Sink.LastOrDefault<int>(), Materializer)
.ShouldCompleteWithin(1.Seconds());
result.Should().Be(42);
}, Materializer);
}

[Fact]
public void A_Flow_with_Sink_LastOption_must_yield_the_first_error()
public async Task A_Flow_with_Sink_LastOption_must_yield_the_first_error()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
Source.Failed<int>(new Exception("ex"))
.Invoking(s => s.RunWith(Sink.LastOrDefault<int>(), Materializer).Wait(TimeSpan.FromSeconds(1)))
.Should().Throw<AggregateException>()
(await Awaiting(async () =>
Source.Failed<int>(new Exception("ex"))
.RunWith(Sink.LastOrDefault<int>(), Materializer))
.Should().ThrowAsync<AggregateException>()
.ShouldCompleteWithin(1.Seconds()))
.WithInnerException<Exception>()
.WithMessage("ex");
}, Materializer);
}

[Fact]
public void A_Flow_with_Sink_LastOption_must_yield_default_for_empty_stream()
public async Task A_Flow_with_Sink_LastOption_must_yield_default_for_empty_stream()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var task = Source.Empty<int>().RunWith(Sink.LastOrDefault<int>(), Materializer);
task.Wait(TimeSpan.FromSeconds(1)).Should().BeTrue();
task.Result.Should().Be(0);
var result = await Source.Empty<int>()
.RunWith(Sink.LastOrDefault<int>(), Materializer)
.ShouldCompleteWithin(1.Seconds());
result.Should().Be(0);
}, Materializer);
}
}
Expand Down

0 comments on commit d8e62f6

Please sign in to comment.