Skip to content

Commit

Permalink
Add timeout mecanism for grain deactivation (#4883)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminpetit authored and sergeybykov committed Aug 24, 2018
1 parent 64cd629 commit e323b8b
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/Orleans.Core/Async/MultiTaskCompletionSource.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Orleans
Expand All @@ -25,7 +26,6 @@ public Task Task
get { return tcs.Task; }
}


public void SetOneResult()
{
lock (lockable)
Expand Down
10 changes: 6 additions & 4 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,8 @@ private Task DestroyActivations(List<ActivationData> list)

private async void StartDestroyActivations(List<ActivationData> list, MultiTaskCompletionSource tcs = null)
{
var cts = new CancellationTokenSource(this.collectionOptions.Value.DeactivationTimeout);

int number = destroyActivationsNumber;
destroyActivationsNumber++;
try
Expand All @@ -965,7 +967,7 @@ private async void StartDestroyActivations(List<ActivationData> list, MultiTaskC

try
{
await Task.WhenAll(tasks1);
await Task.WhenAll(tasks1).WithCancellation(cts.Token);
}
catch (Exception exc)
{
Expand All @@ -977,7 +979,7 @@ private async void StartDestroyActivations(List<ActivationData> list, MultiTaskC
foreach (var activation in list)
{
var activationData = activation; // Capture loop variable
var task = scheduler.RunOrQueueTask(() => CallGrainDeactivateAndCleanupStreams(activationData), activationData.SchedulingContext);
var task = scheduler.RunOrQueueTask(() => CallGrainDeactivateAndCleanupStreams(activationData, cts.Token), activationData.SchedulingContext);
tasks2.Add(new Tuple<Task, ActivationData>(task, activationData));
}
var asyncQueue = new AsyncBatchedContinuationQueue<ActivationData>();
Expand Down Expand Up @@ -1168,7 +1170,7 @@ private async Task CallGrainActivate(ActivationData activation, Dictionary<strin
}
}

private async Task<ActivationData> CallGrainDeactivateAndCleanupStreams(ActivationData activation)
private async Task<ActivationData> CallGrainDeactivateAndCleanupStreams(ActivationData activation, CancellationToken ct)
{
try
{
Expand All @@ -1186,7 +1188,7 @@ private async Task<ActivationData> CallGrainDeactivateAndCleanupStreams(Activati
activation.State == ActivationState.Deactivating)
{
RequestContext.Clear(); // Clear any previous RC, so it does not leak into this call by mistake.
await activation.Lifecycle.OnStop();
await activation.Lifecycle.OnStop().WithCancellation(ct);
}
if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.Catalog_AfterCallingDeactivate, "Returned from calling {1} grain's OnDeactivateAsync() method {0}", activation, grainTypeName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,12 @@ public class GrainCollectionOptions
/// Period of inactivity necessary for a grain to be available for collection and deactivation by grain type.
/// </summary>
public Dictionary<string, TimeSpan> ClassSpecificCollectionAge { get; set; } = new Dictionary<string, TimeSpan>();

/// <summary>
/// Timeout value before giving up when trying to deactivate a grain activation
/// (waiting for all timers to stop and calling Grain.OnDeactivate())
/// </summary>
public TimeSpan DeactivationTimeout { get; set; } = DEFAULT_DEACTIVATION_TIMEOUT;
public static readonly TimeSpan DEFAULT_DEACTIVATION_TIMEOUT = TimeSpan.FromSeconds(30);
}
}
2 changes: 2 additions & 0 deletions test/Grains/TestGrainInterfaces/ITimerGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public interface ITimerRequestGrain : IGrainWithIntegerKey
{
Task StartAndWaitTimerTick(TimeSpan dueTime);

Task StartStuckTimer(TimeSpan dueTime);

Task<string> GetRuntimeInstanceId();
}
}
12 changes: 12 additions & 0 deletions test/Grains/TestInternalGrains/TimerGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,22 @@ public async Task StartAndWaitTimerTick(TimeSpan dueTime)
await this.completionSource.Task;
}

public Task StartStuckTimer(TimeSpan dueTime)
{
this.completionSource = new TaskCompletionSource<int>();
var timer = this.RegisterTimer(StuckTimerTick, null, dueTime, TimeSpan.FromSeconds(1));
return Task.CompletedTask;
}

private Task TimerTick(object state)
{
this.completionSource.SetResult(1);
return Task.CompletedTask;
}

private async Task StuckTimerTick(object state)
{
await completionSource.Task;
}
}
}
26 changes: 26 additions & 0 deletions test/Tester/Forwarding/ShutdownSiloTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@ public class ShutdownSiloTests : TestClusterPerTest
{
public const int NumberOfSilos = 2;

public static readonly TimeSpan DeactivationTimeout = TimeSpan.FromSeconds(10);

internal class SiloBuilderConfigurator : ISiloBuilderConfigurator
{
public void Configure(ISiloHostBuilder hostBuilder)
{
hostBuilder.Configure<GrainCollectionOptions>(options => options.DeactivationTimeout = DeactivationTimeout);
}
}

protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
builder.Options.InitialSilosCount = NumberOfSilos;
builder.AddSiloBuilderConfigurator<SiloBuilderConfigurator>();
builder.ConfigureLegacyConfiguration(legacy =>
{
legacy.ClusterConfiguration.Globals.DefaultPlacementStrategy = "ActivationCountBasedPlacement";
Expand Down Expand Up @@ -59,6 +70,21 @@ public async Task SiloGracefulShutdown_PendingRequestTimers()
await promise;
}

[Fact, TestCategory("GracefulShutdown"), TestCategory("Functional")]
public async Task SiloGracefulShutdown_StuckTimers()
{
var grain = await GetTimerRequestGrainOnSecondary();

await grain.StartStuckTimer(TimeSpan.Zero);

await Task.Delay(TimeSpan.FromSeconds(1));
var stopwatch = Stopwatch.StartNew();
HostedCluster.StopSilo(HostedCluster.SecondarySilos.First());
stopwatch.Stop();

Assert.True(stopwatch.Elapsed > DeactivationTimeout);
}

[Fact, TestCategory("GracefulShutdown"), TestCategory("Functional")]
public async Task SiloGracefulShutdown_StuckActivation()
{
Expand Down

0 comments on commit e323b8b

Please sign in to comment.