Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed QueueSource PostStop and added a missing test case #4991

Merged
merged 2 commits into from
May 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,10 @@ namespace Akka.Streams
public override bool Equals(object obj) { }
public override int GetHashCode() { }
}
public class StreamDetachedException : System.Exception
{
public static readonly Akka.Streams.StreamDetachedException Instance;
}
public class StreamLimitReachedException : System.Exception
{
public StreamLimitReachedException(long max) { }
Expand Down
8 changes: 3 additions & 5 deletions src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void QueueSink_should_timeout_future_when_stream_cannot_provide_data()
}, _materializer);
}

[Fact(Skip = "Racy, see https://github.com/akkadotnet/akka.net/pull/4424#issuecomment-632284459")]
[Fact]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's see if this was fixed too

public void QueueSink_should_fail_pull_future_when_stream_is_completed()
{
this.AssertAllStagesStopped(() =>
Expand All @@ -170,10 +170,8 @@ public void QueueSink_should_fail_pull_future_when_stream_is_completed()
var result = queue.PullAsync().Result;
result.Should().Be(Option<int>.None);

((Task)queue.PullAsync()).ContinueWith(t =>
{
t.Exception.InnerException.Should().BeOfType<IllegalStateException>();
}, TaskContinuationOptions.OnlyOnFaulted).Wait();
var exception = Record.ExceptionAsync(async () => await queue.PullAsync()).Result;
exception.Should().BeOfType<StreamDetachedException>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using XUnit API to catch the exception and unwrap. Also, this should always run, not just on OnlyOnFaulted, to make sure we catch regressions.

}, _materializer);
}

Expand Down
23 changes: 20 additions & 3 deletions src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,22 @@ public void QueueSource_should_complete_watching_future_with_failure_if_stream_f
}, _materializer);
}

[Fact]
public void QueueSource_should_complete_watching_future_with_failure_if_materializer_shut_down()
{
this.AssertAllStagesStopped(() =>
{
var tempMap = ActorMaterializer.Create(Sys);
var s = this.CreateManualSubscriberProbe<int>();
var queue = Source.Queue<int>(1, OverflowStrategy.Fail)
.To(Sink.FromSubscriber(s))
.Run(tempMap);
queue.WatchCompletionAsync().PipeTo(TestActor);
tempMap.Shutdown();
ExpectMsg<Status.Failure>();
}, _materializer);
}

[Fact]
public void QueueSource_should_return_false_when_element_was_not_added_to_buffer()
{
Expand Down Expand Up @@ -322,11 +338,12 @@ public void QueueSource_should_fail_offer_future_when_stream_is_completed()
.Run(_materializer);
var sub = s.ExpectSubscription();

queue.WatchCompletionAsync().ContinueWith(t => "done").PipeTo(TestActor);
queue.WatchCompletionAsync().ContinueWith(t => Done.Instance).PipeTo(TestActor);
sub.Cancel();
ExpectMsg("done");
ExpectMsg(Done.Instance);

queue.OfferAsync(1).ContinueWith(t => t.Exception.Should().BeOfType<IllegalStateException>());
var exception = Record.ExceptionAsync(async () => await queue.OfferAsync(1)).Result;
exception.Should().BeOfType<StreamDetachedException>();
}, _materializer);
}

Expand Down
23 changes: 10 additions & 13 deletions src/core/Akka.Streams/Implementation/Sinks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -624,10 +624,10 @@ public override void OnUpstreamFailure(Exception e)

public override void PostStop()
{
if(!_completionSignalled)
if (!_completionSignalled)
_promise.TrySetException(new AbruptStageTerminationException(this));
}

public override void PreStart() => Pull(_stage.In);
}

Expand Down Expand Up @@ -805,12 +805,8 @@ public override void PreStart()
Pull(_stage.In);
}

public override void PostStop()
{
StopCallback(
promise =>
promise.SetException(new IllegalStateException("Stream is terminated. QueueSink is detached")));
}
public override void PostStop() =>
StopCallback(promise => promise.SetException(StreamDetachedException.Instance));

private Action<TaskCompletionSource<Option<T>>> Callback()
{
Expand Down Expand Up @@ -1024,7 +1020,8 @@ private void InitInternalSource(Sink<TIn, TMat> sink, TIn firstElement)
{
var sourceOut = new SubSource(this, firstElement);

try {
try
{
var matVal = Source.FromGraph(sourceOut.Source)
.RunWith(sink, Interpreter.SubFusingMaterializer);
_completion.TrySetResult(matVal);
Expand All @@ -1034,7 +1031,7 @@ private void InitInternalSource(Sink<TIn, TMat> sink, TIn firstElement)
_completion.TrySetException(ex);
FailStage(ex);
}

}

#region SubSource
Expand Down Expand Up @@ -1176,7 +1173,7 @@ public void Dispose(bool unregister)
}
}
}

private sealed class ObservableLogic : GraphStageLogic, IObservable<T>
{
private readonly ObservableSinkStage<T> _stage;
Expand Down Expand Up @@ -1216,12 +1213,12 @@ public void Remove(IObserver<T> observer)
ImmutableInterlocked.TryRemove(ref _observers, observer, out var _);
}

public IDisposable Subscribe(IObserver<T> observer) =>
public IDisposable Subscribe(IObserver<T> observer) =>
ImmutableInterlocked.GetOrAdd(ref _observers, observer, new ObserverDisposable(this, observer));
}

#endregion


public ObservableSinkStage()
{
Expand Down
13 changes: 5 additions & 8 deletions src/core/Akka.Streams/Implementation/Sources.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@

using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Akka.Annotations;
using Akka.Pattern;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation.Stages;
using Akka.Streams.Stage;
using Akka.Streams.Supervision;
using Akka.Streams.Util;
using Akka.Util;
using Akka.Util.Internal;

Expand Down Expand Up @@ -170,14 +168,13 @@ public override void PreStart()

public override void PostStop()
{
var exception = StreamDetachedException.Instance;
_completion.TrySetException(exception);
StopCallback(input =>
{
var offer = input as Offer<TOut>;
if (offer != null)
{
var promise = offer.CompletionSource;
promise.NonBlockingTrySetException(new IllegalStateException("Stream is terminated. SourceQueue is detached."));
}
if (!(input is Offer<TOut> offer)) return;
var promise = offer.CompletionSource;
promise.NonBlockingTrySetException(exception);
});
}

Expand Down
16 changes: 16 additions & 0 deletions src/core/Akka.Streams/StreamTcpException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ protected StreamTcpException(SerializationInfo info, StreamingContext context) :
#endif
}

/// <summary>
/// This exception signals that materialized value is already detached from stream. This usually happens
/// when stream is completed and an ActorSystem is shut down while materialized object is still available.
/// </summary>
public class StreamDetachedException : Exception
{
/// <summary>
/// Initializes a single instance of the <see cref="StreamDetachedException"/> class.
/// </summary>
public static readonly StreamDetachedException Instance = new StreamDetachedException();

private StreamDetachedException() : base("Stream is terminated. Materialized value is detached.")
{
}
}

/// <summary>
/// TBD
/// </summary>
Expand Down