Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved transparency and timing of silo lifecycle (second attempt). #4175

Merged
merged 1 commit into from
Mar 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public DynamoDBGrainStorage(string name, DynamoDBStorageOptions options, Seriali

public void Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe(this.options.InitStage, Init, Close);
lifecycle.Subscribe(OptionFormattingUtilities.Name<DynamoDBGrainStorage>(this.name), this.options.InitStage, Init, Close);
}

/// <summary> Initialization function for this storage provider. </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public AdoNetGrainStorage(

public void Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe(this.options.InitStage, Init, Close);
lifecycle.Subscribe(OptionFormattingUtilities.Name<AdoNetGrainStorage>(this.name), this.options.InitStage, Init, Close);
}
/// <summary>Clear state data function for this storage provider.</summary>
/// <see cref="IStorageProvider.ClearStateAsync(string, GrainReference, IGrainState)"/>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ private static async Task DoOptimisticUpdate(Func<Task> updateOperation, CloudBl

public void Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe(this.options.InitStage, Init);
lifecycle.Subscribe(OptionFormattingUtilities.Name<AzureBlobGrainStorage>(this.name), this.options.InitStage, Init);
}

/// <summary> Initialization function for this storage provider. </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ private Task Close(CancellationToken ct)

public void Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe(this.options.InitStage, Init, Close);
lifecycle.Subscribe(OptionFormattingUtilities.Name<AzureTableGrainStorage>(this.name), this.options.InitStage, Init, Close);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Core.Abstractions/Core/Grain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private void EnsureRuntime()

public virtual void Participate(IGrainLifecycle lifecycle)
{
lifecycle.Subscribe(GrainLifecycleStage.Activate, ct => OnActivateAsync(), ct => OnDeactivateAsync());
lifecycle.Subscribe(this.GetType().FullName, GrainLifecycleStage.Activate, ct => OnActivateAsync(), ct => OnDeactivateAsync());
}
}

Expand Down Expand Up @@ -302,7 +302,7 @@ protected virtual Task ReadStateAsync()
public override void Participate(IGrainLifecycle lifecycle)
{
base.Participate(lifecycle);
lifecycle.Subscribe(GrainLifecycleStage.SetupState, OnSetupState);
lifecycle.Subscribe(this.GetType().FullName, GrainLifecycleStage.SetupState, OnSetupState);
}

private async Task OnSetupState(CancellationToken ct)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ public interface ILifecycleObservable
/// <summary>
/// Subscribe for notification when a stage is reached while starting or stopping.
/// </summary>
/// <param name="observerName">name of observer, for reporting purposes</param>
/// <param name="stage">stage of interest</param>
/// <param name="observer">stage observer</param>
/// <returns>A disposable that can be disposed of to unsubscribe</returns>
IDisposable Subscribe(int stage, ILifecycleObserver observer);
IDisposable Subscribe(string observerName, int stage, ILifecycleObserver observer);
}
}
10 changes: 10 additions & 0 deletions src/Orleans.Core.Abstractions/Lifecycle/ILifecycleSubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

namespace Orleans
{
/// <summary>
/// Both a lifecycle observer and observable lifecycle.
/// </summary>
public interface ILifecycleSubject : ILifecycleObservable, ILifecycleObserver
{
}
}
28 changes: 24 additions & 4 deletions src/Orleans.Core.Abstractions/Lifecycle/LifecycleExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,38 @@ public static class LifecycleExtensions
{
private static Func<CancellationToken, Task> NoOp => ct => Task.CompletedTask;

public static IDisposable Subscribe(this ILifecycleObservable observable, int stage, Func<CancellationToken, Task> onStart, Func<CancellationToken, Task> onStop)
public static IDisposable Subscribe(this ILifecycleObservable observable, string observerName, int stage, Func<CancellationToken, Task> onStart, Func<CancellationToken, Task> onStop)
{
if (observable == null) throw new ArgumentNullException(nameof(observable));
if (onStart == null) throw new ArgumentNullException(nameof(onStart));
if (onStop == null) throw new ArgumentNullException(nameof(onStop));

return observable.Subscribe(stage, new Observer(onStart, onStop));
return observable.Subscribe(observerName, stage, new Observer(onStart, onStop));
}

public static IDisposable Subscribe(this ILifecycleObservable observable, int stage, Func<CancellationToken, Task> onStart)
public static IDisposable Subscribe(this ILifecycleObservable observable, string observerName, int stage, Func<CancellationToken, Task> onStart)
{
return observable.Subscribe(stage, new Observer(onStart, NoOp));
return observable.Subscribe(observerName, stage, onStart, NoOp);
}

public static IDisposable Subscribe<TObserver>(this ILifecycleObservable observable, int stage, ILifecycleObserver observer)
{
return observable.Subscribe(typeof(TObserver).FullName, stage, observer);
}

public static IDisposable Subscribe<TObserver>(this ILifecycleObservable observable, int stage, Func<CancellationToken, Task> onStart, Func<CancellationToken, Task> onStop)
{
return observable.Subscribe(typeof(TObserver).FullName, stage, onStart, onStop);
}

public static IDisposable Subscribe<TObserver>(this ILifecycleObservable observable, int stage, Func<CancellationToken, Task> onStart)
{
return observable.Subscribe<TObserver>(stage, onStart, NoOp);
}

public static IDisposable Subscribe(this ILifecycleObservable observable, int stage, ILifecycleObserver observer)
{
return observable.Subscribe(observer.GetType().FullName, stage, observer);
}

public static Task OnStart(this ILifecycleObserver observer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public ClientOptionsLogger(ILogger<ClientOptionsLogger> logger, IServiceProvider

public void Participate(IClusterClientLifecycle lifecycle)
{
lifecycle.Subscribe(ClientOptionLoggerLifeCycleRing, this.OnStart);
lifecycle.Subscribe<ClientOptionsLogger>(ClientOptionLoggerLifeCycleRing, this.OnStart);
}

public Task OnStart(CancellationToken token)
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Core/Core/ClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private enum LifecycleState
public ClusterClient(OutsideRuntimeClient runtimeClient, ILoggerFactory loggerFactory, IOptions<ClientMessagingOptions> clientMessagingOptions)
{
this.runtimeClient = runtimeClient;
this.clusterClientLifecycle = new ClusterClientLifecycle(loggerFactory);
this.clusterClientLifecycle = new ClusterClientLifecycle(loggerFactory.CreateLogger<LifecycleSubject>());

//set PropagateActivityId flag from node cofnig
RequestContext.PropagateActivityId = clientMessagingOptions.Value.PropagateActivityId;
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Core/Lifecycle/ClusterClientLifecycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

namespace Orleans
{
internal class ClusterClientLifecycle : LifecycleObservable, IClusterClientLifecycle
internal class ClusterClientLifecycle : LifecycleSubject, IClusterClientLifecycle
{
public ClusterClientLifecycle(ILoggerFactory loggerFactory) : base(loggerFactory)
public ClusterClientLifecycle(ILogger<LifecycleSubject> logger) : base(logger)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ namespace Orleans
/// - OnStop stops states in reverse order starting from highest started stage.
/// - OnStop stops all stages regardless of errors even if canceled canceled.
/// </summary>
public class LifecycleObservable : ILifecycleObservable, ILifecycleObserver
public class LifecycleSubject : ILifecycleSubject
{
private readonly ConcurrentDictionary<object, OrderedObserver> subscribers;
private readonly ILogger logger;
private int? highStage = null;

public LifecycleObservable(ILoggerFactory loggerFactory)
public LifecycleSubject(ILogger<LifecycleSubject> logger)
{
this.logger = loggerFactory?.CreateLogger(GetType().FullName);
this.logger = logger;
this.subscribers = new ConcurrentDictionary<object, OrderedObserver>();
}

Expand Down Expand Up @@ -83,7 +83,7 @@ public async Task OnStop(CancellationToken ct)
}
}

public IDisposable Subscribe(int stage, ILifecycleObserver observer)
public IDisposable Subscribe(string observerName, int stage, ILifecycleObserver observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
if (this.highStage.HasValue) throw new InvalidOperationException("Lifecycle has already been started.");
Expand Down
6 changes: 3 additions & 3 deletions src/Orleans.Core/LogConsistency/LogConsistentGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ protected LogConsistentGrain(IGrainIdentity identity, IGrainRuntime runtime)
public override void Participate(IGrainLifecycle lifecycle)
{
base.Participate(lifecycle);
lifecycle.Subscribe(GrainLifecycleStage.SetupState, OnSetupState);
lifecycle.Subscribe<ClientOptionsLogger>(GrainLifecycleStage.SetupState, OnSetupState);
if(this is ILogConsistencyProtocolParticipant)
{
lifecycle.Subscribe(GrainLifecycleStage.Activate - 1, PreActivate);
lifecycle.Subscribe(GrainLifecycleStage.Activate + 1, PostActivate);
lifecycle.Subscribe<LogConsistentGrain<TView>>(GrainLifecycleStage.Activate - 1, PreActivate);
lifecycle.Subscribe<LogConsistentGrain<TView>>(GrainLifecycleStage.Activate + 1, PostActivate);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/Orleans.Core/Logging/ErrorCodes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ internal enum ErrorCode
LifecycleStartFailure = SiloBase + 50,
LifecycleStopFailure = SiloBase + 51,
SiloStartPerfMeasure = SiloBase + 52,
LifecycleStagesReport = SiloBase + 53,

CatalogBase = Runtime + 500,
CatalogNonExistingActivation1 = CatalogBase + 1,
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Core/Providers/LegacyProviderConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public ProviderLifecycleParticipant(IProviderConfiguration config, IServiceProvi
public virtual void Participate(TLifecycle lifecycle)
{
this.initStage = this.config.GetIntProperty(LegacyProviderConfigurator.InitStageName, this.defaultInitStage);
lifecycle.Subscribe(this.initStage, this.Init, this.ProviderClose);
lifecycle.Subscribe($"LegacyProvider-{typeof(TService).FullName}-{config.Type}-{config.Name}", this.initStage, this.Init, this.ProviderClose);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ public Task<object> ExecuteCommand(int command, object arg)

public void Participate(ILifecycleObservable lifecycle)
{
lifecycle.Subscribe(this.options.InitStage, Init);
lifecycle.Subscribe(this.options.StartStage, Start, Close);
lifecycle.Subscribe(OptionFormattingUtilities.Name<PersistentStreamProvider>(this.Name), this.options.InitStage, Init);
lifecycle.Subscribe(OptionFormattingUtilities.Name<PersistentStreamProvider>(this.Name), this.options.StartStage, Start, Close);
}

public static IStreamProvider Create<TOptions>(IServiceProvider services, string name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public StartupTask(
/// <inheritdoc />
public void Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe(
lifecycle.Subscribe<StartupTask>(
this.stage,
cancellation => this.startupTask(this.serviceProvider, cancellation));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

namespace Orleans.Runtime
{
/// <summary>
/// Observable silo lifecycle and observer.
/// </summary>
public interface ISiloLifecycleSubject : ISiloLifecycle, ILifecycleObserver
{
}
}
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public ActivationData(
if (null == collector) throw new ArgumentNullException(nameof(collector));

logger = loggerFactory.CreateLogger<ActivationData>();
this.lifecycle = new GrainLifecycle(loggerFactory);
this.lifecycle = new GrainLifecycle(loggerFactory.CreateLogger<LifecycleSubject>());
this.maxRequestProcessingTime = maxRequestProcessingTime;
this.maxWarningRequestProcessingTime = maxWarningRequestProcessingTime;
this.messagingOptions = messagingOptions.Value;
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Runtime/Catalog/GrainLifecycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

namespace Orleans.Runtime
{
internal class GrainLifecycle : LifecycleObservable, IGrainLifecycle
internal class GrainLifecycle : LifecycleSubject, IGrainLifecycle
{
public GrainLifecycle(ILoggerFactory loggerFactory) : base(loggerFactory)
public GrainLifecycle(ILogger<LifecycleSubject> logger) : base(logger)
{
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Core/InsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ internal static IGrainExtensionMethodInvoker TryGetExtensionInvoker(GrainTypeMan

public void Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe(ServiceLifecycleStage.RuntimeInitialize, OnRuntimeInitializeStart, OnRuntimeInitializeStop);
lifecycle.Subscribe<InsideRuntimeClient>(ServiceLifecycleStage.RuntimeInitialize, OnRuntimeInitializeStart, OnRuntimeInitializeStop);
}
}
}
3 changes: 2 additions & 1 deletion src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ internal static void AddDefaultServices(HostBuilderContext context, IServiceColl
// Register system services.
services.TryAddSingleton<ILocalSiloDetails, LocalSiloDetails>();
services.TryAddSingleton<ISiloHost, SiloWrapper>();
services.TryAddSingleton<SiloLifecycle>();
services.TryAddTransient<ILifecycleSubject,LifecycleSubject>();
services.TryAddSingleton<ISiloLifecycleSubject,SiloLifecycleSubject>();
services.TryAddSingleton<ILifecycleParticipant<ISiloLifecycle>, SiloOptionsLogger>();
services.PostConfigure<SiloMessagingOptions>(options =>
{
Expand Down
12 changes: 0 additions & 12 deletions src/Orleans.Runtime/Lifecycle/SiloLifecycle.cs

This file was deleted.

100 changes: 100 additions & 0 deletions src/Orleans.Runtime/Lifecycle/SiloLifecycleSubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Orleans.Runtime
{
/// <summary>
/// Decorator over lifecycle subject for silo. Adds some logging and monitoring
/// </summary>
public class SiloLifecycleSubject : ISiloLifecycleSubject
{
private readonly ILifecycleSubject subject;
private readonly ILogger<SiloLifecycleSubject> logger;
private readonly List<MonitoredObserver> observers;

public SiloLifecycleSubject(ILifecycleSubject subject, ILogger<SiloLifecycleSubject> logger)
{
this.subject = subject;
this.logger = logger;
this.observers = new List<MonitoredObserver>();
}

public Task OnStart(CancellationToken ct)
{
foreach(IGrouping<int,MonitoredObserver> stage in this.observers.GroupBy(o => o.Stage).OrderBy(s => s.Key))
{
this.logger?.Info(ErrorCode.LifecycleStagesReport, $"Stage {stage.Key}: {string.Join(", ", stage.Select(o => o.Name))}", stage.Key);
}
return this.subject.OnStart(ct);
}

public Task OnStop(CancellationToken ct)
{
return this.subject.OnStop(ct);
}

public IDisposable Subscribe(string observerName, int stage, ILifecycleObserver observer)
{
var monitoredObserver = new MonitoredObserver(observerName, stage, observer, this.logger);
this.observers.Add(monitoredObserver);
return this.subject.Subscribe(observerName, stage, monitoredObserver);
}

private class MonitoredObserver : ILifecycleObserver
{
private readonly ILifecycleObserver observer;
private readonly ILogger<SiloLifecycleSubject> logger;

public MonitoredObserver(string name, int stage, ILifecycleObserver observer, ILogger<SiloLifecycleSubject> logger)
{
this.Name = name;
this.Stage = stage;
this.observer = observer;
this.logger = logger;
}

public string Name { get; }
public int Stage { get; }

public async Task OnStart(CancellationToken ct)
{
try
{
Stopwatch stopWatch = Stopwatch.StartNew();
await this.observer.OnStart(ct);
stopWatch.Stop();
this.logger?.Info(ErrorCode.SiloStartPerfMeasure, $"Lifecycle observer {this.Name} started in stage {this.Stage} which took {stopWatch.ElapsedMilliseconds} Milliseconds.");
}
catch (Exception ex)
{
string error = $"Lifecycle observer {this.Name} failed to start due to errors at stage {this.Stage}.";
this.logger?.Error(ErrorCode.LifecycleStartFailure, error, ex);
throw;
}
}

public async Task OnStop(CancellationToken ct)
{
try
{
Stopwatch stopWatch = Stopwatch.StartNew();
await this.observer.OnStop(ct);
stopWatch.Stop();
this.logger?.Info(ErrorCode.SiloStartPerfMeasure, $"Lifecycle observer {this.Name} stopped in stage {this.Stage} which took {stopWatch.ElapsedMilliseconds} Milliseconds.");
}
catch (Exception ex)
{
string error = $"Lifecycle observer {this.Name} failed to stop due to errors at stage {this.Stage}.";
this.logger?.Error(ErrorCode.LifecycleStartFailure, error, ex);
throw;
}
}
}
}
}
Loading