Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add activate/deactivate to grain lifecycle #3385

Merged
merged 2 commits into from
Sep 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/Orleans/Core/Grain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Orleans
/// <summary>
/// The abstract base class for all grain classes.
/// </summary>
public abstract class Grain : IAddressable
public abstract class Grain : IAddressable, ILifecycleParticipant<IGrainLifecycle>
{
// Do not use this directly because we currently don't provide a way to inject it;
// any interaction with it will result in non unit-testable code. Any behaviour that can be accessed
Expand Down Expand Up @@ -264,13 +264,18 @@ private void EnsureRuntime()
throw new InvalidOperationException("Grain was created outside of the Orleans creation process and no runtime was specified.");
}
}

public virtual void Participate(IGrainLifecycle lifecycle)
{
lifecycle.Subscribe(GrainLifecycleStage.Activate, ct => OnActivateAsync(), ct => OnDeactivateAsync());
}
}

/// <summary>
/// Base class for a Grain with declared persistent state.
/// </summary>
/// <typeparam name="TGrainState">The class of the persistent state object</typeparam>
public class Grain<TGrainState> : Grain, ILifecycleParticipant<GrainLifecycleStage> where TGrainState : new()
public class Grain<TGrainState> : Grain where TGrainState : new()
{
private IStorage<TGrainState> storage;

Expand Down Expand Up @@ -321,8 +326,9 @@ protected virtual Task ReadStateAsync()
return storage.ReadStateAsync();
}

public virtual void Participate(ILifecycleObservable<GrainLifecycleStage> lifecycle)
public override void Participate(IGrainLifecycle lifecycle)
{
base.Participate(lifecycle);
lifecycle.Subscribe(GrainLifecycleStage.SetupState, OnSetupState);
}

Expand Down
5 changes: 2 additions & 3 deletions src/Orleans/Lifecycle/ILifecycleObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ namespace Orleans
/// when the stage is reached when starting, and stopping.
/// Stages are started in ascending order, and stopped in decending order.
/// </summary>
/// <typeparam name="TStage"></typeparam>
public interface ILifecycleObservable<in TStage>
public interface ILifecycleObservable
{
/// <summary>
/// Subscribe for notification when a stage is reached while starting or stopping.
/// </summary>
/// <param name="stage">stage of interest</param>
/// <param name="observer">stage observer</param>
/// <returns>A disposable that can be disposed of to unsubscribe</returns>
IDisposable Subscribe(TStage stage, ILifecycleObserver observer);
IDisposable Subscribe(int stage, ILifecycleObserver observer);
}
}
5 changes: 3 additions & 2 deletions src/Orleans/Lifecycle/ILifecycleParticipant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ namespace Orleans
/// Provides hook to take part in lifecycle.
/// Also may act as a signal interface indicating that an object can take part in lifecycle.
/// </summary>
public interface ILifecycleParticipant<out TStage>
public interface ILifecycleParticipant<TLifecycleObservable>
where TLifecycleObservable : ILifecycleObservable
{
void Participate(ILifecycleObservable<TStage> lifecycle);
void Participate(TLifecycleObservable lifecycle);
}
}
4 changes: 2 additions & 2 deletions src/Orleans/Lifecycle/LifecycleExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public static class LifecycleExtensions
{
private static Func<CancellationToken, Task> NoOp => ct => Task.CompletedTask;

public static IDisposable Subscribe<TStage>(this ILifecycleObservable<TStage> observable, TStage stage, Func<CancellationToken, Task> onStart, Func<CancellationToken, Task> onStop)
public static IDisposable Subscribe(this ILifecycleObservable observable, int stage, Func<CancellationToken, Task> onStart, Func<CancellationToken, Task> onStop)
{
if (observable == null) throw new ArgumentNullException(nameof(observable));
if (onStart == null) throw new ArgumentNullException(nameof(onStart));
Expand All @@ -17,7 +17,7 @@ public static IDisposable Subscribe<TStage>(this ILifecycleObservable<TStage> ob
return observable.Subscribe(stage, new Observer(onStart, onStop));
}

public static IDisposable Subscribe<TStage>(this ILifecycleObservable<TStage> observable, TStage stage, Func<CancellationToken, Task> onStart)
public static IDisposable Subscribe(this ILifecycleObservable observable, int stage, Func<CancellationToken, Task> onStart)
{
return observable.Subscribe(stage, new Observer(onStart, NoOp));
}
Expand Down
17 changes: 8 additions & 9 deletions src/Orleans/Lifecycle/LifecycleObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

namespace Orleans
{
public class LifecycleObservable<TStage> : ILifecycleObservable<TStage>, ILifecycleObserver
public class LifecycleObservable : ILifecycleObservable, ILifecycleObserver
{
private readonly ConcurrentDictionary<object, OrderedObserver> subscribers;
private readonly Logger logger;
private TStage highStage;
private int highStage;

public LifecycleObservable(Logger logger)
{
Expand All @@ -23,7 +23,7 @@ public async Task OnStart(CancellationToken ct)
{
try
{
foreach (IGrouping<TStage, OrderedObserver> observerGroup in this.subscribers.Values
foreach (IGrouping<int, OrderedObserver> observerGroup in this.subscribers.Values
.GroupBy(orderedObserver => orderedObserver.Stage)
.OrderBy(group => group.Key))
{
Expand All @@ -45,7 +45,7 @@ public async Task OnStart(CancellationToken ct)

public async Task OnStop(CancellationToken ct)
{
foreach (IGrouping<TStage, OrderedObserver> observerGroup in this.subscribers.Values
foreach (IGrouping<int, OrderedObserver> observerGroup in this.subscribers.Values
.GroupBy(orderedObserver => orderedObserver.Stage)
.OrderByDescending(group => group.Key)
// skip all until we hit the highest started stage
Expand All @@ -67,7 +67,7 @@ public async Task OnStop(CancellationToken ct)
}
}

public IDisposable Subscribe(TStage stage, ILifecycleObserver observer)
public IDisposable Subscribe(int stage, ILifecycleObserver observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));

Expand All @@ -78,8 +78,7 @@ public IDisposable Subscribe(TStage stage, ILifecycleObserver observer)

private void Remove(object key)
{
OrderedObserver o;
this.subscribers.TryRemove(key, out o);
this.subscribers.TryRemove(key, out OrderedObserver o);
}

private static async Task WrapExecution(CancellationToken ct, Func<CancellationToken, Task> action)
Expand All @@ -105,9 +104,9 @@ public void Dispose()
private class OrderedObserver
{
public ILifecycleObserver Observer { get; }
public TStage Stage { get; }
public int Stage { get; }

public OrderedObserver(TStage stage, ILifecycleObserver observer)
public OrderedObserver(int stage, ILifecycleObserver observer)
{
Stage = stage;
Observer = observer;
Expand Down
25 changes: 21 additions & 4 deletions src/Orleans/LogConsistency/LogConsistentGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Runtime;
using Orleans.Storage;
using Microsoft.Extensions.DependencyInjection;
using Orleans.GrainDirectory;
using System.Reflection;
using Orleans.Providers;
using Orleans.MultiCluster;

namespace Orleans.LogConsistency
{
Expand All @@ -17,7 +18,7 @@ namespace Orleans.LogConsistency
/// (SiloAssemblyLoader uses it to extract type)
/// </summary>
/// <typeparam name="TView">The type of the view</typeparam>
public abstract class LogConsistentGrain<TView> : Grain, ILifecycleParticipant<GrainLifecycleStage>
public abstract class LogConsistentGrain<TView> : Grain, ILifecycleParticipant<IGrainLifecycle>

{
/// <summary>
Expand All @@ -36,9 +37,15 @@ public abstract class LogConsistentGrain<TView> : Grain, ILifecycleParticipant<G
/// </summary>
protected abstract ILogViewAdaptorFactory DefaultAdaptorFactory { get; }

public virtual void Participate(ILifecycleObservable<GrainLifecycleStage> lifecycle)
public override void Participate(IGrainLifecycle lifecycle)
{
base.Participate(lifecycle);
lifecycle.Subscribe(GrainLifecycleStage.SetupState, OnSetupState);
if(this is ILogConsistencyProtocolParticipant)
{
lifecycle.Subscribe((int)GrainLifecycleStage.Activate - 1, PreActivate);
lifecycle.Subscribe((int)GrainLifecycleStage.Activate + 1, PostActivate);
}
}

private Task OnSetupState(CancellationToken ct)
Expand All @@ -52,6 +59,16 @@ private Task OnSetupState(CancellationToken ct)
return Task.CompletedTask;
}

private async Task PreActivate(CancellationToken ct)
{
await ((ILogConsistencyProtocolParticipant)this).PreActivateProtocolParticipant();
}

private async Task PostActivate(CancellationToken ct)
{
await ((ILogConsistencyProtocolParticipant)this).PostActivateProtocolParticipant();
}

private void InstallLogViewAdaptor(
IMultiClusterRegistrationStrategy mcRegistrationStrategy,
Factory<Grain, IMultiClusterRegistrationStrategy, ILogConsistencyProtocolServices> protocolServicesFactory,
Expand Down
42 changes: 32 additions & 10 deletions src/Orleans/Runtime/IGrainLifeCycle.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Orleans.Runtime
{
/// <summary>
Expand All @@ -10,17 +13,36 @@ namespace Orleans.Runtime
/// </summary>
public enum GrainLifecycleStage
{
//None,
//Register,
SetupState, // Setup grain state prior to activation
//InvokeActivate,
//Completed
/// <summary>
/// Setup grain state prior to activation
/// </summary>
SetupState = 1<<10,

/// <summary>
/// Activate grain
/// </summary>
Activate = SetupState + 1<<10,
}

/// <summary>
/// Grain life cycle
/// </summary>
public interface IGrainLifecycle : ILifecycleObservable<GrainLifecycleStage>
public interface IGrainLifecycle : ILifecycleObservable
{
}

public static class GrainLifecycleExtensions
{
public static IDisposable Subscribe(this IGrainLifecycle observable, GrainLifecycleStage stage, ILifecycleObserver observer)
{
return observable.Subscribe((int)stage, observer);
}

public static IDisposable Subscribe(this ILifecycleObservable observable, GrainLifecycleStage stage, Func<CancellationToken, Task> onStart, Func<CancellationToken, Task> onStop)
{
return observable.Subscribe((int)stage, onStart, onStop);
}

public static IDisposable Subscribe(this ILifecycleObservable observable, GrainLifecycleStage stage, Func<CancellationToken, Task> onStart)
{
return observable.Subscribe((int)stage, onStart);
}
}
}
14 changes: 1 addition & 13 deletions src/OrleansRuntime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,18 +1136,11 @@ private async Task CallGrainActivate(ActivationData activation, Dictionary<strin
// Note: This call is being made from within Scheduler.Queue wrapper, so we are already executing on worker thread
if (logger.IsVerbose) logger.Verbose(ErrorCode.Catalog_BeforeCallingActivate, "About to call {1} grain's OnActivateAsync() method {0}", activation, grainTypeName);

// Call OnActivateAsync inline, but within try-catch wrapper to safely capture any exceptions thrown from called function
// Start grain lifecycle within try-catch wrapper to safely capture any exceptions thrown from called function
try
{
RequestContext.Import(requestContextData);
await activation.Lifecycle.OnStart();
// TODO: Consider pre and post activation stages in lifecycle for PreActivateProtocolParticipant and PostActivateProtocolParticipant like behaviors
if (activation.GrainInstance is ILogConsistencyProtocolParticipant)
{
await ((ILogConsistencyProtocolParticipant)activation.GrainInstance).PreActivateProtocolParticipant();
}
await activation.GrainInstance.OnActivateAsync();

if (logger.IsVerbose) logger.Verbose(ErrorCode.Catalog_AfterCallingActivate, "Returned from calling {1} grain's OnActivateAsync() method {0}", activation, grainTypeName);

lock (activation)
Expand All @@ -1163,10 +1156,6 @@ private async Task CallGrainActivate(ActivationData activation, Dictionary<strin
// Run message pump to see if there is a new request is queued to be processed
this.Dispatcher.RunMessagePump(activation);
}
if (activation.GrainInstance is ILogConsistencyProtocolParticipant)
{
await ((ILogConsistencyProtocolParticipant)activation.GrainInstance).PostActivateProtocolParticipant();
}
}
catch (Exception exc)
{
Expand Down Expand Up @@ -1207,7 +1196,6 @@ private async Task<ActivationData> CallGrainDeactivateAndCleanupStreams(Activati
activation.State == ActivationState.Deactivating)
{
RequestContext.Clear(); // Clear any previous RC, so it does not leak into this call by mistake.
await activation.GrainInstance.OnDeactivateAsync();
await activation.Lifecycle.OnStop();
}
if (logger.IsVerbose) logger.Verbose(ErrorCode.Catalog_AfterCallingDeactivate, "Returned from calling {1} grain's OnDeactivateAsync() method {0}", activation, grainTypeName);
Expand Down
2 changes: 1 addition & 1 deletion src/OrleansRuntime/Catalog/GrainCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public Grain CreateGrainInstance(IGrainActivationContext context)
grain.Identity = context.GrainIdentity;

// wire up to lifecycle
var participant = grain as ILifecycleParticipant<GrainLifecycleStage>;
var participant = grain as ILifecycleParticipant<IGrainLifecycle>;
participant?.Participate(context.ObservableLifecycle);

return grain;
Expand Down
2 changes: 1 addition & 1 deletion src/OrleansRuntime/Catalog/GrainLifecycle.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

namespace Orleans.Runtime
{
internal class GrainLifecycle : LifecycleObservable<GrainLifecycleStage>, IGrainLifecycle
internal class GrainLifecycle : LifecycleObservable, IGrainLifecycle
{
public GrainLifecycle(Logger logger) : base(logger)
{
Expand Down
22 changes: 10 additions & 12 deletions test/Tester/Lifecycle/LifecycleTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@

using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -114,7 +113,7 @@ public async Task FailOnStopOnEachStageLifecycleTest()
[Fact, TestCategory("BVT"), TestCategory("Lifecycle")]
public async Task MultiStageObserverLifecycleTest()
{
var lifecycle = new LifecycleObservable<TestStages>(null);
var lifecycle = new LifecycleObservable(null);
var multiStageObserver = new MultiStageObserver();
multiStageObserver.Participate(lifecycle);
await lifecycle.OnStart();
Expand All @@ -129,15 +128,15 @@ private async Task<Dictionary<TestStages,List<Observer>>> RunLifecycle(Dictionar
{
// setup lifecycle observers
var observersByStage = new Dictionary<TestStages, List<Observer>>();
var lifecycle = new LifecycleObservable<TestStages>(null);
var lifecycle = new LifecycleObservable(null);
foreach (KeyValuePair<TestStages, int> kvp in observerCountByStage)
{
List<Observer> observers = Enumerable
.Range(0, kvp.Value)
.Select(i => new Observer(failOnStart.HasValue && kvp.Key == failOnStart, failOnStop.HasValue && kvp.Key == failOnStop))
.ToList();
observersByStage[kvp.Key] = observers;
observers.ForEach(o => lifecycle.Subscribe(kvp.Key, o));
observers.ForEach(o => lifecycle.Subscribe((int)kvp.Key, o));
}

// run lifecycle
Expand Down Expand Up @@ -199,7 +198,7 @@ public Task OnStop(CancellationToken ct)
/// <summary>
/// Single component which takes action at multiple stages of the lifecycle (most common expected pattern)
/// </summary>
private class MultiStageObserver : ILifecycleParticipant<TestStages>
private class MultiStageObserver : ILifecycleParticipant<ILifecycleObservable>
{
public Dictionary<TestStages,bool> Started { get; } = new Dictionary<TestStages, bool>();
public Dictionary<TestStages, bool> Stopped { get; } = new Dictionary<TestStages, bool>();
Expand All @@ -217,14 +216,13 @@ private Task OnStopStage(TestStages stage)
return Task.CompletedTask;
}

public void Participate(ILifecycleObservable<TestStages> lifecycle)
public void Participate(ILifecycleObservable lifecycle)
{
lifecycle.Subscribe(TestStages.Down, ct => OnStartStage(TestStages.Down), ct => OnStopStage(TestStages.Down));
lifecycle.Subscribe(TestStages.Initialize, ct => OnStartStage(TestStages.Initialize), ct => OnStopStage(TestStages.Initialize));
lifecycle.Subscribe(TestStages.Configure, ct => OnStartStage(TestStages.Configure), ct => OnStopStage(TestStages.Configure));
lifecycle.Subscribe(TestStages.Run, ct => OnStartStage(TestStages.Run), ct => OnStopStage(TestStages.Run));
lifecycle.Subscribe((int)TestStages.Down, ct => OnStartStage(TestStages.Down), ct => OnStopStage(TestStages.Down));
lifecycle.Subscribe((int)TestStages.Initialize, ct => OnStartStage(TestStages.Initialize), ct => OnStopStage(TestStages.Initialize));
lifecycle.Subscribe((int)TestStages.Configure, ct => OnStartStage(TestStages.Configure), ct => OnStopStage(TestStages.Configure));
lifecycle.Subscribe((int)TestStages.Run, ct => OnStartStage(TestStages.Run), ct => OnStopStage(TestStages.Run));
}
}

}
}
Loading