From 924bb1546c161286f0a61b09a95cf825273be728 Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Mon, 25 Apr 2022 19:42:47 +0200 Subject: [PATCH] GroupBy fixes (#5874) * Target incrementalist against v1.4 branch for v1.4 * Update MNTR to 1.1.1 and update build script to suit (#5867) (cherry picked from commit 2b4267ea9d837af8a01e5fd68570d5479dc0eaaa) * GroupBy pulls upstream when a substream materialization is waiting * Cancel GroupBy when all substreams cancel * Allow GroupBy to recreate already closed substreams * Fixes GroupBy does not invoke decider * Avoids memory being retained for GroupBy * Revert v1.4 merge * Fix markdownlint error Co-authored-by: Aaron Stannard Co-authored-by: Gregorius Soedharmo --- docs/articles/streams/builtinstages.md | 12 +- .../CoreAPISpec.ApproveStreams.verified.txt | 5 + .../Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs | 272 ++++++++++++++---- src/core/Akka.Streams/Dsl/FlowOperations.cs | 83 ++++-- .../Dsl/Internal/InternalFlowOperations.cs | 99 +++++-- .../Implementation/Fusing/StreamOfStreams.cs | 89 +++--- .../TooManySubstreamsOpenException.cs | 22 ++ 7 files changed, 426 insertions(+), 156 deletions(-) create mode 100644 src/core/Akka.Streams/TooManySubstreamsOpenException.cs diff --git a/docs/articles/streams/builtinstages.md b/docs/articles/streams/builtinstages.md index 8c827766fa9..5ac675b47b1 100644 --- a/docs/articles/streams/builtinstages.md +++ b/docs/articles/streams/builtinstages.md @@ -1000,7 +1000,17 @@ and returns a pair containing a strict sequence of the taken element and a strea ### GroupBy -De-multiplex the incoming stream into separate output streams. +This operation demultiplexes the incoming stream into separate output streams, one for each element key. The +key is computed for each element using the given function. When a new key is encountered for the first time +a new substream is opened and subsequently fed with all elements belonging to that key. + + +> [!NOTE] +> If `allowClosedSubstreamRecreation` is set to `true` substream completion and incoming elements are subject to race-conditions. If elements arrive for a stream that is in the process of closing these elements might get lost. + +> [!WARNING] +> If `allowClosedSubstreamRecreation` is set to `false` (default behavior) the stage keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys this can cause memory issues. Elements belonging to those keys are drained directly and not send to the substream. + **emits** an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt index aca25f5b718..f5ea9a0ead2 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt @@ -1012,6 +1012,10 @@ namespace Akka.Streams Shaping = 0, Enforcing = 1, } + public class TooManySubstreamsOpenException : System.InvalidOperationException + { + public TooManySubstreamsOpenException() { } + } public abstract class TransformerLikeBase : Akka.Streams.ITransformerLike { protected TransformerLikeBase() { } @@ -1342,6 +1346,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Flow DivertTo(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat> that, System.Func when) { } public static Akka.Streams.Dsl.Flow DivertToMaterialized(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat2> that, System.Func when, System.Func materializerFunction) { } public static Akka.Streams.Dsl.Flow Expand(this Akka.Streams.Dsl.Flow flow, System.Func> extrapolate) { } + public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc, bool allowClosedSubstreamRecreation) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Flow, TMat> Grouped(this Akka.Streams.Dsl.Flow flow, int n) { } public static Akka.Streams.Dsl.Flow, TMat> GroupedWithin(this Akka.Streams.Dsl.Flow flow, int n, System.TimeSpan timeout) { } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index 2609e557762..5891e573814 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -38,63 +38,6 @@ public FlowGroupBySpec(ITestOutputHelper helper) : base(helper) Materializer = ActorMaterializer.Create(Sys, settings); } - private sealed class StreamPuppet - { - private readonly TestSubscriber.ManualProbe _probe; - private readonly ISubscription _subscription; - - public StreamPuppet(IPublisher p, TestKitBase kit) - { - _probe = kit.CreateManualSubscriberProbe(); - p.Subscribe(_probe); - _subscription = _probe.ExpectSubscription(); - } - - public void Request(int demand) => _subscription.Request(demand); - - public void ExpectNext(int element) => _probe.ExpectNext(element); - - public void ExpectNoMsg(TimeSpan max) => _probe.ExpectNoMsg(max); - - public void ExpectComplete() => _probe.ExpectComplete(); - - public void ExpectError(Exception ex) => _probe.ExpectError().Should().Be(ex); - - public void Cancel() => _subscription.Cancel(); - } - - private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int maxSubstream = -1, - Action)>, ISubscription, Func>> run = null) - { - - var source = Source.From(Enumerable.Range(1, elementCount)).RunWith(Sink.AsPublisher(false), Materializer); - var max = maxSubstream > 0 ? maxSubstream : groupCount; - var groupStream = - Source.FromPublisher(source) - .GroupBy(max, x => x % groupCount) - .Lift(x => x % groupCount) - .RunWith(Sink.AsPublisher<(int, Source)>(false), Materializer); - var masterSubscriber = this.CreateManualSubscriberProbe<(int, Source)>(); - - groupStream.Subscribe(masterSubscriber); - var masterSubscription = masterSubscriber.ExpectSubscription(); - - run?.Invoke(masterSubscriber, masterSubscription, expectedKey => - { - masterSubscription.Request(1); - var tuple = masterSubscriber.ExpectNext(); - tuple.Item1.Should().Be(expectedKey); - return tuple.Item2; - }); - } - - private ByteString RandomByteString(int size) - { - var a = new byte[size]; - ThreadLocalRandom.Current.NextBytes(a); - return ByteString.FromBytes(a); - } - [Fact] public void GroupBy_must_work_in_the_happy_case() { @@ -167,11 +110,11 @@ public void GroupBy_must_fail_when_key_function_returns_null() } [Fact] - public void GroupBy_must_support_cancelling_substreams() + public void GroupBy_must_accept_cancelling_substreams() { this.AssertAllStagesStopped(() => { - WithSubstreamsSupport(2, run: (masterSubscriber, masterSubscription, getSubFlow) => + WithSubstreamsSupport(2, maxSubstream: 3, run: (masterSubscriber, masterSubscription, getSubFlow) => { new StreamPuppet(getSubFlow(1).RunWith(Sink.AsPublisher(false), Materializer), this).Cancel(); var substream = new StreamPuppet(getSubFlow(0).RunWith(Sink.AsPublisher(false), Materializer), this); @@ -427,6 +370,20 @@ public void GroupBy_must_fail_when_exceeding_maxSubstreams() }, Materializer); } + [Fact] + public void GroupBy_must_resume_when_exceeding_maxSubstreams() + { + var f = Flow.Create().GroupBy(0, x => x).MergeSubstreams(); + var (up, down) = ((Flow)f) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(this.SourceProbe(), this.SinkProbe(), Materializer); + + down.Request(1); + + up.SendNext(1); + down.ExpectNoMsg(TimeSpan.FromSeconds(1)); + } + [Fact] public void GroupBy_must_emit_subscribe_before_completed() { @@ -483,7 +440,7 @@ public void GroupBy_must_work_under_fuzzing_stress_test() } [Fact] - public void GroupBy_must_Work_if_pull_is_exercised_from_both_substream_and_main() + public void GroupBy_must_work_if_pull_is_exercised_from_both_substream_and_main() { this.AssertAllStagesStopped(() => { @@ -517,6 +474,142 @@ public void GroupBy_must_Work_if_pull_is_exercised_from_both_substream_and_main( }, Materializer); } + [Fact] + public void GroupBy_must_work_if_pull_is_exercised_from_multiple_substreams_while_downstream_is_backpressuring() + { + this.AssertAllStagesStopped(() => + { + var upstream = this.CreatePublisherProbe(); + var downstreamMaster = this.CreateSubscriberProbe>(); + + Source.FromPublisher(upstream) + .Via(new GroupBy(10, element => element)) + .RunWith(Sink.FromSubscriber(downstreamMaster), Materializer); + + var substream1 = this.CreateSubscriberProbe(); + downstreamMaster.Request(1); + upstream.SendNext(1); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream1), Materializer); + + var substream2 = this.CreateSubscriberProbe(); + downstreamMaster.Request(1); + upstream.SendNext(2); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream2), Materializer); + + substream1.Request(1); + substream1.ExpectNext(1); + substream2.Request(1); + substream2.ExpectNext(2); + + // Both substreams pull + substream1.Request(1); + substream2.Request(1); + + // Upstream sends new groups + upstream.SendNext(3); + upstream.SendNext(4); + + var substream3 = this.CreateSubscriberProbe(); + var substream4 = this.CreateSubscriberProbe(); + downstreamMaster.Request(1); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream3), Materializer); + downstreamMaster.Request(1); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream4), Materializer); + + substream3.Request(1); + substream3.ExpectNext(3); + substream4.Request(1); + substream4.ExpectNext(4); + + // Cleanup, not part of the actual test + substream1.Cancel(); + substream2.Cancel(); + substream3.Cancel(); + substream4.Cancel(); + downstreamMaster.Cancel(); + upstream.SendComplete(); + }, Materializer); + } + + [Fact] + public void GroupBy_must_allow_to_recreate_an_already_closed_substream() + { + this.AssertAllStagesStopped(() => + { + var f = Flow.Create() + .GroupBy(2, x => x, allowClosedSubstreamRecreation: true) + .Take(1) // close the substream after 1 element + .MergeSubstreams(); + + var (up, down) = ((Flow)f) + .RunWith(this.SourceProbe(), this.SinkProbe(), Materializer); + + down.Request(4); + + // Creates and closes substream "1" + up.SendNext(1); + down.ExpectNext(1); + + // Creates and closes substream "2" + up.SendNext(2); + down.ExpectNext(2); + + // Recreates and closes substream "1" twice + up.SendNext(1); + down.ExpectNext(1); + up.SendNext(1); + down.ExpectNext(1); + + // Cleanup, not part of the actual test + up.SendComplete(); + down.ExpectComplete(); + }, Materializer); + } + + [Fact] + public void GroupBy_must_cancel_if_downstream_has_cancelled_and_all_substreams_cancel() + { + this.AssertAllStagesStopped(() => + { + var upstream = this.CreatePublisherProbe(); + var downstreamMaster = this.CreateSubscriberProbe>(); + + Source.FromPublisher(upstream) + .Via(new GroupBy(10, element => element)) + .RunWith(Sink.FromSubscriber(downstreamMaster), Materializer); + + var substream1 = this.CreateSubscriberProbe(); + downstreamMaster.Request(1); + upstream.SendNext(1); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream1), Materializer); + + var substream2 = this.CreateSubscriberProbe(); + downstreamMaster.Request(1); + upstream.SendNext(2); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream2), Materializer); + + // Cancel downstream + downstreamMaster.Cancel(); + + // Both substreams still work + substream1.Request(1); + substream1.ExpectNext(1); + substream2.Request(1); + substream2.ExpectNext(2); + + // New keys are ignored + upstream.SendNext(3); + upstream.SendNext(4); + + // Cancel all substreams + substream1.Cancel(); + substream2.Cancel(); + + // Upstream gets cancelled + upstream.ExpectCancellation(); + }, Materializer); + } + [Fact] public void GroupBy_must_work_with_random_demand() { @@ -590,6 +683,63 @@ public void GroupBy_must_work_with_random_demand() }, Materializer); } + private sealed class StreamPuppet + { + private readonly TestSubscriber.ManualProbe _probe; + private readonly ISubscription _subscription; + + public StreamPuppet(IPublisher p, TestKitBase kit) + { + _probe = kit.CreateManualSubscriberProbe(); + p.Subscribe(_probe); + _subscription = _probe.ExpectSubscription(); + } + + public void Request(int demand) => _subscription.Request(demand); + + public void ExpectNext(int element) => _probe.ExpectNext(element); + + public void ExpectNoMsg(TimeSpan max) => _probe.ExpectNoMsg(max); + + public void ExpectComplete() => _probe.ExpectComplete(); + + public void ExpectError(Exception ex) => _probe.ExpectError().Should().Be(ex); + + public void Cancel() => _subscription.Cancel(); + } + + private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int maxSubstream = -1, + Action)>, ISubscription, Func>> run = null) + { + + var source = Source.From(Enumerable.Range(1, elementCount)).RunWith(Sink.AsPublisher(false), Materializer); + var max = maxSubstream > 0 ? maxSubstream : groupCount; + var groupStream = + Source.FromPublisher(source) + .GroupBy(max, x => x % groupCount) + .Lift(x => x % groupCount) + .RunWith(Sink.AsPublisher<(int, Source)>(false), Materializer); + var masterSubscriber = this.CreateManualSubscriberProbe<(int, Source)>(); + + groupStream.Subscribe(masterSubscriber); + var masterSubscription = masterSubscriber.ExpectSubscription(); + + run?.Invoke(masterSubscriber, masterSubscription, expectedKey => + { + masterSubscription.Request(1); + var tuple = masterSubscriber.ExpectNext(); + tuple.Item1.Should().Be(expectedKey); + return tuple.Item2; + }); + } + + private ByteString RandomByteString(int size) + { + var a = new byte[size]; + ThreadLocalRandom.Current.NextBytes(a); + return ByteString.FromBytes(a); + } + private sealed class SubFlowState { public SubFlowState(TestSubscriber.Probe probe, bool hasDemand, ByteString firstElement) @@ -681,7 +831,7 @@ private void RandomDemand(Dictionary map, RandomDemandPropert map[key] = new SubFlowState(state.Probe, false, null); } else if (props.BlockingNextElement == null) - break; + break; } } diff --git a/src/core/Akka.Streams/Dsl/FlowOperations.cs b/src/core/Akka.Streams/Dsl/FlowOperations.cs index b8359b7beea..32fe8a69631 100644 --- a/src/core/Akka.Streams/Dsl/FlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/FlowOperations.cs @@ -1315,48 +1315,75 @@ public static Flow Transform(this Flo /// This operation demultiplexes the incoming stream into separate output /// streams, one for each element key. The key is computed for each element /// using the given function. When a new key is encountered for the first time - /// it is emitted to the downstream subscriber together with a fresh - /// flow that will eventually produce all the elements of the substream - /// for that key. Not consuming the elements from the created streams will - /// stop this processor from processing more elements, therefore you must take - /// care to unblock (or cancel) all of the produced streams even if you want - /// to consume only one of them. - /// + /// a new substream is opened and subsequently fed with all elements belonging to + /// that key. + /// + /// WARNING: If is set to false (default behavior) the operator + /// keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys this + /// can cause memory issues. Elements belonging to those keys are drained directly and not send to the substream. + /// + /// + /// Note: If is set to true substream completion and incoming + /// elements are subject to race-conditions. If elements arrive for a stream that is in the process of closing + /// these elements might get lost. + /// + /// + /// The object returned from this method is not a normal , it is a + /// . This means that after this operator + /// all transformations are applied to all encountered substreams in the same fashion. + /// Substream mode is exited either by closing the substream (i.e. connecting it to a ) + /// or by merging the substreams back together; see the To and MergeBack methods + /// on for more information. + /// + /// + /// It is important to note that the substreams also propagate back-pressure as any other stream, which means + /// that blocking one substream will block the GroupBy operator itself —and thereby all substreams— once all + /// internal or explicit buffers are filled. + /// + /// /// If the group by function throws an exception and the supervision decision - /// is the stream and substreams will be completed - /// with failure. - /// + /// is the stream and substreams will be completed with failure. + /// + /// /// If the group by throws an exception and the supervision decision /// is or /// the element is dropped and the stream and substreams continue. - /// - /// Function MUST NOT return null. This will throw exception and trigger supervision decision mechanism. - /// - /// Adheres to the attribute. - /// - /// - /// Emits when an element for which the grouping function returns a group that has not yet been created. - /// Emits the new group /// - /// Backpressures when there is an element pending for a group whose substream backpressures /// - /// Completes when upstream completes + /// Function MUST NOT return null. This will throw exception and trigger supervision decision mechanism. /// - /// Cancels when downstream cancels and all substreams cancel + /// **Emits when** an element for which the grouping function returns a group that has not yet been created. Emits the new group. + /// **Backpressures when** there is an element pending for a group whose substream backpressures + /// **Completes when** upstream completes + /// **Cancels when** downstream cancels and all substreams cancel /// /// TBD /// TBD /// TBD /// TBD /// TBD - /// TBD - /// TBD + /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails + /// Computes the key for each element + /// Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion /// TBD - public static SubFlow> GroupBy(this Flow flow, int maxSubstreams, Func groupingFunc) - { - return flow.GroupBy(maxSubstreams, groupingFunc, - (f, s) => ((Flow, TMat>) f).To(s)); - } + public static SubFlow> GroupBy(this Flow flow, int maxSubstreams, Func groupingFunc, bool allowClosedSubstreamRecreation) => + flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow, TMat>)f).To(s), allowClosedSubstreamRecreation); + + /// + /// This operation demultiplexes the incoming stream into separate output + /// streams, one for each element key. The key is computed for each element + /// using the given function. When a new key is encountered for the first time + /// a new substream is opened and subsequently fed with all elements belonging to + /// that key. + /// + /// WARNING: The stage keeps track of all keys of streams that have already been closed. + /// If you expect an infinite number of keys this can cause memory issues. Elements belonging + /// to those keys are drained directly and not send to the substream. + /// + /// See + /// + public static SubFlow> GroupBy(this Flow flow, int maxSubstreams, Func groupingFunc) => + flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow, TMat>)f).To(s), false); /// /// This operation applies the given predicate to all incoming elements and diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index 70900d5d5c2..e0224cb6e41 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -1269,59 +1269,103 @@ public static IFlow Transform(this IFlow /// This operation demultiplexes the incoming stream into separate output /// streams, one for each element key. The key is computed for each element /// using the given function. When a new key is encountered for the first time - /// it is emitted to the downstream subscriber together with a fresh - /// flow that will eventually produce all the elements of the substream - /// for that key. Not consuming the elements from the created streams will - /// stop this processor from processing more elements, therefore you must take - /// care to unblock (or cancel) all of the produced streams even if you want - /// to consume only one of them. - /// + /// a new substream is opened and subsequently fed with all elements belonging to + /// that key. + /// + /// WARNING: If is set to false (default behavior) the operator + /// keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys this + /// can cause memory issues. Elements belonging to those keys are drained directly and not send to the substream. + /// + /// + /// Note: If is set to true substream completion and incoming + /// elements are subject to race-conditions. If elements arrive for a stream that is in the process of closing + /// these elements might get lost. + /// + /// + /// The object returned from this method is not a normal or , it is a + /// . This means that after this operator + /// all transformations are applied to all encountered substreams in the same fashion. + /// Substream mode is exited either by closing the substream (i.e. connecting it to a ) + /// or by merging the substreams back together; see the To and MergeBack methods + /// on for more information. + /// + /// + /// It is important to note that the substreams also propagate back-pressure as any other stream, which means + /// that blocking one substream will block the GroupBy operator itself —and thereby all substreams— once all + /// internal or explicit buffers are filled. + /// + /// /// If the group by function throws an exception and the supervision decision - /// is the stream and substreams will be completed - /// with failure. - /// + /// is the stream and substreams will be completed with failure. + /// + /// /// If the group by throws an exception and the supervision decision /// is or /// the element is dropped and the stream and substreams continue. - /// - /// Function MUST NOT return null. This will throw exception and trigger supervision decision mechanism. - /// - /// Emits when an element for which the grouping function returns a group that has not yet been created. - /// Emits the new group /// - /// Backpressures when there is an element pending for a group whose substream backpressures /// - /// Completes when upstream completes + /// Function MUST NOT return null. This will throw exception and trigger supervision decision mechanism. /// - /// Cancels when downstream cancels and all substreams cancel + /// **Emits when** an element for which the grouping function returns a group that has not yet been created. Emits the new group. + /// **Backpressures when** there is an element pending for a group whose substream backpressures + /// **Completes when** upstream completes + /// **Cancels when** downstream cancels and all substreams cancel /// /// TBD /// TBD /// TBD /// TBD /// TBD - /// TBD - /// TBD + /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails + /// Computes the key for each element /// TBD + /// Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion /// TBD public static SubFlow GroupBy( this IFlow flow, int maxSubstreams, Func groupingFunc, - Func, TMat>, Sink, Task>, TClosed> toFunc) + Func, TMat>, Sink, Task>, TClosed> toFunc, + bool allowClosedSubstreamRecreation) { - var merge = new GroupByMergeBack(flow, maxSubstreams, groupingFunc); + var merge = new GroupByMergeBack(flow, maxSubstreams, groupingFunc, allowClosedSubstreamRecreation); Func, TClosed> finish = s => { return toFunc( - flow.Via(new Fusing.GroupBy(maxSubstreams, groupingFunc)), + flow.Via(new Fusing.GroupBy(maxSubstreams, groupingFunc, allowClosedSubstreamRecreation)), Sink.ForEach>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer))); }; return new SubFlowImpl(Flow.Create(), merge, finish); } + /// + /// This operation demultiplexes the incoming stream into separate output streams, one for each element key. + /// The key is computed for each element using the given function. When a new key is encountered for the first + /// time a new substream is opened and subsequently fed with all elements belonging to that key. + /// + /// WARNING: The operator keeps track of all keys of streams that have already been closed. If you expect an + /// infinite number of keys this can cause memory issues. Elements belonging to those keys are drained directly + /// and not send to the substream. + /// + /// See also + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public static SubFlow GroupBy( + this IFlow flow, + int maxSubstreams, + Func groupingFunc, + Func, TMat>, Sink, Task>, TClosed> toFunc) => GroupBy(flow, maxSubstreams, groupingFunc, toFunc, allowClosedSubstreamRecreation: false); + /// /// TBD /// @@ -1333,6 +1377,7 @@ internal class GroupByMergeBack : IMergeBack private readonly IFlow _self; private readonly int _maxSubstreams; private readonly Func _groupingFunc; + private readonly bool _allowClosedSubstreamRecreation; /// /// TBD @@ -1340,13 +1385,13 @@ internal class GroupByMergeBack : IMergeBack /// TBD /// TBD /// TBD - public GroupByMergeBack(IFlow self, - int maxSubstreams, - Func groupingFunc) + /// TBD + public GroupByMergeBack(IFlow self, int maxSubstreams, Func groupingFunc, bool allowClosedSubstreamRecreation = false) { _self = self; _maxSubstreams = maxSubstreams; _groupingFunc = groupingFunc; + _allowClosedSubstreamRecreation = allowClosedSubstreamRecreation; } /// @@ -1358,7 +1403,7 @@ public GroupByMergeBack(IFlow self, /// TBD public IFlow Apply(Flow flow, int breadth) { - return _self.Via(new Fusing.GroupBy(_maxSubstreams, _groupingFunc)) + return _self.Via(new Fusing.GroupBy(_maxSubstreams, _groupingFunc, _allowClosedSubstreamRecreation)) .Select(f => f.Via(flow)) .Via(new Fusing.FlattenMerge, T, NotUsed>(breadth)); } diff --git a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs index 87d08789836..eb620ce9d1f 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs @@ -15,7 +15,6 @@ using Akka.Streams.Implementation.Stages; using Akka.Streams.Stage; using Akka.Streams.Supervision; -using Akka.Streams.Util; using Akka.Util; using Akka.Util.Internal; @@ -181,7 +180,7 @@ public FlattenMerge(int breadth) internal sealed class PrefixAndTail : GraphStage, Source)>> { #region internal classes - + private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler { private const string SubscriptionTimer = "SubstreamSubscriptionTimer"; @@ -205,11 +204,11 @@ public Logic(PrefixAndTail stage) : base(stage.Shape) Pull(_stage._in); _tailSource.SetHandler(new LambdaOutHandler(onPull: () => Pull(_stage._in))); }); - + SetHandler(_stage._in, this); SetHandler(_stage._out, this); } - + protected internal override void OnTimer(object timerKey) { var materializer = ActorMaterializerHelper.Downcast(Interpreter.Materializer); @@ -360,7 +359,7 @@ public PrefixAndTail(int count) /// TBD internal sealed class GroupBy : GraphStage>> { - #region Loigc + #region Logic private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler { @@ -370,7 +369,7 @@ private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler private readonly HashSet _substreamsJustStarted = new HashSet(); private readonly Lazy _decider; private TimeSpan _timeout; - private SubstreamSource _substreamWaitingToBePushed; + private Option _substreamWaitingToBePushed = Option.None; private Option _nextElementKey = Option.None; private Option _nextElementValue = Option.None; private long _nextId; @@ -379,12 +378,12 @@ private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler public Logic(GroupBy stage, Attributes inheritedAttributes) : base(stage.Shape) { _stage = stage; - + _decider = new Lazy(() => { var attribute = inheritedAttributes.GetAttribute(null); return attribute != null ? attribute.Decider : Deciders.StoppingDecider; - }); + }); SetHandler(_stage.In, this); SetHandler(_stage.Out, this); @@ -411,8 +410,8 @@ public void OnPush() } else { - if (_activeSubstreams.Count == _stage._maxSubstreams) - Fail(new IllegalStateException($"Cannot open substream for key {key}: too many substreams open")); + if (_activeSubstreams.Count + _closedSubstreams.Count == _stage._maxSubstreams) + throw new TooManySubstreamsOpenException(); else if (_closedSubstreams.Contains(key) && !HasBeenPulled(_stage.In)) Pull(_stage.In); else @@ -431,11 +430,12 @@ public void OnPush() public void OnPull() { - if (_substreamWaitingToBePushed != null) + if (_substreamWaitingToBePushed.HasValue) { - Push(_stage.Out, Source.FromGraph(_substreamWaitingToBePushed.Source)); - ScheduleOnce(_substreamWaitingToBePushed.Key.Value, _timeout); - _substreamWaitingToBePushed = null; + var substreamSource = _substreamWaitingToBePushed.Value; + Push(_stage.Out, Source.FromGraph(substreamSource.Source)); + ScheduleOnce(substreamSource.Key.Value, _timeout); + _substreamWaitingToBePushed = Option.None; } else { @@ -463,9 +463,7 @@ public void OnUpstreamFinish() public void OnDownstreamFinish() { - if (_activeSubstreams.Count == 0) - CompleteStage(); - else + if (!TryCancel()) SetKeepGoing(true); } @@ -492,6 +490,18 @@ private bool TryCompleteAll() return false; } + private bool TryCancel() + { + // if there's no active substreams or there's only one but it's not been pushed yet + if (_activeSubstreams.Count == 0 || (_activeSubstreams.Count == 1 && _substreamWaitingToBePushed.HasValue)) + { + CompleteStage(); + return true; + } + + return false; + } + private void Fail(Exception ex) { foreach (var value in _activeSubstreams.Values) @@ -500,7 +510,7 @@ private void Fail(Exception ex) FailStage(ex); } - private bool NeedToPull => !(HasBeenPulled(_stage.In) || IsClosed(_stage.In) || HasNextElement); + private bool NeedToPull => !(HasBeenPulled(_stage.In) || IsClosed(_stage.In) || HasNextElement || _substreamWaitingToBePushed.HasValue); public override void PreStart() { @@ -510,14 +520,15 @@ public override void PreStart() protected internal override void OnTimer(object timerKey) { - var key = (TKey) timerKey; - if (_activeSubstreams.TryGetValue(key, out var substreamSource)) + var key = (TKey)timerKey; + if (_activeSubstreams.ContainsKey(key)) { - substreamSource.Timeout(_timeout); - _closedSubstreams.Add(key); + if (!_stage._allowClosedSubstreamRecreation) + { + _closedSubstreams.Add(key); + } _activeSubstreams.Remove(key); - if (IsClosed(_stage.In)) - TryCompleteAll(); + if (IsClosed(_stage.In)) TryCompleteAll(); } } @@ -530,7 +541,7 @@ private void RunSubstream(TKey key, T value) { Push(_stage.Out, Source.FromGraph(substreamSource.Source)); ScheduleOnce(key, _timeout); - _substreamWaitingToBePushed = null; + _substreamWaitingToBePushed = Option.None; } else { @@ -564,7 +575,7 @@ private void CompleteSubStream() { Complete(); _logic._activeSubstreams.Remove(Key.Value); - _logic._closedSubstreams.Add(Key.Value); + if (!_logic._stage._allowClosedSubstreamRecreation) _logic._closedSubstreams.Add(Key.Value); } private void TryCompleteHandler() @@ -601,15 +612,12 @@ public void OnPull() public void OnDownstreamFinish() { - if(_logic.HasNextElement && _logic._nextElementKey.Equals(Key)) - _logic.ClearNextElement(); - if (FirstPush) - _logic._firstPushCounter--; + if(_logic.HasNextElement && _logic._nextElementKey.Equals(Key)) _logic.ClearNextElement(); + if (FirstPush) _logic._firstPushCounter--; CompleteSubStream(); - if (_logic.IsClosed(_logic._stage.In)) - _logic.TryCompleteAll(); - else if (_logic.NeedToPull) - _logic.Pull(_logic._stage.In); + if (_logic.IsClosed(_logic._stage.Out)) _logic.TryCancel(); + if (_logic.IsClosed(_logic._stage.In)) _logic.TryCompleteAll(); + else if (_logic.NeedToPull) _logic.Pull(_logic._stage.In); } } } @@ -618,17 +626,20 @@ public void OnDownstreamFinish() private readonly int _maxSubstreams; private readonly Func _keyFor; + private readonly bool _allowClosedSubstreamRecreation; /// /// TBD /// /// TBD /// TBD - public GroupBy(int maxSubstreams, Func keyFor) + /// TBD + public GroupBy(int maxSubstreams, Func keyFor, bool allowClosedSubstreamRecreation = false) { _maxSubstreams = maxSubstreams; _keyFor = keyFor; - + _allowClosedSubstreamRecreation = allowClosedSubstreamRecreation; + Shape = new FlowShape>(In, Out); } @@ -778,7 +789,7 @@ public override void OnDownstreamFinish() else // Start draining if (!_logic.HasBeenPulled(_inlet)) - _logic.Pull(_inlet); + _logic.Pull(_inlet); } public override void OnPush() @@ -1010,7 +1021,7 @@ protected CommandScheduledBeforeMaterialization(ICommand command) internal class RequestOneScheduledBeforeMaterialization : CommandScheduledBeforeMaterialization { public static readonly RequestOneScheduledBeforeMaterialization Instance = new RequestOneScheduledBeforeMaterialization(RequestOne.Instance); - + private RequestOneScheduledBeforeMaterialization(ICommand command) : base(command) { } @@ -1046,7 +1057,7 @@ private RequestOne() { } } - + internal class Cancel : ICommand { public static readonly Cancel Instance = new Cancel(); diff --git a/src/core/Akka.Streams/TooManySubstreamsOpenException.cs b/src/core/Akka.Streams/TooManySubstreamsOpenException.cs new file mode 100644 index 00000000000..cbb9ab20f65 --- /dev/null +++ b/src/core/Akka.Streams/TooManySubstreamsOpenException.cs @@ -0,0 +1,22 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; + +namespace Akka.Streams +{ + /// + /// This exception signals that the maximum number of substreams declared has been exceeded. + /// A finite limit is imposed so that memory usage is controlled. + /// + public class TooManySubstreamsOpenException : InvalidOperationException + { + public TooManySubstreamsOpenException() : + base("Cannot open a new substream as there are too many substreams open") + { } + } +}