From d1fa3b9e83a48b874f119d9640b099e56ce774db Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Fri, 8 Sep 2023 14:39:20 +0200 Subject: [PATCH 1/8] Delay pipeline disposal when still in use --- .../RegistryPipelineComponentBuilder.cs | 9 +- .../Pipeline/ExecutionTrackingComponent.cs | 56 +++++++++++ .../Pipeline/PipelineComponentFactory.cs | 2 + .../ResiliencePipelineExtensions.cs | 13 ++- .../ResiliencePipelineRegistryTests.cs | 15 +++ .../ExecutionTrackingComponentTests.cs | 96 +++++++++++++++++++ 6 files changed, 187 insertions(+), 4 deletions(-) create mode 100644 src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs create mode 100644 test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs diff --git a/src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs b/src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs index e29c89560da..4493631f8cd 100644 --- a/src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs +++ b/src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs @@ -57,14 +57,17 @@ private Builder CreateBuilder() builder.InstanceName = _instanceName; _configure(builder, context); + var timeProvider = builder.TimeProvider; var telemetry = new ResilienceStrategyTelemetry( new ResilienceTelemetrySource(builder.Name, builder.InstanceName, null), builder.TelemetryListener); return new( - () => PipelineComponentFactory.WithDisposableCallbacks( - builder.BuildPipelineComponent(), - context.DisposeCallbacks), + () => + { + var innerComponent = PipelineComponentFactory.WithDisposableCallbacks(builder.BuildPipelineComponent(), context.DisposeCallbacks); + return PipelineComponentFactory.WithExecutionTracking(innerComponent, timeProvider); + }, context.ReloadTokens, telemetry); } diff --git a/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs new file mode 100644 index 00000000000..e77a0e1ff5d --- /dev/null +++ b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs @@ -0,0 +1,56 @@ +namespace Polly.Utils.Pipeline; + +internal sealed class ExecutionTrackingComponent : PipelineComponent +{ + public static readonly TimeSpan Timeout = TimeSpan.FromMinutes(5); + + public static readonly TimeSpan SleepDelay = TimeSpan.FromSeconds(1); + + private readonly TimeProvider _timeProvider; + private int _pendingExecutions; + + public ExecutionTrackingComponent(PipelineComponent component, TimeProvider timeProvider) + { + Component = component; + _timeProvider = timeProvider; + } + + public PipelineComponent Component { get; } + + internal override async ValueTask> ExecuteCore( + Func>> callback, + ResilienceContext context, + TState state) + { + Interlocked.Increment(ref _pendingExecutions); + + try + { + return await Component.ExecuteCore(callback, context, state).ConfigureAwait(context.ContinueOnCapturedContext); + } + finally + { + Interlocked.Decrement(ref _pendingExecutions); + } + } + + public override async ValueTask DisposeAsync() + { + var start = _timeProvider.GetTimestamp(); + var stopwatch = Stopwatch.StartNew(); + + // We don't want to introduce locks or any synchronization primitives to main execution path + // so we will do "dummy" retries until there are no more executions. + while (Interlocked.CompareExchange(ref _pendingExecutions, 0, 0) != 0) + { + await _timeProvider.Delay(SleepDelay).ConfigureAwait(false); + + if (_timeProvider.GetElapsedTime(start) > Timeout) + { + break; + } + } + + await Component.DisposeAsync().ConfigureAwait(false); + } +} diff --git a/src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs b/src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs index 125f742a103..5c992295fc8 100644 --- a/src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs +++ b/src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs @@ -24,6 +24,8 @@ public static PipelineComponent WithDisposableCallbacks(PipelineComponent compon return new ComponentWithDisposeCallbacks(component, callbacks.ToList()); } + public static PipelineComponent WithExecutionTracking(PipelineComponent component, TimeProvider timeProvider) => new ExecutionTrackingComponent(component, timeProvider); + public static PipelineComponent CreateComposite( IReadOnlyList components, ResilienceStrategyTelemetry telemetry, diff --git a/src/Polly.Testing/ResiliencePipelineExtensions.cs b/src/Polly.Testing/ResiliencePipelineExtensions.cs index 19692182d30..f2487f00c51 100644 --- a/src/Polly.Testing/ResiliencePipelineExtensions.cs +++ b/src/Polly.Testing/ResiliencePipelineExtensions.cs @@ -62,7 +62,13 @@ private static object GetStrategyInstance(PipelineComponent component) return component; } - private static bool ShouldSkip(object instance) => instance is ReloadableComponent || instance is ComponentWithDisposeCallbacks; + private static bool ShouldSkip(object instance) => instance switch + { + ReloadableComponent => true, + ComponentWithDisposeCallbacks => true, + ExecutionTrackingComponent => true, + _ => false + }; private static void ExpandComponents(this PipelineComponent component, List components) { @@ -78,6 +84,11 @@ private static void ExpandComponents(this PipelineComponent component, List builder.AddTimeout(TimeSpan.FromSeconds(1))); + pipeline.Component.Should().BeOfType().Subject.Component.Should().BeOfType(); + + var genericPipeline = registry.GetOrAddPipeline(id, builder => builder.AddTimeout(TimeSpan.FromSeconds(1))); + pipeline.Component.Should().BeOfType().Subject.Component.Should().BeOfType(); + } + [Fact] public void GetOrAddPipeline_Generic_Ok() { diff --git a/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs b/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs new file mode 100644 index 00000000000..810e647f307 --- /dev/null +++ b/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs @@ -0,0 +1,96 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Time.Testing; +using Polly.Utils.Pipeline; + +namespace Polly.Core.Tests.Utils.Pipeline; + +public class ExecutionTrackingComponentTests +{ + private readonly FakeTimeProvider _timeProvider = new(); + + [Fact] + public async Task DisposeAsync_PendingOperations_Delayed() + { + using var assert = new ManualResetEvent(false); + using var executing = new ManualResetEvent(false); + + await using var inner = new Inner + { + OnExecute = () => + { + executing.Set(); + assert.WaitOne(); + } + }; + + var component = new ExecutionTrackingComponent(inner, _timeProvider); + var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { })); + executing.WaitOne(); + + var disposeTask = component.DisposeAsync().AsTask(); + _timeProvider.Advance(ExecutionTrackingComponent.SleepDelay); + inner.Disposed.Should().BeFalse(); + assert.Set(); + + _timeProvider.Advance(ExecutionTrackingComponent.SleepDelay); + await execution; + + _timeProvider.Advance(ExecutionTrackingComponent.SleepDelay); + await disposeTask; + + inner.Disposed.Should().BeTrue(); + } + + [Fact] + public async Task DisposeAsync_Timeout_Ok() + { + using var assert = new ManualResetEvent(false); + using var executing = new ManualResetEvent(false); + + await using var inner = new Inner + { + OnExecute = () => + { + executing.Set(); + assert.WaitOne(); + } + }; + + var component = new ExecutionTrackingComponent(inner, _timeProvider); + var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { })); + executing.WaitOne(); + + var disposeTask = component.DisposeAsync().AsTask(); + inner.Disposed.Should().BeFalse(); + _timeProvider.Advance(ExecutionTrackingComponent.Timeout - TimeSpan.FromSeconds(1)); + inner.Disposed.Should().BeFalse(); + _timeProvider.Advance(TimeSpan.FromSeconds(1)); + _timeProvider.Advance(TimeSpan.FromSeconds(1)); + await disposeTask; + inner.Disposed.Should().BeTrue(); + + assert.Set(); + await execution; + } + + private class Inner : PipelineComponent + { + public bool Disposed { get; private set; } + + public override ValueTask DisposeAsync() + { + Disposed = true; + return default; + } + + public Action OnExecute { get; set; } = () => { }; + + internal override async ValueTask> ExecuteCore(Func>> callback, ResilienceContext context, TState state) + { + OnExecute(); + + return await callback(context, state); + } + } +} From b5aa3c7d8424af618547d9a9f39fde0f4f153268 Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Fri, 8 Sep 2023 14:41:56 +0200 Subject: [PATCH 2/8] cleanup --- src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs index e77a0e1ff5d..6ffedcafa29 100644 --- a/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs +++ b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs @@ -2,7 +2,7 @@ internal sealed class ExecutionTrackingComponent : PipelineComponent { - public static readonly TimeSpan Timeout = TimeSpan.FromMinutes(5); + public static readonly TimeSpan Timeout = TimeSpan.FromSeconds(30); public static readonly TimeSpan SleepDelay = TimeSpan.FromSeconds(1); From 121daaa748fba14378dd47c6722675a956f0ae2f Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Fri, 8 Sep 2023 14:56:49 +0200 Subject: [PATCH 3/8] Fix code coverage --- .../ResiliencePipelineExtensions.cs | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/src/Polly.Testing/ResiliencePipelineExtensions.cs b/src/Polly.Testing/ResiliencePipelineExtensions.cs index f2487f00c51..feea4ad59d3 100644 --- a/src/Polly.Testing/ResiliencePipelineExtensions.cs +++ b/src/Polly.Testing/ResiliencePipelineExtensions.cs @@ -40,10 +40,10 @@ private static ResiliencePipelineDescriptor GetPipelineDescriptorCore(Pipelin var components = new List(); component.ExpandComponents(components); - var descriptors = components.Select(s => new ResilienceStrategyDescriptor(s.Options, GetStrategyInstance(s))).ToList(); + var descriptors = components.OfType().Select(s => new ResilienceStrategyDescriptor(s.Options, GetStrategyInstance(s))).ToList().AsReadOnly(); return new ResiliencePipelineDescriptor( - descriptors.Where(s => !ShouldSkip(s.StrategyInstance)).ToList().AsReadOnly(), + descriptors, isReloadable: components.Exists(s => s is ReloadableComponent)); } @@ -54,22 +54,9 @@ private static object GetStrategyInstance(PipelineComponent component) return reactiveBridge.Strategy; } - if (component is BridgeComponent nonReactiveBridge) - { - return nonReactiveBridge.Strategy; - } - - return component; + return ((BridgeComponent)component).Strategy; } - private static bool ShouldSkip(object instance) => instance switch - { - ReloadableComponent => true, - ComponentWithDisposeCallbacks => true, - ExecutionTrackingComponent => true, - _ => false - }; - private static void ExpandComponents(this PipelineComponent component, List components) { if (component is CompositeComponent pipeline) From 7134e4238c45d60ed2d4ccb07d1d25aff480fdd0 Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Fri, 8 Sep 2023 15:09:59 +0200 Subject: [PATCH 4/8] PR comments --- .../Pipeline/ExecutionTrackingComponent.cs | 2 +- .../ExecutionTrackingComponentTests.cs | 51 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs index 6ffedcafa29..98c9b9e4e91 100644 --- a/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs +++ b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs @@ -41,7 +41,7 @@ public override async ValueTask DisposeAsync() // We don't want to introduce locks or any synchronization primitives to main execution path // so we will do "dummy" retries until there are no more executions. - while (Interlocked.CompareExchange(ref _pendingExecutions, 0, 0) != 0) + while (Interlocked.CompareExchange(ref _pendingExecutions, 0, 0) > 0) { await _timeProvider.Delay(SleepDelay).ConfigureAwait(false); diff --git a/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs b/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs index 810e647f307..5a8a5949980 100644 --- a/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs +++ b/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs @@ -74,6 +74,52 @@ public async Task DisposeAsync_Timeout_Ok() await execution; } + [Fact] + public async Task DisposeAsync_WhenRunningMultipleTasks_Ok() + { + var tasks = new ConcurrentQueue(); + await using var inner = new Inner + { + OnExecute = () => + { + var ev = new ManualResetEvent(false); + tasks.Enqueue(ev); + ev.WaitOne(); + } + }; + + var component = new ExecutionTrackingComponent(inner, TimeProvider.System); + var pipeline = new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow); + + for (int i = 0; i < 10; i++) + { + _ = Task.Run(() => pipeline.Execute(() => { })); + } + + while (tasks.Count != 10) + { + await Task.Delay(1); + } + + var disposeTask = component.DisposeAsync().AsTask(); + + while (tasks.Count > 1) + { + tasks.TryDequeue(out var ev).Should().BeTrue(); + ev!.Set(); + ev.Dispose(); + disposeTask.Wait(1).Should().BeFalse(); + inner.Disposed.Should().BeFalse(); + } + + // last one + tasks.TryDequeue(out var last).Should().BeTrue(); + last!.Set(); + last.Dispose(); + await disposeTask; + inner.Disposed.Should().BeTrue(); + } + private class Inner : PipelineComponent { public bool Disposed { get; private set; } @@ -90,6 +136,11 @@ internal override async ValueTask> ExecuteCore { OnExecute(); + if (Disposed) + { + throw new ObjectDisposedException("dummy"); + } + return await callback(context, state); } } From e9f69cffec231127e0e18f5623b2b2341e10f4cc Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Fri, 8 Sep 2023 15:17:18 +0200 Subject: [PATCH 5/8] fixes --- .../Utils/Pipeline/ExecutionTrackingComponentTests.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs b/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs index 5a8a5949980..cc5c7bd83d8 100644 --- a/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs +++ b/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs @@ -136,11 +136,6 @@ internal override async ValueTask> ExecuteCore { OnExecute(); - if (Disposed) - { - throw new ObjectDisposedException("dummy"); - } - return await callback(context, state); } } From 489a995f89a67383a9a501f03303dd03d751517c Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Fri, 8 Sep 2023 15:31:57 +0200 Subject: [PATCH 6/8] fixes --- .../CircuitBreaker/Controller/ScheduledTaskExecutorTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/Polly.Core.Tests/CircuitBreaker/Controller/ScheduledTaskExecutorTests.cs b/test/Polly.Core.Tests/CircuitBreaker/Controller/ScheduledTaskExecutorTests.cs index 4b85dd4fc99..f24b8791324 100644 --- a/test/Polly.Core.Tests/CircuitBreaker/Controller/ScheduledTaskExecutorTests.cs +++ b/test/Polly.Core.Tests/CircuitBreaker/Controller/ScheduledTaskExecutorTests.cs @@ -123,11 +123,11 @@ public void Dispose_WhenScheduledTaskExecuting() ResilienceContextPool.Shared.Get(), out var task); - ready.WaitOne(TimeSpan.FromSeconds(2)).Should().BeTrue(); + ready.WaitOne(TimeSpan.FromSeconds(10)).Should().BeTrue(); scheduler.Dispose(); disposed.Set(); - scheduler.ProcessingTask.Wait(TimeSpan.FromSeconds(2)).Should().BeTrue(); + scheduler.ProcessingTask.Wait(TimeSpan.FromSeconds(10)).Should().BeTrue(); } [Fact] From 95b699ef398875ecf891e3b17f13261f8253cef3 Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Fri, 8 Sep 2023 15:59:59 +0200 Subject: [PATCH 7/8] kill mutant --- src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs index 98c9b9e4e91..2b5613d5237 100644 --- a/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs +++ b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs @@ -45,6 +45,7 @@ public override async ValueTask DisposeAsync() { await _timeProvider.Delay(SleepDelay).ConfigureAwait(false); + // stryker disable once equality : no means to test this if (_timeProvider.GetElapsedTime(start) > Timeout) { break; From 984724f175adf460ae78b004050ed52c87c240b1 Mon Sep 17 00:00:00 2001 From: Martin Tomka Date: Fri, 8 Sep 2023 16:04:44 +0200 Subject: [PATCH 8/8] add new test --- .../Pipeline/ExecutionTrackingComponent.cs | 4 ++- .../ExecutionTrackingComponentTests.cs | 26 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs index 2b5613d5237..5a03e7f94fc 100644 --- a/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs +++ b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs @@ -17,6 +17,8 @@ public ExecutionTrackingComponent(PipelineComponent component, TimeProvider time public PipelineComponent Component { get; } + public bool HasPendingExecutions => Interlocked.CompareExchange(ref _pendingExecutions, 0, 0) > 0; + internal override async ValueTask> ExecuteCore( Func>> callback, ResilienceContext context, @@ -41,7 +43,7 @@ public override async ValueTask DisposeAsync() // We don't want to introduce locks or any synchronization primitives to main execution path // so we will do "dummy" retries until there are no more executions. - while (Interlocked.CompareExchange(ref _pendingExecutions, 0, 0) > 0) + while (HasPendingExecutions) { await _timeProvider.Delay(SleepDelay).ConfigureAwait(false); diff --git a/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs b/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs index cc5c7bd83d8..2cebbffc5e3 100644 --- a/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs +++ b/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs @@ -42,6 +42,32 @@ public async Task DisposeAsync_PendingOperations_Delayed() inner.Disposed.Should().BeTrue(); } + [Fact] + public async Task HasPendingExecutions_Ok() + { + using var assert = new ManualResetEvent(false); + using var executing = new ManualResetEvent(false); + + await using var inner = new Inner + { + OnExecute = () => + { + executing.Set(); + assert.WaitOne(); + } + }; + + await using var component = new ExecutionTrackingComponent(inner, _timeProvider); + var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { })); + executing.WaitOne(); + + component.HasPendingExecutions.Should().BeTrue(); + assert.Set(); + await execution; + + component.HasPendingExecutions.Should().BeFalse(); + } + [Fact] public async Task DisposeAsync_Timeout_Ok() {