From 93ac7e82f0bd1b37d07eef78fb77d5c10e92a19a Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Wed, 22 Mar 2023 07:46:07 +0100 Subject: [PATCH] Alternative implementation of `AtomicState` leveraging WaitAsync --- .../Akka.Tests/Pattern/CircuitBreakerSpec.cs | 605 +++++++----------- .../Pattern/CircuitBreakerStressSpec.cs | 134 ++++ src/core/Akka/Pattern/CircuitBreaker.cs | 69 +- .../Akka/Util/Extensions/TaskExtensions.cs | 51 ++ src/core/Akka/Util/Internal/AtomicState.cs | 127 +--- 5 files changed, 461 insertions(+), 525 deletions(-) create mode 100644 src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs create mode 100644 src/core/Akka/Util/Extensions/TaskExtensions.cs diff --git a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs index aa5e1683b49..c400e59e454 100644 --- a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs +++ b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs @@ -7,174 +7,137 @@ using System; using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Runtime.Serialization; using System.Threading; using System.Threading.Tasks; using Akka.Pattern; using Akka.TestKit; -using Akka.TestKit.Extensions; -using Akka.Tests.Util; +using Akka.Util.Internal; using Xunit; +using Xunit.Sdk; namespace Akka.Tests.Pattern { public class ASynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase { - [Fact(DisplayName = "A synchronous circuit breaker that is closed should allow call through")] - public void Should_Allow_Call_Through( ) + [Fact(DisplayName = "A synchronous circuit breaker that is closed must allow calls through")] + public void Must_allow_calls_through() { - var breaker = LongCallTimeoutCb( ); - var result = breaker.Instance.WithSyncCircuitBreaker( ( ) => "Test" ); - - Assert.Equal( "Test", result ); + var breaker = LongCallTimeoutCb(); + breaker.Instance.WithSyncCircuitBreaker(SayHi).ShouldBe("hi"); } - [Fact( DisplayName = "A synchronous circuit breaker that is closed should increment failure count when call fails" )] - public void Should_Increment_FailureCount_When_Call_Fails( ) + [Fact(DisplayName = "A synchronous circuit breaker that is closed must increment failure count on failure")] + public void Must_increment_failure_count_on_failure() { - var breaker = LongCallTimeoutCb( ); - - Assert.Equal(0, breaker.Instance.CurrentFailureCount); - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); + var breaker = LongCallTimeoutCb(); + breaker.Instance.CurrentFailureCount.ShouldBe(0); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); } - [Fact( DisplayName = "A synchronous circuit breaker that is closed should reset failure count when call succeeds" )] - public void Should_Reset_FailureCount_When_Call_Succeeds( ) + [Fact(DisplayName = "A synchronous circuit breaker that is closed must increment failure count on fail method")] + public void Must_increment_failure_count_on_fail_method() { - var breaker = MultiFailureCb( ); - - Assert.Equal(0, breaker.Instance.CurrentFailureCount); - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.Equal(1, breaker.Instance.CurrentFailureCount); - - breaker.Instance.WithSyncCircuitBreaker( ( ) => "Test" ); - - Assert.Equal( 0, breaker.Instance.CurrentFailureCount ); + var breaker = LongCallTimeoutCb(); + breaker.Instance.CurrentFailureCount.ShouldBe(0); + breaker.Instance.Fail(); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); } - [Fact(DisplayName = "A synchronous circuit breaker that is closed should increment failure count when call times out")] - public void Should_Increment_FailureCount_When_Call_Times_Out( ) + [Fact(DisplayName = "A synchronous circuit breaker that is closed must invoke onOpen if call fails and breaker transits to open state")] + public void Must_invoke_onOpen_if_call_fails_and_breaker_transits_to_open_state() { - var breaker = ShortCallTimeoutCb( ); - - breaker.Instance.WithSyncCircuitBreaker( ( ) => Thread.Sleep( 500 ) ); - - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); + var breaker = ShortCallTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); } - [Fact(DisplayName = "A synchronous circuit breaker that is closed must increment failure count on fail method")] - public void Must_increment_failure_count_on_fail_method() + [Fact(DisplayName = "A synchronous circuit breaker that is closed must reset failure count after success")] + public void Must_reset_failure_count_after_success() { - var breaker = LongCallTimeoutCb(); - Assert.True(breaker.Instance.CurrentFailureCount == 0); - breaker.Instance.Fail(); - Assert.True(CheckLatch(breaker.OpenLatch)); - Assert.True(breaker.Instance.CurrentFailureCount == 1); + var breaker = MultiFailureCb(); + breaker.Instance.CurrentFailureCount.ShouldBe(0); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); + breaker.Instance.WithSyncCircuitBreaker(SayHi); + breaker.Instance.CurrentFailureCount.ShouldBe(0); } - [Fact(DisplayName = "A synchronous circuit breaker that is closed must reset failure count and clears cached last exception after success method")] + [Fact(DisplayName = "A synchronous circuit breaker that is closed must reset failure count after success method")] public void Must_reset_failure_count_after_success_method() { var breaker = MultiFailureCb(); - Assert.True(breaker.Instance.CurrentFailureCount == 0); - Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); - Assert.True(breaker.Instance.CurrentFailureCount == 1); - Assert.True(breaker.Instance.LastCaughtException is TestException); + breaker.Instance.CurrentFailureCount.ShouldBe(0); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); breaker.Instance.Succeed(); - Assert.True(breaker.Instance.CurrentFailureCount == 0); - Assert.True(breaker.Instance.LastCaughtException is null); + breaker.Instance.CurrentFailureCount.ShouldBe(0); } - } - public class ASynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase - { - [Fact(DisplayName = "A synchronous circuit breaker that is half open should pass call and transition to close on success")] - public void Should_Pass_Call_And_Transition_To_Close_On_Success( ) + [Fact(DisplayName = "A synchronous circuit breaker that is closed must increment failure count on callTimeout before call finishes")] + public void Must_increment_failure_count_on_callTimeout_before_call_finishes() { - var breaker = ShortResetTimeoutCb( ); - InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); - - var result = breaker.Instance.WithSyncCircuitBreaker( ( ) => SayTest( ) ); - - Assert.True( CheckLatch( breaker.ClosedLatch ) ); - Assert.Equal( SayTest( ), result ); + var breaker = ShortCallTimeoutCb(); + Task.Run(() => breaker.Instance.WithSyncCircuitBreaker(() => Thread.Sleep(Dilated(TimeSpan.FromSeconds(1))))); + Within(TimeSpan.FromMilliseconds(900), + () => AwaitCondition(() => breaker.Instance.CurrentFailureCount == 1, Dilated(TimeSpan.FromMilliseconds(100)), TimeSpan.FromMilliseconds(100))); } + } - [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass only one call until it closes")] - public async Task Should_Pass_Only_One_Call_And_Transition_To_Close_On_Success() + public class ASynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase + { + [Fact(DisplayName = "A synchronous circuit breaker that is half open must pass through next call and close on success")] + public void Must_pass_through_call_and_close_on_success() { var breaker = ShortResetTimeoutCb(); - InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); Assert.True(CheckLatch(breaker.HalfOpenLatch)); - - var task1 = breaker.Instance.WithCircuitBreaker(() => DelayedSayTest(TimeSpan.FromSeconds(0.1))); - var task2 = breaker.Instance.WithCircuitBreaker(() => DelayedSayTest(TimeSpan.FromSeconds(0.1))); - var combined = Task.WhenAny(task1, task2).Unwrap(); - - // One of the 2 tasks will throw, because the circuit breaker is half open - Exception caughtException = null; - try - { - await combined; - } - catch (Exception e) - { - caughtException = e; - } - Assert.True(caughtException is OpenCircuitException); - Assert.StartsWith("Circuit breaker is half open", caughtException.Message); - - // Wait until one of task completes - await Task.Delay(TimeSpan.FromSeconds(0.25)); + Assert.True("hi" == breaker.Instance.WithSyncCircuitBreaker(SayHi)); Assert.True(CheckLatch(breaker.ClosedLatch)); - - // We don't know which one of the task got faulted - string result = null; - if (task1.IsCompleted && !task1.IsFaulted) - result = task1.Result; - else if (task2.IsCompleted && !task2.IsFaulted) - result = task2.Result; - - Assert.Equal(SayTest(), result); } - - [Fact(DisplayName = "A synchronous circuit breaker that is half open should pass call and transition to open on exception")] - public void Should_Pass_Call_And_Transition_To_Open_On_Exception( ) + [Fact(DisplayName = "A synchronous circuit breaker that is half open must open on exception in call")] + public void Must_open_on_exception_in_call() { - var breaker = ShortResetTimeoutCb( ); - - - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); - - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); + var breaker = ShortResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); + breaker.OpenLatch.Reset(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); } - [Fact(DisplayName = "A synchronous circuit breaker that is half open must open on calling fail method")] + [Fact(DisplayName = "A synchronous circuit breaker that is half open on calling fail method")] public void Must_open_on_calling_fail_method() { - var breaker = ShortCallTimeoutCb(); - - Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + var breaker = ShortResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); Assert.True(CheckLatch(breaker.HalfOpenLatch)); + breaker.OpenLatch.Reset(); breaker.Instance.Fail(); Assert.True(CheckLatch(breaker.OpenLatch)); } + [Fact(DisplayName = "A synchronous circuit breaker that is half open on calling success method")] + public void Must_open_on_calling_success_method() + { + var breaker = ShortResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); + breaker.Instance.Succeed(); + Assert.True(CheckLatch(breaker.ClosedLatch)); + } + [Fact(DisplayName = "A synchronous circuit breaker that is half open must close on calling success method")] public void Must_close_on_calling_success_method() { - var breaker = ShortCallTimeoutCb(); + var breaker = ShortResetTimeoutCb(); - Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); Assert.True(CheckLatch(breaker.HalfOpenLatch)); breaker.Instance.Succeed(); Assert.True(CheckLatch(breaker.ClosedLatch)); @@ -183,380 +146,278 @@ public void Must_close_on_calling_success_method() public class ASynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase { - [Fact(DisplayName = "A synchronous circuit breaker that is open should throw exceptions before reset timeout")] - public void Should_Throw_Exceptions_Before_Reset_Timeout( ) + [Fact(DisplayName = "A synchronous circuit breaker that is open must throw exceptions when called before reset timeout")] + public void Must_throw_exceptions_before_reset_timeout() { - var breaker = LongResetTimeoutCb( ); + var breaker = LongResetTimeoutCb(); + + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); + var e = InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + e.RemainingDuration.ShouldBeGreaterThan(TimeSpan.Zero); + e.RemainingDuration.ShouldBeLessOrEqualTo(LongResetTimeout); } - [Fact(DisplayName = "A synchronous circuit breaker that is open should transition to half open when reset times out")] - public void Should_Transition_To_Half_Open_When_Reset_Times_Out( ) + [Fact(DisplayName = "A synchronous circuit breaker that is open must transition to half-open on reset timeout")] + public void Must_transition_to_half_open_on_reset_timeout() { - var breaker = ShortResetTimeoutCb( ); - - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + var breaker = ShortResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); } [Fact(DisplayName = "A synchronous circuit breaker that is open must still be in open state after calling success method")] public void Must_still_be_in_open_state_after_calling_success_method() { - var breaker = LongCallTimeoutCb(); - - Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + var breaker = LongResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); Assert.True(CheckLatch(breaker.OpenLatch)); breaker.Instance.Succeed(); - Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.IsOpen.ShouldBeTrue(); } [Fact(DisplayName = "A synchronous circuit breaker that is open must still be in open state after calling fail method")] public void Must_still_be_in_open_state_after_calling_fail_method() { - var breaker = LongCallTimeoutCb(); - - Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + var breaker = LongResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); Assert.True(CheckLatch(breaker.OpenLatch)); breaker.Instance.Fail(); - Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.IsOpen.ShouldBeTrue(); } } public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase { - [Fact(DisplayName = "An asynchronous circuit breaker that is closed should allow call through")] - public async Task Should_Allow_Call_Through( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must allow calls through")] + public async Task Must_allow_calls_through() { - var breaker = LongCallTimeoutCb( ); - var result = await breaker.Instance.WithCircuitBreaker( () => Task.Run( ( ) => SayTest( ) ) ); - - Assert.Equal( SayTest( ), result ); + var breaker = LongCallTimeoutCb(); + var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout); + Assert.Equal("hi", result); } - [Fact(DisplayName = "An asynchronous circuit breaker that is closed should increment failure count when call fails")] - public async Task Should_Increment_Failure_Count_When_Call_Fails( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on exception")] + public async Task Must_increment_failure_count_on_exception() { - var breaker = LongCallTimeoutCb( ); - - Assert.Equal(0, breaker.Instance.CurrentFailureCount); - Assert.True( await InterceptExceptionTypeAsync( async () => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Run( ThrowException ) ).AwaitWithTimeout(AwaitTimeout) ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); + var breaker = LongCallTimeoutCb(); + await InterceptException(() => + breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout)); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); } - [Fact(DisplayName = "An asynchronous circuit breaker that is closed should reset failure count when call succeeds after failure")] - public async Task Should_Reset_Failure_Count_When_Call_Succeeds_After_Failure( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on async failure")] + public void Must_increment_failure_count_on_async_failure() { - var breaker = MultiFailureCb( ); - - Assert.Equal(0, breaker.Instance.CurrentFailureCount); + var breaker = LongCallTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); + } - var whenall = Task.WhenAll( - breaker.Instance.WithCircuitBreaker(() => Task.Factory.StartNew(ThrowException)) - , breaker.Instance.WithCircuitBreaker(() => Task.Factory.StartNew(ThrowException)) - , breaker.Instance.WithCircuitBreaker(() => Task.Factory.StartNew(ThrowException)) - , breaker.Instance.WithCircuitBreaker(() => Task.Factory.StartNew(ThrowException))); + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must reset failure count after success")] + public async Task Must_reset_failure_count_after_success() + { + var breaker = MultiFailureCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)); + Enumerable.Range(1, 4).ForEach(_ => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException))); + await AwaitAssertAsync(() => breaker.Instance.CurrentFailureCount.ShouldBe(4), AwaitTimeout); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)); + await AwaitAssertAsync(() => breaker.Instance.CurrentFailureCount.ShouldBe(0), AwaitTimeout); + } - Assert.True( await InterceptExceptionTypeAsync( async ( ) => - await whenall.AwaitWithTimeout(AwaitTimeout) ) ); + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on callTimeout")] + public async Task Must_increment_failure_count_on_callTimeout() + { + var breaker = ShortCallTimeoutCb(); - Assert.Equal(4, breaker.Instance.CurrentFailureCount); + var future = breaker.Instance.WithCircuitBreaker(async () => + { + await Task.Delay(150); + ThrowException(); + }); - var result = await breaker.Instance.WithCircuitBreaker(async () => await Task.Run( SayTest ) ); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); - Assert.Equal( SayTest( ), result ); - Assert.Equal( 0, breaker.Instance.CurrentFailureCount ); + // Since the timeout should have happened before the inner code finishes + // we expect a timeout, not TestException + await InterceptException(() => future.WaitAsync(AwaitTimeout)); } - [Fact(DisplayName = "An asynchronous circuit breaker that is closed should increment failure count when call times out")] - public async Task Should_Increment_Failure_Count_When_Call_Times_Out( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must invoke onOpen if call fails and breaker transits to open state")] + public void Must_invoke_onOpen_if_call_fails_and_breaker_transits_to_open_state() { - var breaker = ShortCallTimeoutCb( ); - - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Run( async () => - { - await Task.Delay(500); - return SayTest( ); - } )); - - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); - Assert.True(breaker.Instance.LastCaughtException is TimeoutException); + var breaker = ShortCallTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); } } public class AnAsynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase { - [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to close on success")] - public async Task Should_Pass_Call_And_Transition_To_Close_On_Success( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is half open must pass through next call and close on success")] + public async Task Must_pass_through_next_call_and_close_on_success() { - var breaker = ShortResetTimeoutCb( ); - await InterceptExceptionTypeAsync( async ( ) => - await breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); - - var result = await breaker.Instance.WithCircuitBreaker( async - () => await Task.Factory.StartNew( SayTest ) ); - - Assert.True( CheckLatch( breaker.ClosedLatch ) ); - Assert.Equal( SayTest( ), result ); + var breaker = ShortResetTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); + var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout); + Assert.Equal("hi", result); + Assert.True(CheckLatch(breaker.ClosedLatch)); } - [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to open on exception")] - public async Task Should_Pass_Call_And_Transition_To_Open_On_Exception( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is half open must re-open on exception in call")] + public async Task Must_reopen_on_exception_in_call() { - var breaker = ShortResetTimeoutCb( ); - - Assert.True( await InterceptExceptionTypeAsync( async () => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Factory.StartNew( ThrowException )))); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); - - Assert.True( await InterceptExceptionTypeAsync( async () => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Factory.StartNew( ThrowException )))); - Assert.True( CheckLatch( breaker.OpenLatch ) ); + var breaker = ShortResetTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); + breaker.OpenLatch.Reset(); + await InterceptException(() => + breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout)); + Assert.True(CheckLatch(breaker.OpenLatch)); } - [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to open on async failure")] - public void Should_Pass_Call_And_Transition_To_Open_On_Async_Failure( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is half open must re-open on async failure")] + public void Must_reopen_on_async_failure() { - var breaker = ShortResetTimeoutCb( ); - - breaker.Instance.WithCircuitBreaker( async () => await Task.Run( ThrowException ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + var breaker = ShortResetTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); - breaker.Instance.WithCircuitBreaker( async () => await Task.Run( ThrowException ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); + breaker.OpenLatch.Reset(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); } } public class AnAsynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase { - [Fact(DisplayName = "An asynchronous circuit breaker that is open should throw exceptions when called before reset timeout")] - public async Task Should_Throw_Exceptions_When_Called_Before_Reset_Timeout( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is open must throw exceptions when called before reset timeout")] + public async Task Must_throw_exceptions_when_called_before_reset_timeout() { - var breaker = LongResetTimeoutCb( ); + var breaker = LongResetTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + + Assert.True(CheckLatch(breaker.OpenLatch)); - Assert.True( await InterceptExceptionTypeAsync(async ( ) => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Factory.StartNew( ThrowException ) ) ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.True( await InterceptExceptionTypeAsync(async ( ) => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Factory.StartNew( ThrowException ) ) ) ); + await InterceptException( + () => breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout)); } - [Fact(DisplayName = "An asynchronous circuit breaker that is open should transition to half open when reset timeout")] - public async Task Should_Transition_To_Half_Open_When_Reset_Timeout( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is open must transition to half-open on reset timeout")] + public void Must_transition_to_half_open_on_reset_timeout() { - var breaker = ShortResetTimeoutCb( ); - - Assert.True( await InterceptExceptionTypeAsync( async () => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Factory.StartNew( ThrowException ) ) ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + var breaker = ShortResetTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); } - - [Fact(DisplayName = "An asynchronous circuit breaker that is open should increase the reset timeout after it transits to open again")] - public async Task Should_Reset_Timeout_After_It_Transits_To_Open_Again() + + [Fact(DisplayName = "An asynchronous circuit breaker that is open must increase the reset timeout after it transits to open again")] + public async Task Must_increase_reset_timeout_after_it_transits_to_open_again() { var breaker = NonOneFactorCb(); - Assert.True(await InterceptExceptionTypeAsync(async () => - await breaker.Instance.WithCircuitBreaker(async () => - await Task.Run(ThrowException)))); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); Assert.True(CheckLatch(breaker.OpenLatch)); - var e1 = await InterceptExceptionAsync(async () => - await breaker.Instance.WithCircuitBreaker(() => Task.FromResult(SayTest()))); + var e1 = await InterceptException( + () => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException))); var shortRemainingDuration = e1.RemainingDuration; - await Task.Delay(1000); + await Task.Delay(Dilated(TimeSpan.FromMilliseconds(1000))); Assert.True(CheckLatch(breaker.HalfOpenLatch)); // transit to open again - Assert.True(await InterceptExceptionTypeAsync(async () => - await breaker.Instance.WithCircuitBreaker(async () => - await Task.Run(ThrowException)))); + breaker.OpenLatch.Reset(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); Assert.True(CheckLatch(breaker.OpenLatch)); - var e2 = await InterceptExceptionAsync(async () => - await breaker.Instance.WithCircuitBreaker(() => Task.FromResult(SayTest()))); + var e2 = await InterceptException(() => + breaker.Instance.WithCircuitBreaker(() => Task.FromResult(SayHi()))); var longRemainingDuration = e2.RemainingDuration; - Assert.True(shortRemainingDuration < longRemainingDuration); + shortRemainingDuration.ShouldBeLessThan(longRemainingDuration); } } public class CircuitBreakerSpecBase : AkkaSpec { - public TimeSpan AwaitTimeout { get; } = TimeSpan.FromSeconds(2); - - public bool CheckLatch( CountdownEvent latch ) - { - return latch.Wait( AwaitTimeout ); - } + public TimeSpan AwaitTimeout => Dilated(TimeSpan.FromSeconds(2)); - public Task Delay( TimeSpan toDelay, CancellationToken? token ) - { - return token.HasValue ? Task.Delay( toDelay, token.Value ) : Task.Delay( toDelay ); - } - - public async Task DelayedSayTest(TimeSpan delay) - { - await Task.Delay(delay); - return "Test"; - } + public bool CheckLatch(CountdownEvent latch) => latch.Wait(AwaitTimeout); [DebuggerStepThrough] - public void ThrowException() => throw new TestException("Test Exception"); + public static void ThrowException() => throw new TestException("Test Exception"); - public string SayTest( ) => "Test"; - - protected T InterceptException(Action actionThatThrows) where T : Exception - { - return Assert.Throws(() => - { - try - { - actionThatThrows(); - } - catch (AggregateException ex) - { - foreach (var e in ex.Flatten().InnerExceptions.Where(e => e is T).Select(e => e)) - throw e; - } - }); - } + public static string SayHi() => "hi"; - protected async Task InterceptExceptionAsync(Func actionThatThrows) where T : Exception - { - return await Assert.ThrowsAsync(async () => - { - try - { - await actionThatThrows(); - } - catch (AggregateException ex) - { - foreach (var e in ex.Flatten().InnerExceptions.Where(e => e is T).Select(e => e)) - throw e; - } - }); - } - - [SuppressMessage( "Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter" )] - public bool InterceptExceptionType( Action action ) where T : Exception - { - try - { - action.Invoke( ); - return false; - } - catch ( Exception ex ) - { - if (ex is AggregateException aggregate) - { - // ReSharper disable once UnusedVariable - foreach (var temp in aggregate - .InnerExceptions - .Where(t => !(t is T))) - { - throw; - } - } else if (!(ex is T)) - { - throw; - } - } - return true; - } + protected T InterceptException(Action actionThatThrows) where T : Exception => + Intercept(actionThatThrows); - public async Task InterceptExceptionTypeAsync(Func action) where T : Exception + protected static async Task InterceptException(Func actionThatThrows) + where T : Exception { try { - await action(); - return false; + await actionThatThrows(); } catch (Exception ex) { - if (ex is AggregateException aggregate) - { - // ReSharper disable once UnusedVariable - foreach (var temp in aggregate - .InnerExceptions - .Where(t => !(t is T))) - { - throw; - } - } - else if (!(ex is T)) - { - throw; - } + var exception = ex is AggregateException aggregateException + ? aggregateException.Flatten().InnerExceptions[0] + : ex; + + var exceptionType = typeof(T); + return exceptionType == exception.GetType() + ? (T)exception + : throw new ThrowsException(exceptionType, exception); } - return true; - } - public TestBreaker ShortCallTimeoutCb( ) - { - return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 50 ), TimeSpan.FromMilliseconds( 500 ) ) ); + throw new ThrowsException(typeof(T)); } - public TestBreaker ShortResetTimeoutCb( ) - { - return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 1000 ), TimeSpan.FromMilliseconds( 50 ) ) ); - } + public TestBreaker ShortCallTimeoutCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(50)), Dilated(TimeSpan.FromMilliseconds(500)))); - public TestBreaker LongCallTimeoutCb( ) - { - return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 5000 ), TimeSpan.FromMilliseconds( 500 ) ) ); - } + public TestBreaker ShortResetTimeoutCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(1000)), Dilated(TimeSpan.FromMilliseconds(50)))); - public TestBreaker LongResetTimeoutCb( ) - { - return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 100 ), TimeSpan.FromMilliseconds( 5000 ) ) ); - } + public TestBreaker LongCallTimeoutCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromSeconds(5), Dilated(TimeSpan.FromMilliseconds(500)))); - public TestBreaker MultiFailureCb( ) - { - return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 5, TimeSpan.FromMilliseconds( 200 ), TimeSpan.FromMilliseconds( 500 ) ) ); - } - - public TestBreaker NonOneFactorCb() - { - return new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds(2000), TimeSpan.FromMilliseconds(1000), TimeSpan.FromDays(1), 5, 0)); - } - } + public TimeSpan LongResetTimeout = TimeSpan.FromSeconds(5); + public TestBreaker LongResetTimeoutCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(100)), Dilated(LongResetTimeout))); + + public TestBreaker MultiFailureCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 5, Dilated(TimeSpan.FromMilliseconds(200)), Dilated(TimeSpan.FromMilliseconds(500)))); + public TestBreaker NonOneFactorCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(2000)), Dilated(TimeSpan.FromMilliseconds(1000)), Dilated(TimeSpan.FromDays(1)), 5, 0)); + } internal class TestException : Exception { - public TestException( ) + public TestException() { } - public TestException( string message ) - : base( message ) + public TestException(string message) + : base(message) { } - public TestException( string message, Exception innerException ) - : base( message, innerException ) + public TestException(string message, Exception innerException) + : base(message, innerException) { } - protected TestException( SerializationInfo info, StreamingContext context ) - : base( info, context ) + protected TestException(SerializationInfo info, StreamingContext context) + : base(info, context) { } } - } diff --git a/src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs b/src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs new file mode 100644 index 00000000000..e3a84dad7ac --- /dev/null +++ b/src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs @@ -0,0 +1,134 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Pattern; +using Akka.TestKit; +using Akka.Util; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Tests.Pattern +{ + public class CircuitBreakerStressSpec : AkkaSpec + { + internal class RequestJob + { + public static RequestJob Instance => new RequestJob(); + private RequestJob() { } + } + + internal class JobDone + { + public static JobDone Instance => new JobDone(); + private JobDone() { } + } + + internal class GetResult + { + public static GetResult Instance => new GetResult(); + private GetResult() { } + } + + internal class Result + { + public int DoneCount { get; } + public int TimeoutCount { get; } + public int FailCount { get; } + public int CircCount { get; } + + public Result(int doneCount, int timeoutCount, int failCount, int circCount) + { + DoneCount = doneCount; + TimeoutCount = timeoutCount; + FailCount = failCount; + CircCount = circCount; + } + } + + internal class StressActor : UntypedActor + { + private readonly CircuitBreaker _breaker; + private int _doneCount; + private int _timeoutCount; + private int _failCount; + private int _circCount; + + public StressActor(CircuitBreaker breaker) => _breaker = breaker; + + protected override void OnReceive(object message) + { + switch (message) + { + case RequestJob _: + _breaker.WithCircuitBreaker(Job).PipeTo(Self); + break; + case JobDone _: + _doneCount++; + break; + case Status.Failure { Cause: OpenCircuitException _ }: + _circCount++; + _breaker.WithCircuitBreaker(Job).PipeTo(Self); + break; + case Status.Failure { Cause: TimeoutException _ }: + _timeoutCount++; + _breaker.WithCircuitBreaker(Job).PipeTo(Self); + break; + case Status.Failure _: + _failCount++; + _breaker.WithCircuitBreaker(Job).PipeTo(Self); + break; + case GetResult _: + Sender.Tell(new Result(_doneCount, _timeoutCount, _failCount, _circCount)); + break; + default: + base.Unhandled(message); + break; + } + } + + private static async Task Job() + { + await Task.Delay(TimeSpan.FromMilliseconds(ThreadLocalRandom.Current.Next(300))); + return JobDone.Instance; + } + } + + public CircuitBreakerStressSpec(ITestOutputHelper output) + : base(output) + { } + + [Fact] + public async Task A_CircuitBreaker_stress_test() + { + var breaker = new CircuitBreaker(Sys.Scheduler, 5, TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(200)); + var stressActors = Enumerable.Range(0, 3).Select(i => Sys.ActorOf(Props.Create(breaker))).ToList(); + + for (var i = 0; i < 1000; i++) + foreach (var stressActor in stressActors) + { + stressActor.Tell(RequestJob.Instance); + } + + // let them work for a while + await Task.Delay(3000); + + foreach (var stressActor in stressActors) + { + stressActor.Tell(GetResult.Instance); + var result = ExpectMsg(); + result.FailCount.ShouldBe(0); + + Output.WriteLine("FailCount:{0}, DoneCount:{1}, CircCount:{2}, TimeoutCount:{3}", + result.FailCount, result.DoneCount, result.CircCount, result.TimeoutCount); + } + } + } +} diff --git a/src/core/Akka/Pattern/CircuitBreaker.cs b/src/core/Akka/Pattern/CircuitBreaker.cs index 93808088010..cf0a0f00dd7 100644 --- a/src/core/Akka/Pattern/CircuitBreaker.cs +++ b/src/core/Akka/Pattern/CircuitBreaker.cs @@ -214,10 +214,7 @@ public CircuitBreaker(IScheduler scheduler, int maxFailures, TimeSpan callTimeou /// /// Retrieves current failure count. /// - public long CurrentFailureCount - { - get { return Closed.Current; } - } + public long CurrentFailureCount => Closed.Current; public Exception LastCaughtException { get; private set; } @@ -227,67 +224,35 @@ public long CurrentFailureCount /// TBD /// Call needing protected /// containing the call result - public Task WithCircuitBreaker(Func> body) - { - return CurrentState.Invoke(body); - } + public Task WithCircuitBreaker(Func> body) => CurrentState.Invoke(body); - public Task WithCircuitBreaker(TState state, - Func> body) - { - return CurrentState.InvokeState(state, body); - } + public Task WithCircuitBreaker(TState state, Func> body) => + CurrentState.InvokeState(state, body); /// /// Wraps invocation of asynchronous calls that need to be protected /// /// Call needing protected /// - public Task WithCircuitBreaker(Func body) - { - return CurrentState.Invoke(body); - } - public Task WithCircuitBreaker(TState state, Func body) - { - return CurrentState.InvokeState(state, body); - } + public Task WithCircuitBreaker(Func body) => CurrentState.Invoke(body); + + public Task WithCircuitBreaker(TState state, Func body) => + CurrentState.InvokeState(state, body); /// - /// The failure will be recorded farther down. + /// Wraps invocations of asynchronous calls that need to be protected. /// - /// TBD - public void WithSyncCircuitBreaker(Action body) - { - var cbTask = WithCircuitBreaker(body,(b) => Task.Factory.StartNew(b)); - if (!cbTask.Wait(CallTimeout)) - { - //throw new TimeoutException( string.Format( "Execution did not complete within the time allotted {0} ms", CallTimeout.TotalMilliseconds ) ); - } - if (cbTask.Exception != null) - { - ExceptionDispatchInfo.Capture(cbTask.Exception).Throw(); - } - } + /// Call needing protected + public void WithSyncCircuitBreaker(Action body) => + WithCircuitBreaker(body, b => Task.Run(b)).GetAwaiter().GetResult(); /// - /// Wraps invocations of asynchronous calls that need to be protected - /// If this does not complete within the time allotted, it should return default() - /// - /// - /// Await.result( - /// withCircuitBreaker(try Future.successful(body) catch { case NonFatal(t) ⇒ Future.failed(t) }), - /// callTimeout) - /// - /// + /// Wraps invocations of asynchronous calls that need to be protected. /// - /// TBD - /// TBD - /// or default() - public T WithSyncCircuitBreaker(Func body) - { - var cbTask = WithCircuitBreaker(body,(b) => Task.Factory.StartNew(b)); - return cbTask.Wait(CallTimeout) ? cbTask.Result : default(T); - } + /// Call needing protected + /// The result of the call + public T WithSyncCircuitBreaker(Func body) => + WithCircuitBreaker(body, b => Task.Run(b)).Result; /// /// Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the diff --git a/src/core/Akka/Util/Extensions/TaskExtensions.cs b/src/core/Akka/Util/Extensions/TaskExtensions.cs new file mode 100644 index 00000000000..f134e76f87a --- /dev/null +++ b/src/core/Akka/Util/Extensions/TaskExtensions.cs @@ -0,0 +1,51 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +namespace System.Threading.Tasks +{ + internal static class TaskExtensions + { +#if NETSTANDARD + public static async Task WaitAsync(this Task task, TimeSpan timeout) + { + var cts = new CancellationTokenSource(); + try + { + var delayTask = Task.Delay(timeout, cts.Token); + var completedTask = await Task.WhenAny(task, delayTask); + if (completedTask == delayTask) + throw new TimeoutException($"Execution did not complete within the time allotted {timeout.TotalMilliseconds} ms"); + + await task; + } + finally + { + cts.Cancel(); + cts.Dispose(); + } + } + + public static async Task WaitAsync(this Task task, TimeSpan timeout) + { + var cts = new CancellationTokenSource(); + try + { + var delayTask = Task.Delay(timeout, cts.Token); + var completedTask = await Task.WhenAny(task, delayTask); + return completedTask == delayTask + ? throw new TimeoutException($"Execution did not complete within the time allotted {timeout.TotalMilliseconds} ms") + : await task; + } + finally + { + cts.Cancel(); + cts.Dispose(); + } + } +#endif + } +} diff --git a/src/core/Akka/Util/Internal/AtomicState.cs b/src/core/Akka/Util/Internal/AtomicState.cs index 06d9a6bc50f..ebee0273241 100644 --- a/src/core/Akka/Util/Internal/AtomicState.cs +++ b/src/core/Akka/Util/Internal/AtomicState.cs @@ -73,149 +73,78 @@ await Task /// /// Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed /// call timeout is counted as a failed call, otherwise a successful call - /// - /// NOTE: In .Net there is no way to cancel an uncancellable task. We are merely cancelling the wait and marking this - /// as a failure. - /// - /// see http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx /// - /// TBD - /// Implementation of the call - /// result of the call + /// Implementation of the call + /// containing the result of the call public async Task CallThrough(Func> task) { - var deadline = DateTime.UtcNow.Add(_callTimeout); - ExceptionDispatchInfo capturedException = null; - T result = default(T); + var result = default(T); try { - result = await task().ConfigureAwait(false); + result = await task().WaitAsync(_callTimeout).ConfigureAwait(false); + CallSucceeds(); } catch (Exception ex) { - capturedException = ExceptionDispatchInfo.Capture(ex); - } - - // Need to make sure that timeouts are reported as timeouts - if (capturedException != null) - { + var capturedException = ExceptionDispatchInfo.Capture(ex); CallFails(capturedException.SourceException); capturedException.Throw(); } - else if (DateTime.UtcNow.CompareTo(deadline) >= 0) - { - CallFails(new TimeoutException( - $"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms")); - } - else - { - CallSucceeds(); - } + return result; } - - public async Task CallThrough(TState state, Func> task) + + public async Task CallThrough(TState state, Func> task) { - var deadline = DateTime.UtcNow.Add(_callTimeout); - ExceptionDispatchInfo capturedException = null; - T result = default(T); + var result = default(T); try { - result = await task(state).ConfigureAwait(false); + result = await task(state).WaitAsync(_callTimeout).ConfigureAwait(false); + CallSucceeds(); } catch (Exception ex) { - capturedException = ExceptionDispatchInfo.Capture(ex); - } - - // Need to make sure that timeouts are reported as timeouts - if (capturedException != null) - { + var capturedException = ExceptionDispatchInfo.Capture(ex); CallFails(capturedException.SourceException); capturedException.Throw(); } - else if (DateTime.UtcNow.CompareTo(deadline) >= 0) - { - CallFails(new TimeoutException( - $"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms")); - } - else - { - CallSucceeds(); - } + return result; } /// /// Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed /// call timeout is counted as a failed call, otherwise a successful call - /// - /// NOTE: In .Net there is no way to cancel an uncancellable task. We are merely cancelling the wait and marking this - /// as a failure. - /// - /// see http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx /// /// Implementation of the call - /// + /// containing the result of the call public async Task CallThrough(Func task) { - var deadline = DateTime.UtcNow.Add(_callTimeout); - ExceptionDispatchInfo capturedException = null; - try { - await task().ConfigureAwait(false); + await task().WaitAsync(_callTimeout).ConfigureAwait(false); + CallSucceeds(); } catch (Exception ex) { - capturedException = ExceptionDispatchInfo.Capture(ex); - } - - // Need to make sure that timeouts are reported as timeouts - if (capturedException != null) - { - CallFails(capturedException?.SourceException); + var capturedException = ExceptionDispatchInfo.Capture(ex); + CallFails(capturedException.SourceException); capturedException.Throw(); - } - else if (DateTime.UtcNow.CompareTo(deadline) >= 0) - { - CallFails(new TimeoutException( - $"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms")); - } - else - { - CallSucceeds(); } } - + public async Task CallThrough(TState state, Func task) { - var deadline = DateTime.UtcNow.Add(_callTimeout); - ExceptionDispatchInfo capturedException = null; - try { - await task(state).ConfigureAwait(false); + await task(state).WaitAsync(_callTimeout).ConfigureAwait(false); + CallSucceeds(); } catch (Exception ex) { - capturedException = ExceptionDispatchInfo.Capture(ex); - } - - // Need to make sure that timeouts are reported as timeouts - if (capturedException != null) - { - CallFails(capturedException?.SourceException); + var capturedException = ExceptionDispatchInfo.Capture(ex); + CallFails(capturedException.SourceException); capturedException.Throw(); - } - else if (DateTime.UtcNow.CompareTo(deadline) >= 0) - { - CallFails(new TimeoutException( - $"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms")); - } - else - { - CallSucceeds(); } } @@ -227,8 +156,7 @@ public async Task CallThrough(TState state, Func task) /// containing result of protected call public abstract Task Invoke(Func> body); - public abstract Task InvokeState(TState state, - Func> body); + public abstract Task InvokeState(TState state, Func> body); /// /// Abstract entry point for all states @@ -237,9 +165,7 @@ public abstract Task InvokeState(TState state, /// containing result of protected call public abstract Task Invoke(Func body); - public abstract Task InvokeState(TState state, - Func body); - + public abstract Task InvokeState(TState state, Func body); /// /// Invoked when call fails @@ -267,7 +193,6 @@ public void Enter() NotifyTransitionListeners(); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed } - } ///