From 491d6aa38dd139b2dd9daeec20b6731c76c7ab82 Mon Sep 17 00:00:00 2001 From: jason-bragg Date: Thu, 7 Sep 2017 11:37:35 -0700 Subject: [PATCH 1/2] Add activate/deactivate to grain lifecycle --- src/Orleans/Core/Grain.cs | 12 +++++-- src/Orleans/Lifecycle/ILifecycleObservable.cs | 5 ++- .../Lifecycle/ILifecycleParticipant.cs | 5 +-- src/Orleans/Lifecycle/LifecycleExtensions.cs | 4 +-- src/Orleans/Lifecycle/LifecycleObservable.cs | 17 +++++----- .../LogConsistency/LogConsistentGrain.cs | 25 ++++++++++++--- src/Orleans/Runtime/IGrainLifeCycle.cs | 32 +++++++++++++++---- src/OrleansRuntime/Catalog/Catalog.cs | 14 +------- src/OrleansRuntime/Catalog/GrainCreator.cs | 2 +- src/OrleansRuntime/Catalog/GrainLifecycle.cs | 2 +- test/Tester/Lifecycle/LifecycleTests.cs | 22 ++++++------- .../TableExampleStorage.cs | 4 +-- 12 files changed, 85 insertions(+), 59 deletions(-) diff --git a/src/Orleans/Core/Grain.cs b/src/Orleans/Core/Grain.cs index 722a0a1c88..6715eea411 100644 --- a/src/Orleans/Core/Grain.cs +++ b/src/Orleans/Core/Grain.cs @@ -14,7 +14,7 @@ namespace Orleans /// /// The abstract base class for all grain classes. /// - public abstract class Grain : IAddressable + public abstract class Grain : IAddressable, ILifecycleParticipant { // Do not use this directly because we currently don't provide a way to inject it; // any interaction with it will result in non unit-testable code. Any behaviour that can be accessed @@ -264,13 +264,18 @@ private void EnsureRuntime() throw new InvalidOperationException("Grain was created outside of the Orleans creation process and no runtime was specified."); } } + + public virtual void Participate(IGrainLifecycle lifecycle) + { + lifecycle.Subscribe(GrainLifecycleStage.Activate, ct => OnActivateAsync(), ct => OnDeactivateAsync()); + } } /// /// Base class for a Grain with declared persistent state. /// /// The class of the persistent state object - public class Grain : Grain, ILifecycleParticipant where TGrainState : new() + public class Grain : Grain where TGrainState : new() { private IStorage storage; @@ -321,8 +326,9 @@ protected virtual Task ReadStateAsync() return storage.ReadStateAsync(); } - public virtual void Participate(ILifecycleObservable lifecycle) + public override void Participate(IGrainLifecycle lifecycle) { + base.Participate(lifecycle); lifecycle.Subscribe(GrainLifecycleStage.SetupState, OnSetupState); } diff --git a/src/Orleans/Lifecycle/ILifecycleObservable.cs b/src/Orleans/Lifecycle/ILifecycleObservable.cs index ff563efd6b..3f6b4eae2e 100644 --- a/src/Orleans/Lifecycle/ILifecycleObservable.cs +++ b/src/Orleans/Lifecycle/ILifecycleObservable.cs @@ -8,8 +8,7 @@ namespace Orleans /// when the stage is reached when starting, and stopping. /// Stages are started in ascending order, and stopped in decending order. /// - /// - public interface ILifecycleObservable + public interface ILifecycleObservable { /// /// Subscribe for notification when a stage is reached while starting or stopping. @@ -17,6 +16,6 @@ public interface ILifecycleObservable /// stage of interest /// stage observer /// A disposable that can be disposed of to unsubscribe - IDisposable Subscribe(TStage stage, ILifecycleObserver observer); + IDisposable Subscribe(int stage, ILifecycleObserver observer); } } diff --git a/src/Orleans/Lifecycle/ILifecycleParticipant.cs b/src/Orleans/Lifecycle/ILifecycleParticipant.cs index 25f3b4ca49..90485db8d7 100644 --- a/src/Orleans/Lifecycle/ILifecycleParticipant.cs +++ b/src/Orleans/Lifecycle/ILifecycleParticipant.cs @@ -5,8 +5,9 @@ namespace Orleans /// Provides hook to take part in lifecycle. /// Also may act as a signal interface indicating that an object can take part in lifecycle. /// - public interface ILifecycleParticipant + public interface ILifecycleParticipant + where TLifecycleObservable : ILifecycleObservable { - void Participate(ILifecycleObservable lifecycle); + void Participate(TLifecycleObservable lifecycle); } } diff --git a/src/Orleans/Lifecycle/LifecycleExtensions.cs b/src/Orleans/Lifecycle/LifecycleExtensions.cs index b975bc77d8..fb4263b462 100644 --- a/src/Orleans/Lifecycle/LifecycleExtensions.cs +++ b/src/Orleans/Lifecycle/LifecycleExtensions.cs @@ -8,7 +8,7 @@ public static class LifecycleExtensions { private static Func NoOp => ct => Task.CompletedTask; - public static IDisposable Subscribe(this ILifecycleObservable observable, TStage stage, Func onStart, Func onStop) + public static IDisposable Subscribe(this ILifecycleObservable observable, int stage, Func onStart, Func onStop) { if (observable == null) throw new ArgumentNullException(nameof(observable)); if (onStart == null) throw new ArgumentNullException(nameof(onStart)); @@ -17,7 +17,7 @@ public static IDisposable Subscribe(this ILifecycleObservable ob return observable.Subscribe(stage, new Observer(onStart, onStop)); } - public static IDisposable Subscribe(this ILifecycleObservable observable, TStage stage, Func onStart) + public static IDisposable Subscribe(this ILifecycleObservable observable, int stage, Func onStart) { return observable.Subscribe(stage, new Observer(onStart, NoOp)); } diff --git a/src/Orleans/Lifecycle/LifecycleObservable.cs b/src/Orleans/Lifecycle/LifecycleObservable.cs index ab9558f54a..d7534b5ba8 100644 --- a/src/Orleans/Lifecycle/LifecycleObservable.cs +++ b/src/Orleans/Lifecycle/LifecycleObservable.cs @@ -7,11 +7,11 @@ namespace Orleans { - public class LifecycleObservable : ILifecycleObservable, ILifecycleObserver + public class LifecycleObservable : ILifecycleObservable, ILifecycleObserver { private readonly ConcurrentDictionary subscribers; private readonly Logger logger; - private TStage highStage; + private int highStage; public LifecycleObservable(Logger logger) { @@ -23,7 +23,7 @@ public async Task OnStart(CancellationToken ct) { try { - foreach (IGrouping observerGroup in this.subscribers.Values + foreach (IGrouping observerGroup in this.subscribers.Values .GroupBy(orderedObserver => orderedObserver.Stage) .OrderBy(group => group.Key)) { @@ -45,7 +45,7 @@ public async Task OnStart(CancellationToken ct) public async Task OnStop(CancellationToken ct) { - foreach (IGrouping observerGroup in this.subscribers.Values + foreach (IGrouping observerGroup in this.subscribers.Values .GroupBy(orderedObserver => orderedObserver.Stage) .OrderByDescending(group => group.Key) // skip all until we hit the highest started stage @@ -67,7 +67,7 @@ public async Task OnStop(CancellationToken ct) } } - public IDisposable Subscribe(TStage stage, ILifecycleObserver observer) + public IDisposable Subscribe(int stage, ILifecycleObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); @@ -78,8 +78,7 @@ public IDisposable Subscribe(TStage stage, ILifecycleObserver observer) private void Remove(object key) { - OrderedObserver o; - this.subscribers.TryRemove(key, out o); + this.subscribers.TryRemove(key, out OrderedObserver o); } private static async Task WrapExecution(CancellationToken ct, Func action) @@ -105,9 +104,9 @@ public void Dispose() private class OrderedObserver { public ILifecycleObserver Observer { get; } - public TStage Stage { get; } + public int Stage { get; } - public OrderedObserver(TStage stage, ILifecycleObserver observer) + public OrderedObserver(int stage, ILifecycleObserver observer) { Stage = stage; Observer = observer; diff --git a/src/Orleans/LogConsistency/LogConsistentGrain.cs b/src/Orleans/LogConsistency/LogConsistentGrain.cs index 895e1c76bf..9d74bf2005 100644 --- a/src/Orleans/LogConsistency/LogConsistentGrain.cs +++ b/src/Orleans/LogConsistency/LogConsistentGrain.cs @@ -2,12 +2,13 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using System.Reflection; +using Microsoft.Extensions.DependencyInjection; using Orleans.Runtime; using Orleans.Storage; -using Microsoft.Extensions.DependencyInjection; using Orleans.GrainDirectory; -using System.Reflection; using Orleans.Providers; +using Orleans.MultiCluster; namespace Orleans.LogConsistency { @@ -17,7 +18,7 @@ namespace Orleans.LogConsistency /// (SiloAssemblyLoader uses it to extract type) /// /// The type of the view - public abstract class LogConsistentGrain : Grain, ILifecycleParticipant + public abstract class LogConsistentGrain : Grain, ILifecycleParticipant { /// @@ -36,9 +37,15 @@ public abstract class LogConsistentGrain : Grain, ILifecycleParticipant protected abstract ILogViewAdaptorFactory DefaultAdaptorFactory { get; } - public virtual void Participate(ILifecycleObservable lifecycle) + public override void Participate(IGrainLifecycle lifecycle) { + base.Participate(lifecycle); lifecycle.Subscribe(GrainLifecycleStage.SetupState, OnSetupState); + if(this is ILogConsistencyProtocolParticipant) + { + lifecycle.Subscribe((int)GrainLifecycleStage.Activate - 1, PreActivate); + lifecycle.Subscribe((int)GrainLifecycleStage.Activate + 1, PostActivate); + } } private Task OnSetupState(CancellationToken ct) @@ -52,6 +59,16 @@ private Task OnSetupState(CancellationToken ct) return Task.CompletedTask; } + private async Task PreActivate(CancellationToken ct) + { + await ((ILogConsistencyProtocolParticipant)this).PreActivateProtocolParticipant(); + } + + private async Task PostActivate(CancellationToken ct) + { + await ((ILogConsistencyProtocolParticipant)this).PostActivateProtocolParticipant(); + } + private void InstallLogViewAdaptor( IMultiClusterRegistrationStrategy mcRegistrationStrategy, Factory protocolServicesFactory, diff --git a/src/Orleans/Runtime/IGrainLifeCycle.cs b/src/Orleans/Runtime/IGrainLifeCycle.cs index b2adc21194..b709100f5e 100644 --- a/src/Orleans/Runtime/IGrainLifeCycle.cs +++ b/src/Orleans/Runtime/IGrainLifeCycle.cs @@ -1,4 +1,7 @@ - +using System; +using System.Threading; +using System.Threading.Tasks; + namespace Orleans.Runtime { /// @@ -12,15 +15,30 @@ public enum GrainLifecycleStage { //None, //Register, - SetupState, // Setup grain state prior to activation - //InvokeActivate, + SetupState = 1<<10, // Setup grain state prior to activation + Activate = SetupState + 1<<10, // Acivate grain //Completed } - /// - /// Grain life cycle - /// - public interface IGrainLifecycle : ILifecycleObservable + public interface IGrainLifecycle : ILifecycleObservable + { + } + + public static class GrainLifecycleExtensions { + public static IDisposable Subscribe(this IGrainLifecycle observable, GrainLifecycleStage stage, ILifecycleObserver observer) + { + return observable.Subscribe((int)stage, observer); + } + + public static IDisposable Subscribe(this ILifecycleObservable observable, GrainLifecycleStage stage, Func onStart, Func onStop) + { + return observable.Subscribe((int)stage, onStart, onStop); + } + + public static IDisposable Subscribe(this ILifecycleObservable observable, GrainLifecycleStage stage, Func onStart) + { + return observable.Subscribe((int)stage, onStart); + } } } diff --git a/src/OrleansRuntime/Catalog/Catalog.cs b/src/OrleansRuntime/Catalog/Catalog.cs index 9e0ab344cd..10b066a12d 100644 --- a/src/OrleansRuntime/Catalog/Catalog.cs +++ b/src/OrleansRuntime/Catalog/Catalog.cs @@ -1136,18 +1136,11 @@ private async Task CallGrainActivate(ActivationData activation, Dictionary CallGrainDeactivateAndCleanupStreams(Activati activation.State == ActivationState.Deactivating) { RequestContext.Clear(); // Clear any previous RC, so it does not leak into this call by mistake. - await activation.GrainInstance.OnDeactivateAsync(); await activation.Lifecycle.OnStop(); } if (logger.IsVerbose) logger.Verbose(ErrorCode.Catalog_AfterCallingDeactivate, "Returned from calling {1} grain's OnDeactivateAsync() method {0}", activation, grainTypeName); diff --git a/src/OrleansRuntime/Catalog/GrainCreator.cs b/src/OrleansRuntime/Catalog/GrainCreator.cs index c2f6837b94..f4cd21ca21 100644 --- a/src/OrleansRuntime/Catalog/GrainCreator.cs +++ b/src/OrleansRuntime/Catalog/GrainCreator.cs @@ -39,7 +39,7 @@ public Grain CreateGrainInstance(IGrainActivationContext context) grain.Identity = context.GrainIdentity; // wire up to lifecycle - var participant = grain as ILifecycleParticipant; + var participant = grain as ILifecycleParticipant; participant?.Participate(context.ObservableLifecycle); return grain; diff --git a/src/OrleansRuntime/Catalog/GrainLifecycle.cs b/src/OrleansRuntime/Catalog/GrainLifecycle.cs index 068577289e..4dd5638242 100644 --- a/src/OrleansRuntime/Catalog/GrainLifecycle.cs +++ b/src/OrleansRuntime/Catalog/GrainLifecycle.cs @@ -1,7 +1,7 @@  namespace Orleans.Runtime { - internal class GrainLifecycle : LifecycleObservable, IGrainLifecycle + internal class GrainLifecycle : LifecycleObservable, IGrainLifecycle { public GrainLifecycle(Logger logger) : base(logger) { diff --git a/test/Tester/Lifecycle/LifecycleTests.cs b/test/Tester/Lifecycle/LifecycleTests.cs index 5fbe2eb465..c568e005a5 100644 --- a/test/Tester/Lifecycle/LifecycleTests.cs +++ b/test/Tester/Lifecycle/LifecycleTests.cs @@ -1,5 +1,4 @@ - -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -114,7 +113,7 @@ public async Task FailOnStopOnEachStageLifecycleTest() [Fact, TestCategory("BVT"), TestCategory("Lifecycle")] public async Task MultiStageObserverLifecycleTest() { - var lifecycle = new LifecycleObservable(null); + var lifecycle = new LifecycleObservable(null); var multiStageObserver = new MultiStageObserver(); multiStageObserver.Participate(lifecycle); await lifecycle.OnStart(); @@ -129,7 +128,7 @@ private async Task>> RunLifecycle(Dictionar { // setup lifecycle observers var observersByStage = new Dictionary>(); - var lifecycle = new LifecycleObservable(null); + var lifecycle = new LifecycleObservable(null); foreach (KeyValuePair kvp in observerCountByStage) { List observers = Enumerable @@ -137,7 +136,7 @@ private async Task>> RunLifecycle(Dictionar .Select(i => new Observer(failOnStart.HasValue && kvp.Key == failOnStart, failOnStop.HasValue && kvp.Key == failOnStop)) .ToList(); observersByStage[kvp.Key] = observers; - observers.ForEach(o => lifecycle.Subscribe(kvp.Key, o)); + observers.ForEach(o => lifecycle.Subscribe((int)kvp.Key, o)); } // run lifecycle @@ -199,7 +198,7 @@ public Task OnStop(CancellationToken ct) /// /// Single component which takes action at multiple stages of the lifecycle (most common expected pattern) /// - private class MultiStageObserver : ILifecycleParticipant + private class MultiStageObserver : ILifecycleParticipant { public Dictionary Started { get; } = new Dictionary(); public Dictionary Stopped { get; } = new Dictionary(); @@ -217,14 +216,13 @@ private Task OnStopStage(TestStages stage) return Task.CompletedTask; } - public void Participate(ILifecycleObservable lifecycle) + public void Participate(ILifecycleObservable lifecycle) { - lifecycle.Subscribe(TestStages.Down, ct => OnStartStage(TestStages.Down), ct => OnStopStage(TestStages.Down)); - lifecycle.Subscribe(TestStages.Initialize, ct => OnStartStage(TestStages.Initialize), ct => OnStopStage(TestStages.Initialize)); - lifecycle.Subscribe(TestStages.Configure, ct => OnStartStage(TestStages.Configure), ct => OnStopStage(TestStages.Configure)); - lifecycle.Subscribe(TestStages.Run, ct => OnStartStage(TestStages.Run), ct => OnStopStage(TestStages.Run)); + lifecycle.Subscribe((int)TestStages.Down, ct => OnStartStage(TestStages.Down), ct => OnStopStage(TestStages.Down)); + lifecycle.Subscribe((int)TestStages.Initialize, ct => OnStartStage(TestStages.Initialize), ct => OnStopStage(TestStages.Initialize)); + lifecycle.Subscribe((int)TestStages.Configure, ct => OnStartStage(TestStages.Configure), ct => OnStopStage(TestStages.Configure)); + lifecycle.Subscribe((int)TestStages.Run, ct => OnStartStage(TestStages.Run), ct => OnStopStage(TestStages.Run)); } } - } } diff --git a/test/Tester/StorageFacet/Feature.Implementations/TableExampleStorage.cs b/test/Tester/StorageFacet/Feature.Implementations/TableExampleStorage.cs index 93f966f3e1..86385a529c 100644 --- a/test/Tester/StorageFacet/Feature.Implementations/TableExampleStorage.cs +++ b/test/Tester/StorageFacet/Feature.Implementations/TableExampleStorage.cs @@ -7,7 +7,7 @@ namespace Tester.StorageFacet.Implementations { - public class TableExampleStorage : IExampleStorage, ILifecycleParticipant + public class TableExampleStorage : IExampleStorage, ILifecycleParticipant { private IExampleStorageConfig config; private bool activateCalled; @@ -31,7 +31,7 @@ public Task LoadState(CancellationToken ct) return Task.CompletedTask; } - public void Participate(ILifecycleObservable lifecycle) + public void Participate(IGrainLifecycle lifecycle) { lifecycle.Subscribe(GrainLifecycleStage.SetupState, LoadState); } From 33e6d8e68d6dd4fc9a4f7d71defe8298441ec0fc Mon Sep 17 00:00:00 2001 From: jason-bragg Date: Thu, 7 Sep 2017 17:25:06 -0700 Subject: [PATCH 2/2] Addressed PR feedback - Added xml comments to grain lifecycle states. --- src/Orleans/Runtime/IGrainLifeCycle.cs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/Orleans/Runtime/IGrainLifeCycle.cs b/src/Orleans/Runtime/IGrainLifeCycle.cs index b709100f5e..14ac96889d 100644 --- a/src/Orleans/Runtime/IGrainLifeCycle.cs +++ b/src/Orleans/Runtime/IGrainLifeCycle.cs @@ -13,11 +13,15 @@ namespace Orleans.Runtime /// public enum GrainLifecycleStage { - //None, - //Register, - SetupState = 1<<10, // Setup grain state prior to activation - Activate = SetupState + 1<<10, // Acivate grain - //Completed + /// + /// Setup grain state prior to activation + /// + SetupState = 1<<10, + + /// + /// Activate grain + /// + Activate = SetupState + 1<<10, } public interface IGrainLifecycle : ILifecycleObservable