Skip to content

Commit

Permalink
Fix fail test case in QueueSourceSpec and added StreamDetachedExcepti…
Browse files Browse the repository at this point in the history
…on when stream already completed
  • Loading branch information
ismaelhamed committed May 1, 2021
1 parent 700fa35 commit 331cd3a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 22 deletions.
8 changes: 3 additions & 5 deletions src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void QueueSink_should_timeout_future_when_stream_cannot_provide_data()
}, _materializer);
}

[Fact(Skip = "Racy, see https://github.com/akkadotnet/akka.net/pull/4424#issuecomment-632284459")]
[Fact]
public void QueueSink_should_fail_pull_future_when_stream_is_completed()
{
this.AssertAllStagesStopped(() =>
Expand All @@ -170,10 +170,8 @@ public void QueueSink_should_fail_pull_future_when_stream_is_completed()
var result = queue.PullAsync().Result;
result.Should().Be(Option<int>.None);

((Task)queue.PullAsync()).ContinueWith(t =>
{
t.Exception.InnerException.Should().BeOfType<IllegalStateException>();
}, TaskContinuationOptions.OnlyOnFaulted).Wait();
var exception = Record.ExceptionAsync(async () => await queue.PullAsync()).Result;
exception.Should().BeOfType<StreamDetachedException>();
}, _materializer);
}

Expand Down
7 changes: 4 additions & 3 deletions src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,12 @@ public void QueueSource_should_fail_offer_future_when_stream_is_completed()
.Run(_materializer);
var sub = s.ExpectSubscription();

queue.WatchCompletionAsync().ContinueWith(t => "done").PipeTo(TestActor);
queue.WatchCompletionAsync().ContinueWith(t => Done.Instance).PipeTo(TestActor);
sub.Cancel();
ExpectMsg("done");
ExpectMsg(Done.Instance);

queue.OfferAsync(1).ContinueWith(t => t.Exception.Should().BeOfType<IllegalStateException>());
var exception = Record.ExceptionAsync(async () => await queue.OfferAsync(1)).Result;
exception.Should().BeOfType<StreamDetachedException>();
}, _materializer);
}

Expand Down
23 changes: 10 additions & 13 deletions src/core/Akka.Streams/Implementation/Sinks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -624,10 +624,10 @@ public override void OnUpstreamFailure(Exception e)

public override void PostStop()
{
if(!_completionSignalled)
if (!_completionSignalled)
_promise.TrySetException(new AbruptStageTerminationException(this));
}

public override void PreStart() => Pull(_stage.In);
}

Expand Down Expand Up @@ -805,12 +805,8 @@ public override void PreStart()
Pull(_stage.In);
}

public override void PostStop()
{
StopCallback(
promise =>
promise.SetException(new IllegalStateException("Stream is terminated. QueueSink is detached")));
}
public override void PostStop() =>
StopCallback(promise => promise.SetException(StreamDetachedException.Instance));

private Action<TaskCompletionSource<Option<T>>> Callback()
{
Expand Down Expand Up @@ -1024,7 +1020,8 @@ private void InitInternalSource(Sink<TIn, TMat> sink, TIn firstElement)
{
var sourceOut = new SubSource(this, firstElement);

try {
try
{
var matVal = Source.FromGraph(sourceOut.Source)
.RunWith(sink, Interpreter.SubFusingMaterializer);
_completion.TrySetResult(matVal);
Expand All @@ -1034,7 +1031,7 @@ private void InitInternalSource(Sink<TIn, TMat> sink, TIn firstElement)
_completion.TrySetException(ex);
FailStage(ex);
}

}

#region SubSource
Expand Down Expand Up @@ -1176,7 +1173,7 @@ public void Dispose(bool unregister)
}
}
}

private sealed class ObservableLogic : GraphStageLogic, IObservable<T>
{
private readonly ObservableSinkStage<T> _stage;
Expand Down Expand Up @@ -1216,12 +1213,12 @@ public void Remove(IObserver<T> observer)
ImmutableInterlocked.TryRemove(ref _observers, observer, out var _);
}

public IDisposable Subscribe(IObserver<T> observer) =>
public IDisposable Subscribe(IObserver<T> observer) =>
ImmutableInterlocked.GetOrAdd(ref _observers, observer, new ObserverDisposable(this, observer));
}

#endregion


public ObservableSinkStage()
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams/Implementation/Sources.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public override void PreStart()

public override void PostStop()
{
var exception = new AbruptStageTerminationException(this);
var exception = StreamDetachedException.Instance;
_completion.TrySetException(exception);
StopCallback(input =>
{
Expand Down
16 changes: 16 additions & 0 deletions src/core/Akka.Streams/StreamTcpException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ protected StreamTcpException(SerializationInfo info, StreamingContext context) :
#endif
}

/// <summary>
/// This exception signals that materialized value is already detached from stream. This usually happens
/// when stream is completed and an ActorSystem is shut down while materialized object is still available.
/// </summary>
public class StreamDetachedException : Exception
{
/// <summary>
/// Initializes a single instance of the <see cref="StreamDetachedException"/> class.
/// </summary>
public static readonly StreamDetachedException Instance = new StreamDetachedException();

private StreamDetachedException() : base("Stream is terminated. Materialized value is detached.")
{
}
}

/// <summary>
/// TBD
/// </summary>
Expand Down

0 comments on commit 331cd3a

Please sign in to comment.