Skip to content

Commit

Permalink
Fix OnHedging not being called (#1320)
Browse files Browse the repository at this point in the history
  • Loading branch information
martintmk authored Jun 19, 2023
1 parent ce97ea5 commit 6fcf0f6
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 13 deletions.
40 changes: 30 additions & 10 deletions src/Polly.Core/Hedging/HedgingResilienceStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ private async ValueTask<Outcome<TResult>> ExecuteCoreAsync<TResult, TState>(
ResilienceContext context,
TState state)
{
var attempt = -1;
while (true)
{
attempt++;
var continueOnCapturedContext = context.ContinueOnCapturedContext;
var cancellationToken = context.CancellationToken;

Expand All @@ -79,7 +81,9 @@ private async ValueTask<Outcome<TResult>> ExecuteCoreAsync<TResult, TState>(
return new Outcome<TResult>(new OperationCanceledException(cancellationToken).TrySetStackTrace());
}

if ((await hedgingContext.LoadExecutionAsync(callback, state).ConfigureAwait(context.ContinueOnCapturedContext)).Outcome is Outcome<TResult> outcome)
var loadedExecution = await hedgingContext.LoadExecutionAsync(callback, state).ConfigureAwait(context.ContinueOnCapturedContext);

if (loadedExecution.Outcome is Outcome<TResult> outcome)
{
return outcome;
}
Expand All @@ -90,6 +94,10 @@ private async ValueTask<Outcome<TResult>> ExecuteCoreAsync<TResult, TState>(
{
// If completedHedgedTask is null it indicates that we still do not have any finished hedged task within the hedging delay.
// We will create additional hedged task in the next iteration.
await HandleOnHedgingAsync(
context,
new Outcome<TResult>(default(TResult)),
new OnHedgingArguments(attempt, HasOutcome: false)).ConfigureAwait(context.ContinueOnCapturedContext);
continue;
}

Expand All @@ -101,16 +109,28 @@ private async ValueTask<Outcome<TResult>> ExecuteCoreAsync<TResult, TState>(
return outcome;
}

var onHedgingArgs = new OutcomeArguments<TResult, OnHedgingArguments>(context, outcome, new OnHedgingArguments(context, hedgingContext.LoadedTasks - 1));
_telemetry.Report(HedgingConstants.OnHedgingEventName, onHedgingArgs);
await HandleOnHedgingAsync(
context,
outcome,
new OnHedgingArguments(attempt, HasOutcome: true)).ConfigureAwait(context.ContinueOnCapturedContext);
}
}

if (OnHedging is not null)
{
// If nothing has been returned or thrown yet, the result is a transient failure,
// and other hedged request will be awaited.
// Before it, one needs to perform the task adjacent to each hedged call.
await OnHedging.HandleAsync(onHedgingArgs).ConfigureAwait(continueOnCapturedContext);
}
private async ValueTask HandleOnHedgingAsync<TResult>(ResilienceContext context, Outcome<TResult> outcome, OnHedgingArguments args)
{
var onHedgingArgs = new OutcomeArguments<TResult, OnHedgingArguments>(
context,
outcome,
args);

_telemetry.Report(HedgingConstants.OnHedgingEventName, onHedgingArgs);

if (OnHedging is not null)
{
// If nothing has been returned or thrown yet, the result is a transient failure,
// and other hedged request will be awaited.
// Before it, one needs to perform the task adjacent to each hedged call.
await OnHedging.HandleAsync(onHedgingArgs).ConfigureAwait(context.ContinueOnCapturedContext);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/Polly.Core/Hedging/HedgingStrategyOptions.TResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public class HedgingStrategyOptions<TResult> : ResilienceStrategyOptions
/// </summary>
/// <remarks>
/// Defaults to <see langword="null"/>.
/// <para>
/// The hedging is executed when the current attempt outcome is not successful and the <see cref="ShouldHandle"/> predicate returns <see langword="true"/> or when
/// the current attempt did not finish within the <see cref="HedgingDelay"/>.
/// </para>
/// </remarks>
public Func<OutcomeArguments<TResult, OnHedgingArguments>, ValueTask>? OnHedging { get; set; }
}
7 changes: 5 additions & 2 deletions src/Polly.Core/Hedging/OnHedgingArguments.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ namespace Polly.Hedging;
/// <summary>
/// Represents arguments used by the on-hedging event.
/// </summary>
/// <param name="Context">The context associated with the execution of a user-provided callback.</param>
/// <param name="Attempt">The zero-based hedging attempt number.</param>
public record OnHedgingArguments(ResilienceContext Context, int Attempt);
/// <param name="HasOutcome">
/// Determines whether the outcome is available before loading the next hedged task.
/// No outcome indicates that the previous action did not finish within the hedging delay.
/// </param>
public record OnHedgingArguments(int Attempt, bool HasOutcome);
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void AddHedgingT_InvalidOptions_Throws()
[Fact]
public async Task AddHedging_IntegrationTest()
{
var hedgingWithoutOutcome = false;
ConcurrentQueue<string> results = new();

var strategy = _builder
Expand All @@ -78,7 +79,19 @@ public async Task AddHedging_IntegrationTest()
return "error".AsOutcome().AsOutcome();
};
},
OnHedging = args => { results.Enqueue(args.Result!.ToString()!); return default; }
OnHedging = args =>
{
if (args.Arguments.HasOutcome)
{
results.Enqueue(args.Result!.ToString()!);
}
else
{
hedgingWithoutOutcome = true;
}

return default;
}
})
.Build();

Expand All @@ -91,5 +104,6 @@ public async Task AddHedging_IntegrationTest()
result.Should().Be("success");
results.Should().HaveCountGreaterThan(0);
results.Distinct().Should().ContainSingle("error");
hedgingWithoutOutcome.Should().BeTrue();
}
}
30 changes: 30 additions & 0 deletions test/Polly.Core.Tests/Hedging/HedgingResilienceStrategyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,35 @@ public async Task ExecuteAsync_EnsurePrimaryTaskCancelled_Ok()
await task;
}

[Fact]
public async Task ExecuteAsync_EnsureSecondaryHedgedTaskReportedWithNoOutcome()
{
// arrange
using var cancelled = new ManualResetEvent(false);
var hasOutcome = true;
_options.OnHedging = args =>
{
hasOutcome = args.Arguments.HasOutcome;
return default;
};

ConfigureHedging(context => Success.AsOutcomeAsync());

var strategy = Create();

// act
var task = strategy.ExecuteAsync(async token =>
{
await _timeProvider.Delay(TimeSpan.FromHours(24), token);
return Success;
});

// assert
_timeProvider.Advance(TimeSpan.FromHours(2));
hasOutcome.Should().BeFalse();
await task;
}

[Fact]
public async Task ExecuteAsync_EnsureDiscardedResultDisposed()
{
Expand Down Expand Up @@ -814,6 +843,7 @@ public async Task ExecuteAsync_EnsureOnHedgingCalled()
var attempts = new List<int>();
_options.OnHedging = args =>
{
args.Arguments.HasOutcome.Should().BeTrue();
args.Result.Should().Be(Failure);
attempts.Add(args.Arguments.Attempt);
return default;
Expand Down

0 comments on commit 6fcf0f6

Please sign in to comment.