Skip to content

Commit

Permalink
Fix Streams.Tests.Dsl.MergeHub_must_work_with_long_streams_when_buffe…
Browse files Browse the repository at this point in the history
…r_size_is_1 (#6054)
  • Loading branch information
Arkatufus authored Jul 28, 2022
1 parent 411c800 commit f571e26
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/core/Akka.Streams.Tests/Dsl/HubSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
using FluentAssertions;
using Xunit;
using Akka.Actor;
using Akka.Streams.Dsl.Internal;
using Akka.Streams.Tests.Actor;
using Akka.TestKit.Extensions;
using Akka.TestKit.Xunit2.Attributes;
using Akka.Util.Internal;
Expand Down Expand Up @@ -183,17 +185,25 @@ public async Task MergeHub_must_work_with_long_streams_when_buffer_size_is_1()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var (sink, probe) = MergeHub.Source<int>(1).Take(20000)
var (sink, probe) = MergeHub.Source<int>(1)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);

Source.From(Enumerable.Range(1, 10000)).RunWith(sink, Materializer);
Source.From(Enumerable.Range(10001, 10000)).RunWith(sink, Materializer);

await probe.RequestAsync(int.MaxValue);
var result = await probe.ExpectNextNAsync(20000, 300.Seconds()).ToListAsync();
var result = new List<int>();
foreach (var i in Enumerable.Range(1, 20000))
{
var evt = await probe.ExpectEventAsync();
if (evt is TestSubscriber.OnNext<int> next)
result.Add(next.Element);
else
throw new Exception($"For element [{i}]: Expected OnNext<int> but received {evt.GetType()}");
}
result.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000));
}, Materializer);
}, Materializer, 300.Seconds());
}

[Fact]
Expand Down

0 comments on commit f571e26

Please sign in to comment.