Skip to content

Commit

Permalink
Fix leaky coordinated shutdown (#5816)
Browse files Browse the repository at this point in the history
* Fix CoordinatedShutdown infinite loop

* Fix circular reference memory leak

* Fix memory leak
  • Loading branch information
Arkatufus authored Apr 4, 2022
1 parent 0309498 commit 3ee032d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/core/Akka/Actor/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ private static ActorSystem CreateAndStartSystem(string name, Config withFallback
/// </returns>
public abstract Task Terminate();

internal abstract void FinalTerminate();
internal abstract Task FinalTerminate();

/// <summary>
/// Returns a task which will be completed after the <see cref="ActorSystem"/> has been
Expand Down
21 changes: 14 additions & 7 deletions src/core/Akka/Actor/CoordinatedShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ private ClusterJoinUnsuccessfulReason() { }
/// <summary>
/// The <see cref="ActorSystem"/>
/// </summary>
public ExtendedActorSystem System { get; }
public ExtendedActorSystem System { get; private set; }

/// <summary>
/// The set of named <see cref="Phase"/>s that will be executed during coordinated shutdown.
Expand All @@ -261,7 +261,7 @@ private ClusterJoinUnsuccessfulReason() { }
/// <summary>
/// INTERNAL API
/// </summary>
internal ILoggingAdapter Log { get; }
internal ILoggingAdapter Log { get; private set; }

private readonly HashSet<string> _knownPhases;

Expand All @@ -270,7 +270,7 @@ private ClusterJoinUnsuccessfulReason() { }
/// </summary>
internal readonly List<string> OrderedPhases;

private readonly ConcurrentBag<Func<Task<Done>>> _clrShutdownTasks = new ConcurrentBag<Func<Task<Done>>>();
private readonly ConcurrentSet<Func<Task<Done>>> _clrShutdownTasks = new ConcurrentSet<Func<Task<Done>>>();
private readonly ConcurrentDictionary<string, ImmutableList<(string, Func<Task<Done>>)>> _tasks = new ConcurrentDictionary<string, ImmutableList<(string, Func<Task<Done>>)>>();
private readonly AtomicReference<Reason> _runStarted = new AtomicReference<Reason>(null);
private readonly AtomicBoolean _clrHooksStarted = new AtomicBoolean(false);
Expand Down Expand Up @@ -335,7 +335,7 @@ internal void AddClrShutdownHook(Func<Task<Done>> hook)
{
if (!_clrHooksStarted)
{
_clrShutdownTasks.Add(hook);
_clrShutdownTasks.TryAdd(hook);
}
}

Expand Down Expand Up @@ -653,13 +653,16 @@ internal static void InitPhaseActorSystemTerminate(ActorSystem system, Config co

if (terminateActorSystem)
{
system.FinalTerminate();
return system.Terminate().ContinueWith(tr =>
return system.FinalTerminate().ContinueWith(tr =>
{
if (exitClr && !coord._runningClrHook)
{
Environment.Exit(0);
}

coord.System = null;
coord.Log = null;
coord._tasks.Clear(); // Clear the dictionary, just in case it is retained in memory
return Done.Instance;
});
}
Expand Down Expand Up @@ -691,7 +694,11 @@ internal static void InitClrHook(ActorSystem system, Config conf, CoordinatedShu
var exitTask = TerminateOnClrExit(coord);
// run all hooks during termination sequence
AppDomain.CurrentDomain.ProcessExit += exitTask;
system.WhenTerminated.ContinueWith(tr => { AppDomain.CurrentDomain.ProcessExit -= exitTask; });
system.WhenTerminated.ContinueWith(tr =>
{
AppDomain.CurrentDomain.ProcessExit -= exitTask;
coord._clrShutdownTasks.Clear(); // Clear the tasks, just in case it is retained in memory
});

coord.AddClrShutdownHook(() =>
{
Expand Down
11 changes: 5 additions & 6 deletions src/core/Akka/Actor/Internal/ActorSystemImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -525,20 +525,19 @@ public override Task Terminate()
{
if(Settings.CoordinatedShutdownRunByActorSystemTerminate)
{
CoordinatedShutdown.Get(this).Run(CoordinatedShutdown.ActorSystemTerminateReason.Instance);
} else
{
FinalTerminate();
return CoordinatedShutdown.Get(this)
.Run(CoordinatedShutdown.ActorSystemTerminateReason.Instance);
}
return WhenTerminated;
return FinalTerminate();
}

internal override void FinalTerminate()
internal override Task FinalTerminate()
{
Log.Debug("System shutdown initiated");
if (!Settings.LogDeadLettersDuringShutdown && _logDeadLetterListener != null)
Stop(_logDeadLetterListener);
_provider.Guardian.Stop();
return WhenTerminated;
}

/// <summary>
Expand Down

0 comments on commit 3ee032d

Please sign in to comment.