Skip to content

Commit

Permalink
Fixes Sink.Ignore signature from Task to Task<Done>
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed May 28, 2022
1 parent 34c830a commit b8ca2f8
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, Reactive.Streams.IPublisher<TIn>> FanoutPublisher<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Last<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> LastOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Sink<TIn, TMat>>> sinkFactory) { }
Expand Down Expand Up @@ -4209,13 +4209,13 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task>
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task<Akka.Done>>
{
public IgnoreSink() { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Inlet<T> Inlet { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<Akka.Done>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, Reactive.Streams.IPublisher<TIn>> FanoutPublisher<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Last<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> LastOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Sink<TIn, TMat>>> sinkFactory) { }
Expand Down Expand Up @@ -4221,13 +4221,13 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task>
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task<Akka.Done>>
{
public IgnoreSink() { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Inlet<T> Inlet { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<Akka.Done>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, Reactive.Streams.IPublisher<TIn>> FanoutPublisher<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Last<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> LastOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Sink<TIn, TMat>>> sinkFactory) { }
Expand Down Expand Up @@ -4209,13 +4209,13 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task>
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task<Akka.Done>>
{
public IgnoreSink() { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Inlet<T> Inlet { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<Akka.Done>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Docs.Tests/Streams/HubsDocTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void Hubs_must_demonstrate_creating_a_dynamic_merge()

#region merge-hub
// A simple consumer that will print to the console for now
Sink<string, Task> consumer = Sink.ForEach<string>(WriteLine);
Sink<string, Task<Done>> consumer = Sink.ForEach<string>(WriteLine);

// Attach a MergeHub Source to the consumer. This will materialize to a
// corresponding Sink.
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Docs.Tests/Streams/StreamRefsDocTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public DataReceiver()
});
}

private Sink<string, Task> LogsSinksFor(string id) =>
private Sink<string, Task<Done>> LogsSinksFor(string id) =>
Sink.ForEach<string>(Console.WriteLine);
}
#endregion
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void A_Graph_with_materialized_value_must_perform_side_effecting_transfor
});
var r = RunnableGraph.FromGraph(GraphDsl.Create(Sink.Ignore<int>(), (b, sink) =>
{
var source = Source.From(Enumerable.Range(1, 10)).MapMaterializedValue(_ => Task.FromResult(0));
var source = Source.From(Enumerable.Range(1, 10)).MapMaterializedValue(_ => Task.FromResult(Done.Instance));
b.Add(g);
b.From(source).To(sink);
return ClosedShape.Instance;
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/GraphZipSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public void Zip_must_complete_if_one_side_is_available_but_other_already_complet
var completed = RunnableGraph.FromGraph(GraphDsl.Create(Sink.Ignore<(int, string)>(), (b, sink) =>
{
var zip = b.Add(new Zip<int, string>());
var source1 = Source.FromPublisher(upstream1).MapMaterializedValue<Task>(_ => null);
var source2 = Source.FromPublisher(upstream2).MapMaterializedValue<Task>(_ => null);
var source1 = Source.FromPublisher(upstream1).MapMaterializedValue(_ => Task.FromResult(Done.Instance));
var source2 = Source.FromPublisher(upstream2).MapMaterializedValue(_ => Task.FromResult(Done.Instance));

b.From(source1).To(zip.In0);
b.From(source2).To(zip.In1);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/IO/TcpSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ private async Task ValidateServerClientCommunicationAsync(ByteString testData, S
(await serverConnection.WaitReadAsync()).Should().BeEquivalentTo(testData);
}

private Sink<Tcp.IncomingConnection, Task> EchoHandler() =>
private Sink<Tcp.IncomingConnection, Task<Done>> EchoHandler() =>
Sink.ForEach<Tcp.IncomingConnection>(c => c.Flow.Join(Flow.Create<ByteString>()).Run(Materializer));

[Fact]
Expand Down
22 changes: 11 additions & 11 deletions src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1325,17 +1325,17 @@ public static SubFlow<T, TMat, TClosed> GroupBy<T, TMat, TKey, TClosed>(
this IFlow<T, TMat> flow,
int maxSubstreams,
Func<T, TKey> groupingFunc,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc,
bool allowClosedSubstreamRecreation)
{
var merge = new GroupByMergeBack<T, TMat, TKey>(flow, maxSubstreams, groupingFunc, allowClosedSubstreamRecreation);

Func<Sink<T, TMat>, TClosed> finish = s =>
TClosed finish(Sink<T, TMat> s)
{
return toFunc(
flow.Via(new Fusing.GroupBy<T, TKey>(maxSubstreams, groupingFunc, allowClosedSubstreamRecreation)),
Sink.ForEach<Source<T, NotUsed>>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer)));
};
}

return new SubFlowImpl<T, T, TMat, TClosed>(Flow.Create<T, TMat>(), merge, finish);
}
Expand All @@ -1349,7 +1349,7 @@ public static SubFlow<T, TMat, TClosed> GroupBy<T, TMat, TKey, TClosed>(
/// infinite number of keys this can cause memory issues. Elements belonging to those keys are drained directly
/// and not send to the substream.
/// </para>
/// See also <seealso cref="GroupBy{T, TMat, TKey, TClosed}(IFlow{T, TMat}, int, Func{T, TKey}, Func{IFlow{Source{T, NotUsed}, TMat}, Sink{Source{T, NotUsed}, Task}, TClosed}, bool)"/>
/// See also <seealso cref="GroupBy{T, TMat, TKey, TClosed}(IFlow{T, TMat}, int, Func{T, TKey}, Func{IFlow{Source{T, NotUsed}, TMat}, Sink{Source{T, NotUsed}, Task{Done}}, TClosed}, bool)"/>
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="TMat"></typeparam>
Expand All @@ -1364,7 +1364,7 @@ public static SubFlow<T, TMat, TClosed> GroupBy<T, TMat, TKey, TClosed>(
this IFlow<T, TMat> flow,
int maxSubstreams,
Func<T, TKey> groupingFunc,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc) => GroupBy(flow, maxSubstreams, groupingFunc, toFunc, allowClosedSubstreamRecreation: false);
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc) => GroupBy(flow, maxSubstreams, groupingFunc, toFunc, allowClosedSubstreamRecreation: false);

/// <summary>
/// TBD
Expand Down Expand Up @@ -1472,15 +1472,15 @@ public IFlow<T, TMat> Apply<T>(Flow<TOut, T, TMat> flow, int breadth)
/// <returns>TBD</returns>
public static SubFlow<T, TMat, TClosed> SplitWhen<T, TMat, TClosed>(this IFlow<T, TMat> flow,
SubstreamCancelStrategy substreamCancelStrategy, Func<T, bool> predicate,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc)
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc)
{
var merge = new SplitWhenMergeBack<T, TMat>(flow, predicate, substreamCancelStrategy);

Func<Sink<T, TMat>, TClosed> finish = s =>
TClosed finish(Sink<T, TMat> s)
{
return toFunc(flow.Via(Fusing.Split.When(predicate, substreamCancelStrategy)),
Sink.ForEach<Source<T, NotUsed>>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer)));
};
}

return new SubFlowImpl<T, T, TMat, TClosed>(Flow.Create<T, TMat>(), merge, finish);
}
Expand Down Expand Up @@ -1577,15 +1577,15 @@ public IFlow<T, TMat> Apply<T>(Flow<TOut, T, TMat> flow, int breadth)
/// <returns>TBD</returns>
public static SubFlow<T, TMat, TClosed> SplitAfter<T, TMat, TClosed>(this IFlow<T, TMat> flow,
SubstreamCancelStrategy substreamCancelStrategy, Func<T, bool> predicate,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc)
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc)
{
var merge = new SplitAfterMergeBack<T, TMat>(flow, predicate, substreamCancelStrategy);

Func<Sink<T, TMat>, TClosed> finish = s =>
TClosed finish(Sink<T, TMat> s)
{
return toFunc(flow.Via(Fusing.Split.After(predicate, substreamCancelStrategy)),
Sink.ForEach<Source<T, NotUsed>>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer)));
};
}

return new SubFlowImpl<T, T, TMat, TClosed>(Flow.Create<T, TMat>(), merge, finish);
}
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Streams/Dsl/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public static Sink<TIn, IPublisher<TIn>> DistinctRetainingFanOutPublisher<TIn>(A
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <returns>TBD</returns>
public static Sink<TIn, Task> Ignore<TIn>() => FromGraph(new IgnoreSink<TIn>());
public static Sink<TIn, Task<Done>> Ignore<TIn>() => FromGraph(new IgnoreSink<TIn>());

/// <summary>
/// A <see cref="Sink{TIn,TMat}"/> that will invoke the given <paramref name="action"/> for each received element.
Expand All @@ -306,7 +306,7 @@ public static Sink<TIn, IPublisher<TIn>> DistinctRetainingFanOutPublisher<TIn>(A
/// <typeparam name="TIn">TBD</typeparam>
/// <param name="action">TBD</param>
/// <returns>TBD</returns>
public static Sink<TIn, Task> ForEach<TIn>(Action<TIn> action) => Flow.Create<TIn>()
public static Sink<TIn, Task<Done>> ForEach<TIn>(Action<TIn> action) => Flow.Create<TIn>()
.Select(input =>
{
action(input);
Expand Down Expand Up @@ -357,7 +357,7 @@ public static Sink<TIn, NotUsed> Combine<TIn, TOut, TMat>(Func<int, IGraph<Unifo
/// <param name="parallelism">TBD</param>
/// <param name="action">TBD</param>
/// <returns>TBD</returns>
public static Sink<TIn, Task> ForEachParallel<TIn>(int parallelism, Action<TIn> action) => Flow.Create<TIn>()
public static Sink<TIn, Task<Done>> ForEachParallel<TIn>(int parallelism, Action<TIn> action) => Flow.Create<TIn>()
.SelectAsyncUnordered(parallelism, input => Task.Run(() =>
{
action(input);
Expand Down
Loading

0 comments on commit b8ca2f8

Please sign in to comment.