Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay pipeline disposal when still in use #1579

Merged
merged 8 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,17 @@ private Builder CreateBuilder()
builder.InstanceName = _instanceName;
_configure(builder, context);

var timeProvider = builder.TimeProvider;
var telemetry = new ResilienceStrategyTelemetry(
new ResilienceTelemetrySource(builder.Name, builder.InstanceName, null),
builder.TelemetryListener);

return new(
() => PipelineComponentFactory.WithDisposableCallbacks(
builder.BuildPipelineComponent(),
context.DisposeCallbacks),
() =>
{
var innerComponent = PipelineComponentFactory.WithDisposableCallbacks(builder.BuildPipelineComponent(), context.DisposeCallbacks);
return PipelineComponentFactory.WithExecutionTracking(innerComponent, timeProvider);
},
context.ReloadTokens,
telemetry);
}
Expand Down
56 changes: 56 additions & 0 deletions src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
namespace Polly.Utils.Pipeline;

internal sealed class ExecutionTrackingComponent : PipelineComponent
{
public static readonly TimeSpan Timeout = TimeSpan.FromSeconds(30);

public static readonly TimeSpan SleepDelay = TimeSpan.FromSeconds(1);

private readonly TimeProvider _timeProvider;
private int _pendingExecutions;
martincostello marked this conversation as resolved.
Show resolved Hide resolved

public ExecutionTrackingComponent(PipelineComponent component, TimeProvider timeProvider)
{
Component = component;
_timeProvider = timeProvider;
}

public PipelineComponent Component { get; }

internal override async ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
TState state)
{
Interlocked.Increment(ref _pendingExecutions);

try
{
return await Component.ExecuteCore(callback, context, state).ConfigureAwait(context.ContinueOnCapturedContext);
}
finally
{
Interlocked.Decrement(ref _pendingExecutions);
}
}

public override async ValueTask DisposeAsync()
{
var start = _timeProvider.GetTimestamp();
var stopwatch = Stopwatch.StartNew();

// We don't want to introduce locks or any synchronization primitives to main execution path
// so we will do "dummy" retries until there are no more executions.
while (Interlocked.CompareExchange(ref _pendingExecutions, 0, 0) != 0)
martincostello marked this conversation as resolved.
Show resolved Hide resolved
{
await _timeProvider.Delay(SleepDelay).ConfigureAwait(false);

if (_timeProvider.GetElapsedTime(start) > Timeout)
{
break;
}
}

await Component.DisposeAsync().ConfigureAwait(false);
}
}
2 changes: 2 additions & 0 deletions src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public static PipelineComponent WithDisposableCallbacks(PipelineComponent compon
return new ComponentWithDisposeCallbacks(component, callbacks.ToList());
}

public static PipelineComponent WithExecutionTracking(PipelineComponent component, TimeProvider timeProvider) => new ExecutionTrackingComponent(component, timeProvider);

public static PipelineComponent CreateComposite(
IReadOnlyList<PipelineComponent> components,
ResilienceStrategyTelemetry telemetry,
Expand Down
13 changes: 12 additions & 1 deletion src/Polly.Testing/ResiliencePipelineExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@ private static object GetStrategyInstance<T>(PipelineComponent component)
return component;
}

private static bool ShouldSkip(object instance) => instance is ReloadableComponent || instance is ComponentWithDisposeCallbacks;
private static bool ShouldSkip(object instance) => instance switch
{
ReloadableComponent => true,
ComponentWithDisposeCallbacks => true,
ExecutionTrackingComponent => true,
_ => false
};

private static void ExpandComponents(this PipelineComponent component, List<PipelineComponent> components)
{
Expand All @@ -78,6 +84,11 @@ private static void ExpandComponents(this PipelineComponent component, List<Pipe
components.Add(reloadable);
ExpandComponents(reloadable.Component, components);
}
else if (component is ExecutionTrackingComponent tracking)
{
components.Add(tracking);
ExpandComponents(tracking.Component, components);
}
else if (component is ComponentWithDisposeCallbacks callbacks)
{
ExpandComponents(callbacks.Component, components);
Expand Down
15 changes: 15 additions & 0 deletions test/Polly.Core.Tests/Registry/ResiliencePipelineRegistryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Polly.Testing;
using Polly.Timeout;
using Polly.Utils;
using Polly.Utils.Pipeline;

namespace Polly.Core.Tests.Registry;

Expand Down Expand Up @@ -380,6 +381,20 @@ public void GetOrAddPipeline_Ok()
called.Should().Be(1);
}

[Fact]
public void GetOrAddPipeline_EnsureCorrectComponents()
{
var id = new StrategyId(typeof(string), "A");

using var registry = CreateRegistry();

var pipeline = registry.GetOrAddPipeline(id, builder => builder.AddTimeout(TimeSpan.FromSeconds(1)));
pipeline.Component.Should().BeOfType<ExecutionTrackingComponent>().Subject.Component.Should().BeOfType<CompositeComponent>();

var genericPipeline = registry.GetOrAddPipeline<string>(id, builder => builder.AddTimeout(TimeSpan.FromSeconds(1)));
pipeline.Component.Should().BeOfType<ExecutionTrackingComponent>().Subject.Component.Should().BeOfType<CompositeComponent>();
}

[Fact]
public void GetOrAddPipeline_Generic_Ok()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Time.Testing;
using Polly.Utils.Pipeline;

namespace Polly.Core.Tests.Utils.Pipeline;

public class ExecutionTrackingComponentTests
martincostello marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly FakeTimeProvider _timeProvider = new();

[Fact]
public async Task DisposeAsync_PendingOperations_Delayed()
{
using var assert = new ManualResetEvent(false);
using var executing = new ManualResetEvent(false);

await using var inner = new Inner
{
OnExecute = () =>
{
executing.Set();
assert.WaitOne();
}
};

var component = new ExecutionTrackingComponent(inner, _timeProvider);
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { }));
executing.WaitOne();

var disposeTask = component.DisposeAsync().AsTask();
_timeProvider.Advance(ExecutionTrackingComponent.SleepDelay);
inner.Disposed.Should().BeFalse();
assert.Set();

_timeProvider.Advance(ExecutionTrackingComponent.SleepDelay);
await execution;

_timeProvider.Advance(ExecutionTrackingComponent.SleepDelay);
await disposeTask;

inner.Disposed.Should().BeTrue();
}

[Fact]
public async Task DisposeAsync_Timeout_Ok()
{
using var assert = new ManualResetEvent(false);
using var executing = new ManualResetEvent(false);

await using var inner = new Inner
{
OnExecute = () =>
{
executing.Set();
assert.WaitOne();
}
};

var component = new ExecutionTrackingComponent(inner, _timeProvider);
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { }));
executing.WaitOne();

var disposeTask = component.DisposeAsync().AsTask();
inner.Disposed.Should().BeFalse();
_timeProvider.Advance(ExecutionTrackingComponent.Timeout - TimeSpan.FromSeconds(1));
inner.Disposed.Should().BeFalse();
_timeProvider.Advance(TimeSpan.FromSeconds(1));
_timeProvider.Advance(TimeSpan.FromSeconds(1));
await disposeTask;
inner.Disposed.Should().BeTrue();

assert.Set();
await execution;
}

private class Inner : PipelineComponent
{
public bool Disposed { get; private set; }

public override ValueTask DisposeAsync()
{
Disposed = true;
return default;
}

public Action OnExecute { get; set; } = () => { };

internal override async ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback, ResilienceContext context, TState state)
{
OnExecute();

return await callback(context, state);
}
}
}