Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Silo lifecycle logic #9120

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Orleans.Core.Abstractions/Core/IGrainContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public static class GrainContextExtensions
/// <returns>
/// A <see cref="Task"/> which will complete once the grain has deactivated.
/// </returns>
[Obsolete("This method is error-prone: waiting deactivation to complete from within the grain being deactivated will usually result in a deadlock.")]
public static Task DeactivateAsync(this IGrainContext grainContext, DeactivationReason deactivationReason, CancellationToken cancellationToken = default)
{
grainContext.Deactivate(deactivationReason, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ internal OrleansLifecycleCanceledException(string message)
/// <param name="innerException">
/// The inner exception.
/// </param>
internal OrleansLifecycleCanceledException(string message,
Exception innerException) : base(message, innerException)
internal OrleansLifecycleCanceledException(string message, Exception innerException)
: base(message, innerException)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ public interface ILifecycleParticipant<TLifecycleObservable>
/// <summary>
/// Adds the provided observer as a participant in the lifecycle.
/// </summary>
/// <param name="observer">
/// <param name="lifecycle">
/// The observer.
/// </param>
void Participate(TLifecycleObservable observer);
void Participate(TLifecycleObservable lifecycle);
}
}
43 changes: 21 additions & 22 deletions src/Orleans.Core/Lifecycle/LifecycleSubject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@ namespace Orleans
/// <item><description>Once started, no other observers can be subscribed.</description></item>
/// <item><description>OnStart starts stages in order until first failure or cancellation.</description></item>
/// <item><description>OnStop stops states in reverse order starting from highest started stage.</description></item>
/// <item><description>OnStop stops all stages regardless of errors even if canceled canceled.</description></item>
/// <item><description>OnStop stops all stages regardless of errors even if canceled.</description></item>
/// </list>
/// </remarks>
public abstract class LifecycleSubject : ILifecycleSubject
{
private readonly List<OrderedObserver> subscribers;
protected readonly ILogger logger;
private int? highStage = null;
private readonly List<OrderedObserver> subscribers = [];
protected readonly ILogger Logger;
private int? _highStage = null;

protected LifecycleSubject(ILogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));
this.logger = logger;
this.subscribers = new List<OrderedObserver>();
ArgumentNullException.ThrowIfNull(logger);
Logger = logger;
}

/// <summary>
Expand Down Expand Up @@ -87,9 +86,9 @@ protected static ImmutableDictionary<int, string> GetStageNames(Type type)
/// <param name="elapsed">The period of time which elapsed before <see cref="OnStart"/> completed once it was initiated.</param>
protected virtual void PerfMeasureOnStart(int stage, TimeSpan elapsed)
{
if (this.logger.IsEnabled(LogLevel.Trace))
if (this.Logger.IsEnabled(LogLevel.Trace))
{
this.logger.LogTrace(
this.Logger.LogTrace(
(int)ErrorCode.SiloStartPerfMeasure,
"Starting lifecycle stage {Stage} took {Elapsed} Milliseconds",
stage,
Expand All @@ -100,7 +99,7 @@ protected virtual void PerfMeasureOnStart(int stage, TimeSpan elapsed)
/// <inheritdoc />
public virtual async Task OnStart(CancellationToken cancellationToken = default)
{
if (this.highStage.HasValue) throw new InvalidOperationException("Lifecycle has already been started.");
if (this._highStage.HasValue) throw new InvalidOperationException("Lifecycle has already been started.");
try
{
foreach (IGrouping<int, OrderedObserver> observerGroup in this.subscribers
Expand All @@ -113,7 +112,7 @@ public virtual async Task OnStart(CancellationToken cancellationToken = default)
}

var stage = observerGroup.Key;
this.highStage = stage;
this._highStage = stage;
var stopWatch = ValueStopwatch.StartNew();
await Task.WhenAll(observerGroup.Select(orderedObserver => CallOnStart(orderedObserver, cancellationToken)));
stopWatch.Stop();
Expand All @@ -124,11 +123,11 @@ public virtual async Task OnStart(CancellationToken cancellationToken = default)
}
catch (Exception ex) when (ex is not OrleansLifecycleCanceledException)
{
this.logger.LogError(
this.Logger.LogError(
(int)ErrorCode.LifecycleStartFailure,
ex,
"Lifecycle start canceled due to errors at stage {Stage}",
this.highStage);
this._highStage);
throw;
}

Expand Down Expand Up @@ -158,9 +157,9 @@ protected virtual void OnStartStageCompleted(int stage) { }
/// <param name="elapsed">The period of time which elapsed before <see cref="OnStop"/> completed once it was initiated.</param>
protected virtual void PerfMeasureOnStop(int stage, TimeSpan elapsed)
{
if (this.logger.IsEnabled(LogLevel.Trace))
if (this.Logger.IsEnabled(LogLevel.Trace))
{
this.logger.LogTrace(
this.Logger.LogTrace(
(int)ErrorCode.SiloStartPerfMeasure,
"Stopping lifecycle stage {Stage} took {Elapsed} Milliseconds",
stage,
Expand All @@ -172,22 +171,22 @@ protected virtual void PerfMeasureOnStop(int stage, TimeSpan elapsed)
public virtual async Task OnStop(CancellationToken cancellationToken = default)
{
// if not started, do nothing
if (!this.highStage.HasValue) return;
if (!this._highStage.HasValue) return;
var loggedCancellation = false;
foreach (IGrouping<int, OrderedObserver> observerGroup in this.subscribers
// include up to highest started stage
.Where(orderedObserver => orderedObserver.Stage <= highStage && orderedObserver.Observer != null)
.Where(orderedObserver => orderedObserver.Stage <= _highStage && orderedObserver.Observer != null)
.GroupBy(orderedObserver => orderedObserver.Stage)
.OrderByDescending(group => group.Key))
{
if (cancellationToken.IsCancellationRequested && !loggedCancellation)
{
this.logger.LogWarning("Lifecycle stop operations canceled by request.");
this.Logger.LogWarning("Lifecycle stop operations canceled by request.");
loggedCancellation = true;
}

var stage = observerGroup.Key;
this.highStage = stage;
this._highStage = stage;
try
{
var stopwatch = ValueStopwatch.StartNew();
Expand All @@ -197,11 +196,11 @@ public virtual async Task OnStop(CancellationToken cancellationToken = default)
}
catch (Exception ex)
{
this.logger.LogError(
this.Logger.LogError(
(int)ErrorCode.LifecycleStopFailure,
ex,
"Stopping lifecycle encountered an error at stage {Stage}. Continuing to stop.",
this.highStage);
this._highStage);
}

this.OnStopStageCompleted(stage);
Expand Down Expand Up @@ -229,7 +228,7 @@ protected virtual void OnStopStageCompleted(int stage) { }
public virtual IDisposable Subscribe(string observerName, int stage, ILifecycleObserver observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
if (this.highStage.HasValue) throw new InvalidOperationException("Lifecycle has already been started.");
if (this._highStage.HasValue) throw new InvalidOperationException("Lifecycle has already been started.");

var orderedObserver = new OrderedObserver(stage, observer);
this.subscribers.Add(orderedObserver);
Expand Down
8 changes: 4 additions & 4 deletions src/Orleans.Runtime/Catalog/ActivationCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -505,20 +505,20 @@ private async Task CollectActivationsImpl(bool scanStale, TimeSpan ageLimit = de

private async Task DeactivateActivationsFromCollector(List<ICollectibleGrainContext> list)
{
var cts = new CancellationTokenSource(_options.Value.DeactivationTimeout);
var mtcs = new MultiTaskCompletionSource(list.Count);

logger.LogInformation((int)ErrorCode.Catalog_ShutdownActivations_1, "DeactivateActivationsFromCollector: total {Count} to promptly Destroy.", list.Count);
CatalogInstruments.ActivationShutdownViaCollection();

void signalCompletion(Task task) => mtcs.SetOneResult();
Action signalCompletion = mtcs.SetOneResult;
var reason = GetDeactivationReason();
for (var i = 0; i < list.Count; i++)
{
var activationData = list[i];

// Continue deactivation when ready
_ = activationData.DeactivateAsync(reason, cts.Token).ContinueWith(signalCompletion);
// Continue deactivation when ready.
activationData.Deactivate(reason);
activationData.Deactivated.GetAwaiter().OnCompleted(signalCompletion);
}

await mtcs.Task;
Expand Down
4 changes: 3 additions & 1 deletion src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public ActivationData(
_shared = shared;
Address = grainAddress;
_serviceScope = applicationServices.CreateScope();
Debug.Assert(_serviceScope != null, "_serviceScope must not be null.");
_workItemGroup = createWorkItemGroup(this);
Debug.Assert(_workItemGroup != null, "_workItemGroup must not be null.");
_messageLoopTask = this.RunOrQueueTask(RunMessageLoop);
}

Expand Down Expand Up @@ -515,7 +517,7 @@ private async Task StartMigratingAsync(Dictionary<string, object>? requestContex

lock (this)
{
if (!DeactivateCore(new DeactivationReason(DeactivationReasonCode.Migrating, "Migrating to a new location"), cts.Token))
if (!DeactivateCore(new DeactivationReason(DeactivationReasonCode.Migrating, "Migrating to a new location."), cts.Token))
{
// Grain is not able to start deactivating or has already completed.
return;
Expand Down
Loading
Loading