Skip to content

Commit

Permalink
Add activate/deactivate to grain lifecycle (#3385)
Browse files Browse the repository at this point in the history
  • Loading branch information
jason-bragg authored and jdom committed Sep 8, 2017
1 parent 3952aa6 commit 1479bdf
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 62 deletions.
12 changes: 9 additions & 3 deletions src/Orleans/Core/Grain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Orleans
/// <summary>
/// The abstract base class for all grain classes.
/// </summary>
public abstract class Grain : IAddressable
public abstract class Grain : IAddressable, ILifecycleParticipant<IGrainLifecycle>
{
// Do not use this directly because we currently don't provide a way to inject it;
// any interaction with it will result in non unit-testable code. Any behaviour that can be accessed
Expand Down Expand Up @@ -264,13 +264,18 @@ private void EnsureRuntime()
throw new InvalidOperationException("Grain was created outside of the Orleans creation process and no runtime was specified.");
}
}

public virtual void Participate(IGrainLifecycle lifecycle)
{
lifecycle.Subscribe(GrainLifecycleStage.Activate, ct => OnActivateAsync(), ct => OnDeactivateAsync());
}
}

/// <summary>
/// Base class for a Grain with declared persistent state.
/// </summary>
/// <typeparam name="TGrainState">The class of the persistent state object</typeparam>
public class Grain<TGrainState> : Grain, ILifecycleParticipant<GrainLifecycleStage> where TGrainState : new()
public class Grain<TGrainState> : Grain where TGrainState : new()
{
private IStorage<TGrainState> storage;

Expand Down Expand Up @@ -321,8 +326,9 @@ protected virtual Task ReadStateAsync()
return storage.ReadStateAsync();
}

public virtual void Participate(ILifecycleObservable<GrainLifecycleStage> lifecycle)
public override void Participate(IGrainLifecycle lifecycle)
{
base.Participate(lifecycle);
lifecycle.Subscribe(GrainLifecycleStage.SetupState, OnSetupState);
}

Expand Down
5 changes: 2 additions & 3 deletions src/Orleans/Lifecycle/ILifecycleObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ namespace Orleans
/// when the stage is reached when starting, and stopping.
/// Stages are started in ascending order, and stopped in decending order.
/// </summary>
/// <typeparam name="TStage"></typeparam>
public interface ILifecycleObservable<in TStage>
public interface ILifecycleObservable
{
/// <summary>
/// Subscribe for notification when a stage is reached while starting or stopping.
/// </summary>
/// <param name="stage">stage of interest</param>
/// <param name="observer">stage observer</param>
/// <returns>A disposable that can be disposed of to unsubscribe</returns>
IDisposable Subscribe(TStage stage, ILifecycleObserver observer);
IDisposable Subscribe(int stage, ILifecycleObserver observer);
}
}
5 changes: 3 additions & 2 deletions src/Orleans/Lifecycle/ILifecycleParticipant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ namespace Orleans
/// Provides hook to take part in lifecycle.
/// Also may act as a signal interface indicating that an object can take part in lifecycle.
/// </summary>
public interface ILifecycleParticipant<out TStage>
public interface ILifecycleParticipant<TLifecycleObservable>
where TLifecycleObservable : ILifecycleObservable
{
void Participate(ILifecycleObservable<TStage> lifecycle);
void Participate(TLifecycleObservable lifecycle);
}
}
4 changes: 2 additions & 2 deletions src/Orleans/Lifecycle/LifecycleExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public static class LifecycleExtensions
{
private static Func<CancellationToken, Task> NoOp => ct => Task.CompletedTask;

public static IDisposable Subscribe<TStage>(this ILifecycleObservable<TStage> observable, TStage stage, Func<CancellationToken, Task> onStart, Func<CancellationToken, Task> onStop)
public static IDisposable Subscribe(this ILifecycleObservable observable, int stage, Func<CancellationToken, Task> onStart, Func<CancellationToken, Task> onStop)
{
if (observable == null) throw new ArgumentNullException(nameof(observable));
if (onStart == null) throw new ArgumentNullException(nameof(onStart));
Expand All @@ -17,7 +17,7 @@ public static IDisposable Subscribe<TStage>(this ILifecycleObservable<TStage> ob
return observable.Subscribe(stage, new Observer(onStart, onStop));
}

public static IDisposable Subscribe<TStage>(this ILifecycleObservable<TStage> observable, TStage stage, Func<CancellationToken, Task> onStart)
public static IDisposable Subscribe(this ILifecycleObservable observable, int stage, Func<CancellationToken, Task> onStart)
{
return observable.Subscribe(stage, new Observer(onStart, NoOp));
}
Expand Down
17 changes: 8 additions & 9 deletions src/Orleans/Lifecycle/LifecycleObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

namespace Orleans
{
public class LifecycleObservable<TStage> : ILifecycleObservable<TStage>, ILifecycleObserver
public class LifecycleObservable : ILifecycleObservable, ILifecycleObserver
{
private readonly ConcurrentDictionary<object, OrderedObserver> subscribers;
private readonly Logger logger;
private TStage highStage;
private int highStage;

public LifecycleObservable(Logger logger)
{
Expand All @@ -23,7 +23,7 @@ public async Task OnStart(CancellationToken ct)
{
try
{
foreach (IGrouping<TStage, OrderedObserver> observerGroup in this.subscribers.Values
foreach (IGrouping<int, OrderedObserver> observerGroup in this.subscribers.Values
.GroupBy(orderedObserver => orderedObserver.Stage)
.OrderBy(group => group.Key))
{
Expand All @@ -45,7 +45,7 @@ public async Task OnStart(CancellationToken ct)

public async Task OnStop(CancellationToken ct)
{
foreach (IGrouping<TStage, OrderedObserver> observerGroup in this.subscribers.Values
foreach (IGrouping<int, OrderedObserver> observerGroup in this.subscribers.Values
.GroupBy(orderedObserver => orderedObserver.Stage)
.OrderByDescending(group => group.Key)
// skip all until we hit the highest started stage
Expand All @@ -67,7 +67,7 @@ public async Task OnStop(CancellationToken ct)
}
}

public IDisposable Subscribe(TStage stage, ILifecycleObserver observer)
public IDisposable Subscribe(int stage, ILifecycleObserver observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));

Expand All @@ -78,8 +78,7 @@ public IDisposable Subscribe(TStage stage, ILifecycleObserver observer)

private void Remove(object key)
{
OrderedObserver o;
this.subscribers.TryRemove(key, out o);
this.subscribers.TryRemove(key, out OrderedObserver o);
}

private static async Task WrapExecution(CancellationToken ct, Func<CancellationToken, Task> action)
Expand All @@ -105,9 +104,9 @@ public void Dispose()
private class OrderedObserver
{
public ILifecycleObserver Observer { get; }
public TStage Stage { get; }
public int Stage { get; }

public OrderedObserver(TStage stage, ILifecycleObserver observer)
public OrderedObserver(int stage, ILifecycleObserver observer)
{
Stage = stage;
Observer = observer;
Expand Down
25 changes: 21 additions & 4 deletions src/Orleans/LogConsistency/LogConsistentGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Runtime;
using Orleans.Storage;
using Microsoft.Extensions.DependencyInjection;
using Orleans.GrainDirectory;
using System.Reflection;
using Orleans.Providers;
using Orleans.MultiCluster;

namespace Orleans.LogConsistency
{
Expand All @@ -17,7 +18,7 @@ namespace Orleans.LogConsistency
/// (SiloAssemblyLoader uses it to extract type)
/// </summary>
/// <typeparam name="TView">The type of the view</typeparam>
public abstract class LogConsistentGrain<TView> : Grain, ILifecycleParticipant<GrainLifecycleStage>
public abstract class LogConsistentGrain<TView> : Grain, ILifecycleParticipant<IGrainLifecycle>

{
/// <summary>
Expand All @@ -36,9 +37,15 @@ public abstract class LogConsistentGrain<TView> : Grain, ILifecycleParticipant<G
/// </summary>
protected abstract ILogViewAdaptorFactory DefaultAdaptorFactory { get; }

public virtual void Participate(ILifecycleObservable<GrainLifecycleStage> lifecycle)
public override void Participate(IGrainLifecycle lifecycle)
{
base.Participate(lifecycle);
lifecycle.Subscribe(GrainLifecycleStage.SetupState, OnSetupState);
if(this is ILogConsistencyProtocolParticipant)
{
lifecycle.Subscribe((int)GrainLifecycleStage.Activate - 1, PreActivate);
lifecycle.Subscribe((int)GrainLifecycleStage.Activate + 1, PostActivate);
}
}

private Task OnSetupState(CancellationToken ct)
Expand All @@ -52,6 +59,16 @@ private Task OnSetupState(CancellationToken ct)
return Task.CompletedTask;
}

private async Task PreActivate(CancellationToken ct)
{
await ((ILogConsistencyProtocolParticipant)this).PreActivateProtocolParticipant();
}

private async Task PostActivate(CancellationToken ct)
{
await ((ILogConsistencyProtocolParticipant)this).PostActivateProtocolParticipant();
}

private void InstallLogViewAdaptor(
IMultiClusterRegistrationStrategy mcRegistrationStrategy,
Factory<Grain, IMultiClusterRegistrationStrategy, ILogConsistencyProtocolServices> protocolServicesFactory,
Expand Down
42 changes: 32 additions & 10 deletions src/Orleans/Runtime/IGrainLifeCycle.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Orleans.Runtime
{
/// <summary>
Expand All @@ -10,17 +13,36 @@ namespace Orleans.Runtime
/// </summary>
public enum GrainLifecycleStage
{
//None,
//Register,
SetupState, // Setup grain state prior to activation
//InvokeActivate,
//Completed
/// <summary>
/// Setup grain state prior to activation
/// </summary>
SetupState = 1<<10,

/// <summary>
/// Activate grain
/// </summary>
Activate = SetupState + 1<<10,
}

/// <summary>
/// Grain life cycle
/// </summary>
public interface IGrainLifecycle : ILifecycleObservable<GrainLifecycleStage>
public interface IGrainLifecycle : ILifecycleObservable
{
}

public static class GrainLifecycleExtensions
{
public static IDisposable Subscribe(this IGrainLifecycle observable, GrainLifecycleStage stage, ILifecycleObserver observer)
{
return observable.Subscribe((int)stage, observer);
}

public static IDisposable Subscribe(this ILifecycleObservable observable, GrainLifecycleStage stage, Func<CancellationToken, Task> onStart, Func<CancellationToken, Task> onStop)
{
return observable.Subscribe((int)stage, onStart, onStop);
}

public static IDisposable Subscribe(this ILifecycleObservable observable, GrainLifecycleStage stage, Func<CancellationToken, Task> onStart)
{
return observable.Subscribe((int)stage, onStart);
}
}
}
14 changes: 1 addition & 13 deletions src/OrleansRuntime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,18 +1136,11 @@ private async Task CallGrainActivate(ActivationData activation, Dictionary<strin
// Note: This call is being made from within Scheduler.Queue wrapper, so we are already executing on worker thread
if (logger.IsVerbose) logger.Verbose(ErrorCode.Catalog_BeforeCallingActivate, "About to call {1} grain's OnActivateAsync() method {0}", activation, grainTypeName);

// Call OnActivateAsync inline, but within try-catch wrapper to safely capture any exceptions thrown from called function
// Start grain lifecycle within try-catch wrapper to safely capture any exceptions thrown from called function
try
{
RequestContext.Import(requestContextData);
await activation.Lifecycle.OnStart();
// TODO: Consider pre and post activation stages in lifecycle for PreActivateProtocolParticipant and PostActivateProtocolParticipant like behaviors
if (activation.GrainInstance is ILogConsistencyProtocolParticipant)
{
await ((ILogConsistencyProtocolParticipant)activation.GrainInstance).PreActivateProtocolParticipant();
}
await activation.GrainInstance.OnActivateAsync();

if (logger.IsVerbose) logger.Verbose(ErrorCode.Catalog_AfterCallingActivate, "Returned from calling {1} grain's OnActivateAsync() method {0}", activation, grainTypeName);

lock (activation)
Expand All @@ -1163,10 +1156,6 @@ private async Task CallGrainActivate(ActivationData activation, Dictionary<strin
// Run message pump to see if there is a new request is queued to be processed
this.Dispatcher.RunMessagePump(activation);
}
if (activation.GrainInstance is ILogConsistencyProtocolParticipant)
{
await ((ILogConsistencyProtocolParticipant)activation.GrainInstance).PostActivateProtocolParticipant();
}
}
catch (Exception exc)
{
Expand Down Expand Up @@ -1207,7 +1196,6 @@ private async Task<ActivationData> CallGrainDeactivateAndCleanupStreams(Activati
activation.State == ActivationState.Deactivating)
{
RequestContext.Clear(); // Clear any previous RC, so it does not leak into this call by mistake.
await activation.GrainInstance.OnDeactivateAsync();
await activation.Lifecycle.OnStop();
}
if (logger.IsVerbose) logger.Verbose(ErrorCode.Catalog_AfterCallingDeactivate, "Returned from calling {1} grain's OnDeactivateAsync() method {0}", activation, grainTypeName);
Expand Down
2 changes: 1 addition & 1 deletion src/OrleansRuntime/Catalog/GrainCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public Grain CreateGrainInstance(IGrainActivationContext context)
grain.Identity = context.GrainIdentity;

// wire up to lifecycle
var participant = grain as ILifecycleParticipant<GrainLifecycleStage>;
var participant = grain as ILifecycleParticipant<IGrainLifecycle>;
participant?.Participate(context.ObservableLifecycle);

return grain;
Expand Down
2 changes: 1 addition & 1 deletion src/OrleansRuntime/Catalog/GrainLifecycle.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

namespace Orleans.Runtime
{
internal class GrainLifecycle : LifecycleObservable<GrainLifecycleStage>, IGrainLifecycle
internal class GrainLifecycle : LifecycleObservable, IGrainLifecycle
{
public GrainLifecycle(Logger logger) : base(logger)
{
Expand Down
22 changes: 10 additions & 12 deletions test/Tester/Lifecycle/LifecycleTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@

using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -114,7 +113,7 @@ public async Task FailOnStopOnEachStageLifecycleTest()
[Fact, TestCategory("BVT"), TestCategory("Lifecycle")]
public async Task MultiStageObserverLifecycleTest()
{
var lifecycle = new LifecycleObservable<TestStages>(null);
var lifecycle = new LifecycleObservable(null);
var multiStageObserver = new MultiStageObserver();
multiStageObserver.Participate(lifecycle);
await lifecycle.OnStart();
Expand All @@ -129,15 +128,15 @@ private async Task<Dictionary<TestStages,List<Observer>>> RunLifecycle(Dictionar
{
// setup lifecycle observers
var observersByStage = new Dictionary<TestStages, List<Observer>>();
var lifecycle = new LifecycleObservable<TestStages>(null);
var lifecycle = new LifecycleObservable(null);
foreach (KeyValuePair<TestStages, int> kvp in observerCountByStage)
{
List<Observer> observers = Enumerable
.Range(0, kvp.Value)
.Select(i => new Observer(failOnStart.HasValue && kvp.Key == failOnStart, failOnStop.HasValue && kvp.Key == failOnStop))
.ToList();
observersByStage[kvp.Key] = observers;
observers.ForEach(o => lifecycle.Subscribe(kvp.Key, o));
observers.ForEach(o => lifecycle.Subscribe((int)kvp.Key, o));
}

// run lifecycle
Expand Down Expand Up @@ -199,7 +198,7 @@ public Task OnStop(CancellationToken ct)
/// <summary>
/// Single component which takes action at multiple stages of the lifecycle (most common expected pattern)
/// </summary>
private class MultiStageObserver : ILifecycleParticipant<TestStages>
private class MultiStageObserver : ILifecycleParticipant<ILifecycleObservable>
{
public Dictionary<TestStages,bool> Started { get; } = new Dictionary<TestStages, bool>();
public Dictionary<TestStages, bool> Stopped { get; } = new Dictionary<TestStages, bool>();
Expand All @@ -217,14 +216,13 @@ private Task OnStopStage(TestStages stage)
return Task.CompletedTask;
}

public void Participate(ILifecycleObservable<TestStages> lifecycle)
public void Participate(ILifecycleObservable lifecycle)
{
lifecycle.Subscribe(TestStages.Down, ct => OnStartStage(TestStages.Down), ct => OnStopStage(TestStages.Down));
lifecycle.Subscribe(TestStages.Initialize, ct => OnStartStage(TestStages.Initialize), ct => OnStopStage(TestStages.Initialize));
lifecycle.Subscribe(TestStages.Configure, ct => OnStartStage(TestStages.Configure), ct => OnStopStage(TestStages.Configure));
lifecycle.Subscribe(TestStages.Run, ct => OnStartStage(TestStages.Run), ct => OnStopStage(TestStages.Run));
lifecycle.Subscribe((int)TestStages.Down, ct => OnStartStage(TestStages.Down), ct => OnStopStage(TestStages.Down));
lifecycle.Subscribe((int)TestStages.Initialize, ct => OnStartStage(TestStages.Initialize), ct => OnStopStage(TestStages.Initialize));
lifecycle.Subscribe((int)TestStages.Configure, ct => OnStartStage(TestStages.Configure), ct => OnStopStage(TestStages.Configure));
lifecycle.Subscribe((int)TestStages.Run, ct => OnStartStage(TestStages.Run), ct => OnStopStage(TestStages.Run));
}
}

}
}
Loading

0 comments on commit 1479bdf

Please sign in to comment.