From e323b8b1f714223e95f2048cb28a9e5ab409a541 Mon Sep 17 00:00:00 2001 From: Benjamin Petit Date: Fri, 24 Aug 2018 13:59:12 -0700 Subject: [PATCH] Add timeout mecanism for grain deactivation (#4883) --- .../Async/MultiTaskCompletionSource.cs | 2 +- src/Orleans.Runtime/Catalog/Catalog.cs | 10 ++++--- .../Options/GrainCollectionOptions.cs | 7 +++++ .../Grains/TestGrainInterfaces/ITimerGrain.cs | 2 ++ test/Grains/TestInternalGrains/TimerGrain.cs | 12 +++++++++ test/Tester/Forwarding/ShutdownSiloTests.cs | 26 +++++++++++++++++++ 6 files changed, 54 insertions(+), 5 deletions(-) diff --git a/src/Orleans.Core/Async/MultiTaskCompletionSource.cs b/src/Orleans.Core/Async/MultiTaskCompletionSource.cs index a78894deda..f02c1d8d66 100644 --- a/src/Orleans.Core/Async/MultiTaskCompletionSource.cs +++ b/src/Orleans.Core/Async/MultiTaskCompletionSource.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; namespace Orleans @@ -25,7 +26,6 @@ public Task Task get { return tcs.Task; } } - public void SetOneResult() { lock (lockable) diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs index 6f5944bd17..29f2280fbb 100644 --- a/src/Orleans.Runtime/Catalog/Catalog.cs +++ b/src/Orleans.Runtime/Catalog/Catalog.cs @@ -950,6 +950,8 @@ private Task DestroyActivations(List list) private async void StartDestroyActivations(List list, MultiTaskCompletionSource tcs = null) { + var cts = new CancellationTokenSource(this.collectionOptions.Value.DeactivationTimeout); + int number = destroyActivationsNumber; destroyActivationsNumber++; try @@ -965,7 +967,7 @@ private async void StartDestroyActivations(List list, MultiTaskC try { - await Task.WhenAll(tasks1); + await Task.WhenAll(tasks1).WithCancellation(cts.Token); } catch (Exception exc) { @@ -977,7 +979,7 @@ private async void StartDestroyActivations(List list, MultiTaskC foreach (var activation in list) { var activationData = activation; // Capture loop variable - var task = scheduler.RunOrQueueTask(() => CallGrainDeactivateAndCleanupStreams(activationData), activationData.SchedulingContext); + var task = scheduler.RunOrQueueTask(() => CallGrainDeactivateAndCleanupStreams(activationData, cts.Token), activationData.SchedulingContext); tasks2.Add(new Tuple(task, activationData)); } var asyncQueue = new AsyncBatchedContinuationQueue(); @@ -1168,7 +1170,7 @@ private async Task CallGrainActivate(ActivationData activation, Dictionary CallGrainDeactivateAndCleanupStreams(ActivationData activation) + private async Task CallGrainDeactivateAndCleanupStreams(ActivationData activation, CancellationToken ct) { try { @@ -1186,7 +1188,7 @@ private async Task CallGrainDeactivateAndCleanupStreams(Activati activation.State == ActivationState.Deactivating) { RequestContext.Clear(); // Clear any previous RC, so it does not leak into this call by mistake. - await activation.Lifecycle.OnStop(); + await activation.Lifecycle.OnStop().WithCancellation(ct); } if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.Catalog_AfterCallingDeactivate, "Returned from calling {1} grain's OnDeactivateAsync() method {0}", activation, grainTypeName); } diff --git a/src/Orleans.Runtime/Configuration/Options/GrainCollectionOptions.cs b/src/Orleans.Runtime/Configuration/Options/GrainCollectionOptions.cs index b1d62b373e..b0ef62c114 100644 --- a/src/Orleans.Runtime/Configuration/Options/GrainCollectionOptions.cs +++ b/src/Orleans.Runtime/Configuration/Options/GrainCollectionOptions.cs @@ -25,5 +25,12 @@ public class GrainCollectionOptions /// Period of inactivity necessary for a grain to be available for collection and deactivation by grain type. /// public Dictionary ClassSpecificCollectionAge { get; set; } = new Dictionary(); + + /// + /// Timeout value before giving up when trying to deactivate a grain activation + /// (waiting for all timers to stop and calling Grain.OnDeactivate()) + /// + public TimeSpan DeactivationTimeout { get; set; } = DEFAULT_DEACTIVATION_TIMEOUT; + public static readonly TimeSpan DEFAULT_DEACTIVATION_TIMEOUT = TimeSpan.FromSeconds(30); } } diff --git a/test/Grains/TestGrainInterfaces/ITimerGrain.cs b/test/Grains/TestGrainInterfaces/ITimerGrain.cs index 2763636843..880fc8bccf 100644 --- a/test/Grains/TestGrainInterfaces/ITimerGrain.cs +++ b/test/Grains/TestGrainInterfaces/ITimerGrain.cs @@ -29,6 +29,8 @@ public interface ITimerRequestGrain : IGrainWithIntegerKey { Task StartAndWaitTimerTick(TimeSpan dueTime); + Task StartStuckTimer(TimeSpan dueTime); + Task GetRuntimeInstanceId(); } } diff --git a/test/Grains/TestInternalGrains/TimerGrain.cs b/test/Grains/TestInternalGrains/TimerGrain.cs index 17408c1b98..a4fe1fd8d5 100644 --- a/test/Grains/TestInternalGrains/TimerGrain.cs +++ b/test/Grains/TestInternalGrains/TimerGrain.cs @@ -265,10 +265,22 @@ public async Task StartAndWaitTimerTick(TimeSpan dueTime) await this.completionSource.Task; } + public Task StartStuckTimer(TimeSpan dueTime) + { + this.completionSource = new TaskCompletionSource(); + var timer = this.RegisterTimer(StuckTimerTick, null, dueTime, TimeSpan.FromSeconds(1)); + return Task.CompletedTask; + } + private Task TimerTick(object state) { this.completionSource.SetResult(1); return Task.CompletedTask; } + + private async Task StuckTimerTick(object state) + { + await completionSource.Task; + } } } diff --git a/test/Tester/Forwarding/ShutdownSiloTests.cs b/test/Tester/Forwarding/ShutdownSiloTests.cs index e57fd3113c..1ff18c743f 100644 --- a/test/Tester/Forwarding/ShutdownSiloTests.cs +++ b/test/Tester/Forwarding/ShutdownSiloTests.cs @@ -17,9 +17,20 @@ public class ShutdownSiloTests : TestClusterPerTest { public const int NumberOfSilos = 2; + public static readonly TimeSpan DeactivationTimeout = TimeSpan.FromSeconds(10); + + internal class SiloBuilderConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.Configure(options => options.DeactivationTimeout = DeactivationTimeout); + } + } + protected override void ConfigureTestCluster(TestClusterBuilder builder) { builder.Options.InitialSilosCount = NumberOfSilos; + builder.AddSiloBuilderConfigurator(); builder.ConfigureLegacyConfiguration(legacy => { legacy.ClusterConfiguration.Globals.DefaultPlacementStrategy = "ActivationCountBasedPlacement"; @@ -59,6 +70,21 @@ public async Task SiloGracefulShutdown_PendingRequestTimers() await promise; } + [Fact, TestCategory("GracefulShutdown"), TestCategory("Functional")] + public async Task SiloGracefulShutdown_StuckTimers() + { + var grain = await GetTimerRequestGrainOnSecondary(); + + await grain.StartStuckTimer(TimeSpan.Zero); + + await Task.Delay(TimeSpan.FromSeconds(1)); + var stopwatch = Stopwatch.StartNew(); + HostedCluster.StopSilo(HostedCluster.SecondarySilos.First()); + stopwatch.Stop(); + + Assert.True(stopwatch.Elapsed > DeactivationTimeout); + } + [Fact, TestCategory("GracefulShutdown"), TestCategory("Functional")] public async Task SiloGracefulShutdown_StuckActivation() {