Skip to content

Commit

Permalink
[WIP] Chasing test suite issues
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Oct 13, 2022
1 parent fee2e6d commit 8b69365
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 34 deletions.
75 changes: 62 additions & 13 deletions src/core/Akka.TestKit/TestKitBase_AwaitConditions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void AwaitCondition(Func<bool> conditionIsFulfilled, CancellationToken ca
AwaitConditionAsync(async () => conditionIsFulfilled(), cancellationToken)
.WaitAndUnwrapException();
}

public async Task AwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, CancellationToken cancellationToken = default)
{
var maxDur = RemainingOrDefault;
Expand Down Expand Up @@ -71,15 +71,15 @@ public void AwaitCondition(Func<bool> conditionIsFulfilled, TimeSpan? max, Cance
AwaitConditionAsync(async () => conditionIsFulfilled(), max, cancellationToken)
.WaitAndUnwrapException(cancellationToken);
}

public async Task AwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan? max, CancellationToken cancellationToken = default)
{
var maxDur = RemainingOrDilated(max);
var interval = new TimeSpan(maxDur.Ticks / 10);
var logger = _testState.TestKitSettings.LogTestKitCalls ? _testState.Log : null;
await InternalAwaitConditionAsync(conditionIsFulfilled, maxDur, interval, (format, args) => _assertions.Fail(format, args), logger, cancellationToken);
}

/// <summary>
/// <para>Await until the given condition evaluates to <c>true</c> or the timeout
/// expires, whichever comes first.</para>
Expand All @@ -105,7 +105,7 @@ public void AwaitCondition(Func<bool> conditionIsFulfilled, TimeSpan? max, strin
AwaitConditionAsync(async () => conditionIsFulfilled(), max, message, cancellationToken)
.WaitAndUnwrapException();
}

public async Task AwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan? max, string message, CancellationToken cancellationToken = default)
{
var maxDur = RemainingOrDilated(max);
Expand Down Expand Up @@ -143,16 +143,16 @@ public async Task AwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, Tim
/// <param name="message">The message used if the timeout expires.</param>
/// <param name="cancellationToken"></param>
public void AwaitCondition(Func<bool> conditionIsFulfilled, TimeSpan? max, TimeSpan? interval, string message = null, CancellationToken cancellationToken = default)
{
{
AwaitConditionAsync(async () => conditionIsFulfilled(), max, interval, message, cancellationToken)
.WaitAndUnwrapException(cancellationToken);
}

public async Task AwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan? max, TimeSpan? interval, string message = null, CancellationToken cancellationToken = default)
{
var maxDur = RemainingOrDilated(max);
var logger = _testState.TestKitSettings.LogTestKitCalls ? _testState.Log : null;
await InternalAwaitConditionAsync(conditionIsFulfilled, maxDur, interval,
await InternalAwaitConditionAsync(conditionIsFulfilled, maxDur, interval,
(format, args) => AssertionsFail(format, args, message), logger, cancellationToken);
}

Expand All @@ -179,7 +179,7 @@ public bool AwaitConditionNoThrow(Func<bool> conditionIsFulfilled, TimeSpan max,
return AwaitConditionNoThrowAsync(async () => conditionIsFulfilled(), max, interval, cancellationToken)
.WaitAndUnwrapException(cancellationToken);
}

public Task<bool> AwaitConditionNoThrowAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan max, TimeSpan? interval = null, CancellationToken cancellationToken = default)
{
var intervalDur = interval.GetValueOrDefault(TimeSpan.FromMilliseconds(100));
Expand Down Expand Up @@ -218,7 +218,7 @@ protected static bool InternalAwaitCondition(Func<bool> conditionIsFulfilled, Ti
{
return InternalAwaitCondition(conditionIsFulfilled, max, interval, fail, null, cancellationToken);
}

protected static Task<bool> InternalAwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan max, TimeSpan? interval, Action<string, object[]> fail
, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -258,7 +258,7 @@ protected static bool InternalAwaitCondition(Func<bool> conditionIsFulfilled, Ti
{
return InternalAwaitConditionAsync(async () => conditionIsFulfilled(), max, interval, fail, logger, cancellationToken)
.WaitAndUnwrapException(cancellationToken);

}

protected static async Task<bool> InternalAwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan max, TimeSpan? interval, Action<string, object[]> fail, ILoggingAdapter logger, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -293,10 +293,59 @@ protected static async Task<bool> InternalAwaitConditionAsync(Func<Task<bool>> c
return true;
}

private static void ConditionalLog(ILoggingAdapter logger, string format, params object[] args)
protected void AwaitCond(Func<bool> p, TimeSpan? max = null, TimeSpan? interval = null, string message = "")
{
if (logger != null)
logger.Debug(format, args);
if (interval == null) interval = TimeSpan.FromMilliseconds(100);

var dilatedMax = RemainingOrDilated(max);
var stop = Now + dilatedMax;

void Poll(TimeSpan t)
{
if (!p())
{
_assertions.AssertTrue(Now < stop, $"timeout {dilatedMax} expired: {message}");
Thread.Sleep(t);
Poll((stop - Now).Min(interval.Value));
}
}

Poll(dilatedMax.Min(interval.Value));
}

protected void Within(TimeSpan max, Action f) =>
Within(TimeSpan.Zero, max, f);

protected void Within(TimeSpan min, TimeSpan max, Action f)
{
var dilatedMax = Dilated(max);
var start = Now;
var rem = _testState.End.HasValue ? _testState.End.Value - start : Timeout.InfiniteTimeSpan;
_assertions.AssertTrue(rem.IsInfiniteTimeout() || rem >= min, "Required min time {0} not possible, only {1} left.", min, rem);

_testState.LastWasNoMsg = false;

var maxDiff = dilatedMax.Min(rem);
var prevEnd = _testState.End;
_testState.End = start + maxDiff;

try
{
f();
}
finally
{
_testState.End = prevEnd;
}

var diff = Now - start;
_assertions.AssertTrue(min <= diff, $"block took {diff}, should at least have been {min}");
if (!_testState.LastWasNoMsg)
{
_assertions.AssertTrue(diff <= maxDiff, $"block took {diff}, exceeding {maxDiff}");
}
}

private static void ConditionalLog(ILoggingAdapter logger, string format, params object[] args) => logger?.Info(format, args);
}
}
26 changes: 13 additions & 13 deletions src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void Must_increment_failure_count_on_callTimeout_before_call_finishes()
var breaker = ShortCallTimeoutCb();
Task.Run(() => breaker.Instance.WithSyncCircuitBreaker(() => Thread.Sleep(Dilated(TimeSpan.FromSeconds(1)))));
Within(TimeSpan.FromMilliseconds(900),
() => AwaitCondition(() => breaker.Instance.CurrentFailureCount == 1, Dilated(TimeSpan.FromMilliseconds(100))));
() => AwaitCond(() => breaker.Instance.CurrentFailureCount == 1, Dilated(TimeSpan.FromMilliseconds(100))));
}
}

Expand Down Expand Up @@ -194,7 +194,7 @@ public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase
public async Task Must_allow_calls_through()
{
var breaker = LongCallTimeoutCb();
var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout);
var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync1(AwaitTimeout);
Assert.Equal("hi", result);
}

Expand All @@ -203,7 +203,7 @@ public async Task Must_increment_failure_count_on_exception()
{
var breaker = LongCallTimeoutCb();
await InterceptException<TestException>(() =>
breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout));
breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync1(AwaitTimeout));
Assert.True(CheckLatch(breaker.OpenLatch));
breaker.Instance.CurrentFailureCount.ShouldBe(1);
}
Expand Down Expand Up @@ -244,7 +244,7 @@ public async Task Must_increment_failure_count_on_callTimeout()

// Since the timeout should have happened before the inner code finishes
// we expect a timeout, not TestException
await InterceptException<TimeoutException>(() => future.WaitAsync(AwaitTimeout));
await InterceptException<TimeoutException>(() => future.WaitAsync1(AwaitTimeout));
}

[Fact(DisplayName = "An asynchronous circuit breaker that is closed must invoke onOpen if call fails and breaker transits to open state")]
Expand All @@ -264,7 +264,7 @@ public async Task Must_pass_through_next_call_and_close_on_success()
var breaker = ShortResetTimeoutCb();
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
Assert.True(CheckLatch(breaker.HalfOpenLatch));
var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout);
var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync1(AwaitTimeout);
Assert.Equal("hi", result);
Assert.True(CheckLatch(breaker.ClosedLatch));
}
Expand All @@ -277,7 +277,7 @@ public async Task Must_reopen_on_exception_in_call()
Assert.True(CheckLatch(breaker.HalfOpenLatch));
breaker.OpenLatch.Reset();
await InterceptException<TestException>(() =>
breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout));
breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync1(AwaitTimeout));
Assert.True(CheckLatch(breaker.OpenLatch));
}

Expand Down Expand Up @@ -305,7 +305,7 @@ public async Task Must_throw_exceptions_when_called_before_reset_timeout()
Assert.True(CheckLatch(breaker.OpenLatch));

await InterceptException<OpenCircuitException>(
() => breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout));
() => breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync1(AwaitTimeout));
}

[Fact(DisplayName = "An asynchronous circuit breaker that is open must transition to half-open on reset timeout")]
Expand All @@ -323,7 +323,7 @@ public async Task Must_increase_reset_timeout_after_it_transits_to_open_again()
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
Assert.True(CheckLatch(breaker.OpenLatch));

var e1 = await InterceptException<OpenCircuitException>(
var e1 = await InterceptException<OpenCircuitException>(
() => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)));
var shortRemainingDuration = e1.RemainingDuration;

Expand Down Expand Up @@ -379,23 +379,23 @@ protected static async Task<T> InterceptException<T>(Func<Task> actionThatThrows
throw new ThrowsException(typeof(T));
}

public TestBreaker ShortCallTimeoutCb() =>
public TestBreaker ShortCallTimeoutCb() =>
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(50)), Dilated(TimeSpan.FromMilliseconds(500))));

public TestBreaker ShortResetTimeoutCb() =>
public TestBreaker ShortResetTimeoutCb() =>
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(1000)), Dilated(TimeSpan.FromMilliseconds(50))));

public TestBreaker LongCallTimeoutCb() =>
public TestBreaker LongCallTimeoutCb() =>
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromSeconds(5), Dilated(TimeSpan.FromMilliseconds(500))));

public TimeSpan LongResetTimeout = TimeSpan.FromSeconds(5);
public TestBreaker LongResetTimeoutCb() =>
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(100)), Dilated(LongResetTimeout)));

public TestBreaker MultiFailureCb() =>
public TestBreaker MultiFailureCb() =>
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 5, Dilated(TimeSpan.FromMilliseconds(200)), Dilated(TimeSpan.FromMilliseconds(500))));

public TestBreaker NonOneFactorCb() =>
public TestBreaker NonOneFactorCb() =>
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(2000)), Dilated(TimeSpan.FromMilliseconds(1000)), Dilated(TimeSpan.FromDays(1)), 5, 0));
}

Expand Down
8 changes: 4 additions & 4 deletions src/core/Akka/Util/Extensions/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ namespace System.Threading.Tasks
{
internal static class TaskExtensions
{
#if NETSTANDARD
public static async Task WaitAsync(this Task task, TimeSpan timeout)
//#if NETSTANDARD
public static async Task WaitAsync1(this Task task, TimeSpan timeout)
{
var cts = new CancellationTokenSource();
try
Expand All @@ -29,7 +29,7 @@ public static async Task WaitAsync(this Task task, TimeSpan timeout)
}
}

public static async Task<TResult> WaitAsync<TResult>(this Task<TResult> task, TimeSpan timeout)
public static async Task<TResult> WaitAsync1<TResult>(this Task<TResult> task, TimeSpan timeout)
{
var cts = new CancellationTokenSource();
try
Expand All @@ -46,6 +46,6 @@ public static async Task<TResult> WaitAsync<TResult>(this Task<TResult> task, Ti
cts.Dispose();
}
}
#endif
//#endif
}
}
8 changes: 4 additions & 4 deletions src/core/Akka/Util/Internal/AtomicState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async Task<T> CallThrough<T>(Func<Task<T>> task)
var result = default(T);
try
{
result = await task().WaitAsync(_callTimeout).ConfigureAwait(false);
result = await task().WaitAsync1(_callTimeout).ConfigureAwait(false);
CallSucceeds();
}
catch (Exception ex)
Expand All @@ -99,7 +99,7 @@ public async Task<T> CallThrough<T, TState>(TState state, Func<TState, Task<T>>
var result = default(T);
try
{
result = await task(state).WaitAsync(_callTimeout).ConfigureAwait(false);
result = await task(state).WaitAsync1(_callTimeout).ConfigureAwait(false);
CallSucceeds();
}
catch (Exception ex)
Expand All @@ -122,7 +122,7 @@ public async Task CallThrough(Func<Task> task)
{
try
{
await task().WaitAsync(_callTimeout).ConfigureAwait(false);
await task().WaitAsync1(_callTimeout).ConfigureAwait(false);
CallSucceeds();
}
catch (Exception ex)
Expand All @@ -137,7 +137,7 @@ public async Task CallThrough<TState>(TState state, Func<TState, Task> task)
{
try
{
await task(state).WaitAsync(_callTimeout).ConfigureAwait(false);
await task(state).WaitAsync1(_callTimeout).ConfigureAwait(false);
CallSucceeds();
}
catch (Exception ex)
Expand Down

0 comments on commit 8b69365

Please sign in to comment.