diff --git a/src/Orleans.Core/Configuration/Options/ClusterMembershipOptions.cs b/src/Orleans.Core/Configuration/Options/ClusterMembershipOptions.cs
index 9a4bf4a6b4..4ecbecc0fc 100644
--- a/src/Orleans.Core/Configuration/Options/ClusterMembershipOptions.cs
+++ b/src/Orleans.Core/Configuration/Options/ClusterMembershipOptions.cs
@@ -125,5 +125,10 @@ public class ClusterMembershipOptions
/// Gets or sets a value indicating whether to enable probing silos indirectly, via other silos.
///
public bool EnableIndirectProbes { get; set; } = true;
+
+ ///
+ /// Gets or sets a value indicating whether to enable membership eviction of silos when in a state of `Joining` or `Created` for longer than MaxJoinAttemptTime
+ ///
+ public bool EvictWhenMaxJoinAttemptTimeExceeded { get; set; } = true;
}
}
diff --git a/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs b/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs
index a593f904c9..9f7fece8cc 100644
--- a/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs
+++ b/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs
@@ -78,7 +78,14 @@ private async Task ProcessMembershipUpdates()
if (this.log.IsEnabled(LogLevel.Debug)) this.log.LogDebug("Starting to process membership updates");
await foreach (var tableSnapshot in this.membershipService.MembershipTableUpdates.WithCancellation(this.shutdownCancellation.Token))
{
- var newMonitoredSilos = this.UpdateMonitoredSilos(tableSnapshot, this.monitoredSilos, DateTime.UtcNow);
+ var utcNow = DateTime.UtcNow;
+
+ var newMonitoredSilos = this.UpdateMonitoredSilos(tableSnapshot, this.monitoredSilos, utcNow);
+
+ if (this.clusterMembershipOptions.CurrentValue.EvictWhenMaxJoinAttemptTimeExceeded)
+ {
+ await this.EvictStaleStateSilos(tableSnapshot, utcNow);
+ }
foreach (var pair in this.monitoredSilos)
{
@@ -103,6 +110,45 @@ private async Task ProcessMembershipUpdates()
}
}
+ private async Task EvictStaleStateSilos(
+ MembershipTableSnapshot membership,
+ DateTime utcNow)
+ {
+ foreach (var member in membership.Entries)
+ {
+ if (IsCreatedOrJoining(member.Value.Status)
+ && HasExceededMaxJoinTime(
+ startTime: member.Value.StartTime,
+ now: utcNow,
+ maxJoinTime: this.clusterMembershipOptions.CurrentValue.MaxJoinAttemptTime))
+ {
+ try
+ {
+ if (this.log.IsEnabled(LogLevel.Debug)) this.log.LogDebug("Stale silo with a joining or created state found, calling `TryToSuspectOrKill`");
+ await this.membershipService.TryToSuspectOrKill(member.Key);
+ }
+ catch(Exception exception)
+ {
+ log.LogError(
+ exception,
+ "Silo {suspectAddress} has had the status `{siloStatus}` for longer than `MaxJoinAttemptTime` but a call to `TryToSuspectOrKill` has failed",
+ member.Value.SiloAddress,
+ member.Value.Status.ToString());
+ }
+ }
+ }
+
+ static bool IsCreatedOrJoining(SiloStatus status)
+ {
+ return status == SiloStatus.Created || status == SiloStatus.Joining;
+ }
+
+ static bool HasExceededMaxJoinTime(DateTime startTime, DateTime now, TimeSpan maxJoinTime)
+ {
+ return now > startTime.Add(maxJoinTime);
+ }
+ }
+
[Pure]
private ImmutableDictionary UpdateMonitoredSilos(
MembershipTableSnapshot membership,
diff --git a/test/NonSilo.Tests/Membership/ClusterHealthMonitorTests.cs b/test/NonSilo.Tests/Membership/ClusterHealthMonitorTests.cs
index 421aa8ebbc..df98cf3741 100644
--- a/test/NonSilo.Tests/Membership/ClusterHealthMonitorTests.cs
+++ b/test/NonSilo.Tests/Membership/ClusterHealthMonitorTests.cs
@@ -3,7 +3,6 @@
using Microsoft.Extensions.Options;
using NonSilo.Tests.Utilities;
using NSubstitute;
-using Orleans;
using Orleans.Configuration;
using Orleans.Runtime;
using Orleans.Runtime.MembershipService;
@@ -103,6 +102,42 @@ public async Task ClusterHealthMonitor_OneVoteNeededToKill()
await ClusterHealthMonitor_BasicScenario_Runner(enableIndirectProbes: false, numVotesForDeathDeclaration: 1);
}
+ ///
+ /// Tests basic operation of and , but with EvictWhenMaxJoinAttemptTimeExceeded enabled.
+ ///
+ [Fact]
+ public async Task ClusterHealthMonitor_SilosWithStaleCreatedOrJoiningState_OneVoteNeededToKill()
+ {
+ await ClusterHealthMonitor_StaleJoinOrCreatedSilos_Runner(evictWhenMaxJoinAttemptTimeExceeded: true, numVotesForDeathDeclaration: 1);
+ }
+
+ ///
+ /// Tests basic operation of and , but with EvictWhenMaxJoinAttemptTimeExceeded enabled.
+ ///
+ [Fact]
+ public async Task ClusterHealthMonitor_SilosWithStaleCreatedOrJoiningState_TwoVotesNeededToKill()
+ {
+ await ClusterHealthMonitor_StaleJoinOrCreatedSilos_Runner(evictWhenMaxJoinAttemptTimeExceeded: true, numVotesForDeathDeclaration: 2);
+ }
+
+ ///
+ /// Tests basic operation of and , but with EvictWhenMaxJoinAttemptTimeExceeded enabled.
+ ///
+ [Fact]
+ public async Task ClusterHealthMonitor_SilosWithStaleCreatedOrJoiningState_ThreeVotesNeededToKill()
+ {
+ await ClusterHealthMonitor_StaleJoinOrCreatedSilos_Runner(evictWhenMaxJoinAttemptTimeExceeded: true, numVotesForDeathDeclaration: 3);
+ }
+
+ ///
+ /// Tests basic operation of and , but with EvictWhenMaxJoinAttemptTimeExceeded enabled.
+ ///
+ [Fact]
+ public async Task ClusterHealthMonitor_SilosWithStaleCreatedOrJoiningState_Disabled()
+ {
+ await ClusterHealthMonitor_StaleJoinOrCreatedSilos_Runner(evictWhenMaxJoinAttemptTimeExceeded: false, numVotesForDeathDeclaration: 3);
+ }
+
private async Task ClusterHealthMonitor_BasicScenario_Runner(bool enableIndirectProbes, int? numVotesForDeathDeclaration = default)
{
var clusterMembershipOptions = new ClusterMembershipOptions
@@ -115,19 +150,7 @@ private async Task ClusterHealthMonitor_BasicScenario_Runner(bool enableIndirect
clusterMembershipOptions.NumVotesForDeathDeclaration = numVotesForDeathDeclaration.Value;
}
- var manager = new MembershipTableManager(
- localSiloDetails: this.localSiloDetails,
- clusterMembershipOptions: Options.Create(clusterMembershipOptions),
- membershipTable: membershipTable,
- fatalErrorHandler: this.fatalErrorHandler,
- gossiper: this.membershipGossiper,
- log: this.loggerFactory.CreateLogger(),
- timerFactory: new AsyncTimerFactory(this.loggerFactory),
- this.lifecycle);
- ((ILifecycleParticipant)manager).Participate(this.lifecycle);
-
- var membershipService = Substitute.For();
- membershipService.CurrentSnapshot.ReturnsForAnyArgs(info => manager.MembershipTableSnapshot.CreateClusterMembershipSnapshot());
+ var testRig = CreateClusterHealthMonitorTestRig(clusterMembershipOptions);
var probeCalls = new ConcurrentQueue<(SiloAddress Target, int ProbeNumber, bool IsIndirect)>();
this.prober.Probe(default, default).ReturnsForAnyArgs(info =>
{
@@ -145,31 +168,8 @@ private async Task ClusterHealthMonitor_BasicScenario_Runner(bool enableIndirect
});
});
- var optionsMonitor = Substitute.For>();
- optionsMonitor.CurrentValue.ReturnsForAnyArgs(clusterMembershipOptions);
-
- var monitor = new ClusterHealthMonitor(
- this.localSiloDetails,
- manager,
- this.loggerFactory.CreateLogger(),
- optionsMonitor,
- this.fatalErrorHandler,
- null);
- ((ILifecycleParticipant)monitor).Participate(this.lifecycle);
- var testAccessor = (ClusterHealthMonitor.ITestAccessor)monitor;
- testAccessor.CreateMonitor = s => new SiloHealthMonitor(
- s,
- testAccessor.OnProbeResult,
- optionsMonitor,
- this.loggerFactory,
- this.prober,
- this.timerFactory,
- this.localSiloHealthMonitor,
- membershipService,
- this.localSiloDetails);
-
await this.lifecycle.OnStart();
- Assert.Empty(testAccessor.MonitoredSilos);
+ Assert.Empty(testRig.TestAccessor.MonitoredSilos);
var otherSilos = new[]
{
@@ -184,7 +184,7 @@ private async Task ClusterHealthMonitor_BasicScenario_Runner(bool enableIndirect
Entry(Silo("127.0.0.200:900@100"), SiloStatus.Active)
};
- var lastVersion = testAccessor.ObservedVersion;
+ var lastVersion = testRig.TestAccessor.ObservedVersion;
// Add the new silos
var table = await this.membershipTable.ReadAll();
@@ -194,25 +194,25 @@ private async Task ClusterHealthMonitor_BasicScenario_Runner(bool enableIndirect
Assert.True(await this.membershipTable.InsertRow(entry, table.Version.Next()));
}
- await manager.Refresh();
+ await testRig.Manager.Refresh();
- await Until(() => testAccessor.ObservedVersion > lastVersion);
- lastVersion = testAccessor.ObservedVersion;
+ await Until(() => testRig.TestAccessor.ObservedVersion > lastVersion);
+ lastVersion = testRig.TestAccessor.ObservedVersion;
// No silos should be monitored by this silo until it becomes active.
- Assert.Empty(testAccessor.MonitoredSilos);
+ Assert.Empty(testRig.TestAccessor.MonitoredSilos);
- await manager.UpdateStatus(SiloStatus.Active);
+ await testRig.Manager.UpdateStatus(SiloStatus.Active);
- await Until(() => testAccessor.ObservedVersion > lastVersion);
- lastVersion = testAccessor.ObservedVersion;
+ await Until(() => testRig.TestAccessor.ObservedVersion > lastVersion);
+ lastVersion = testRig.TestAccessor.ObservedVersion;
// Now that this silo is active, it should be monitoring some fraction of the other active silos
- await Until(() => testAccessor.MonitoredSilos.Count > 0);
+ await Until(() => testRig.TestAccessor.MonitoredSilos.Count > 0);
Assert.NotEmpty(this.timers);
- Assert.DoesNotContain(testAccessor.MonitoredSilos, s => s.Key.Equals(this.localSilo));
- Assert.Equal(clusterMembershipOptions.NumProbedSilos, testAccessor.MonitoredSilos.Count);
- Assert.All(testAccessor.MonitoredSilos, m => m.Key.Equals(m.Value.SiloAddress));
+ Assert.DoesNotContain(testRig.TestAccessor.MonitoredSilos, s => s.Key.Equals(this.localSilo));
+ Assert.Equal(clusterMembershipOptions.NumProbedSilos, testRig.TestAccessor.MonitoredSilos.Count);
+ Assert.All(testRig.TestAccessor.MonitoredSilos, m => m.Key.Equals(m.Value.SiloAddress));
Assert.Empty(probeCalls);
// Check that those silos are actually being probed periodically
@@ -226,9 +226,9 @@ await UntilEqual(clusterMembershipOptions.NumProbedSilos, () =>
return probeCalls.Count;
});
Assert.Equal(clusterMembershipOptions.NumProbedSilos, probeCalls.Count);
- while (probeCalls.TryDequeue(out var call)) Assert.Contains(testAccessor.MonitoredSilos, k => k.Key.Equals(call.Item1));
+ while (probeCalls.TryDequeue(out var call)) Assert.Contains(testRig.TestAccessor.MonitoredSilos, k => k.Key.Equals(call.Item1));
- var monitoredSilos = testAccessor.MonitoredSilos.Values.ToList();
+ var monitoredSilos = testRig.TestAccessor.MonitoredSilos.Values.ToList();
foreach (var siloMonitor in monitoredSilos)
{
Assert.Equal(0, ((SiloHealthMonitor.ITestAccessor)siloMonitor).MissedProbes);
@@ -306,7 +306,7 @@ await UntilEqual(clusterMembershipOptions.NumProbedSilos, () =>
return;
}
- await manager.Refresh();
+ await testRig.Manager.Refresh();
// Make the probes succeed again.
this.prober.Probe(default, default).ReturnsForAnyArgs(info =>
@@ -329,9 +329,9 @@ await UntilEqual(clusterMembershipOptions.NumProbedSilos, () =>
while (probeCalls.TryDequeue(out _)) ;
// Wait for probes to be fired
- this.output.WriteLine($"Firing probes for silos: {string.Join(", ", testAccessor.MonitoredSilos.Keys)}");
+ this.output.WriteLine($"Firing probes for silos: {string.Join(", ", testRig.TestAccessor.MonitoredSilos.Keys)}");
var probesReceived = new HashSet();
- await UntilEqual(testAccessor.MonitoredSilos.Count, () =>
+ await UntilEqual(testRig.TestAccessor.MonitoredSilos.Count, () =>
{
if (this.timerCalls.TryDequeue(out var timer))
{
@@ -346,7 +346,7 @@ await UntilEqual(testAccessor.MonitoredSilos.Count, () =>
return probesReceived.Count;
});
- foreach (var siloMonitor in testAccessor.MonitoredSilos.Values)
+ foreach (var siloMonitor in testRig.TestAccessor.MonitoredSilos.Values)
{
this.output.WriteLine($"Checking missed probes on {siloMonitor.SiloAddress}: {((SiloHealthMonitor.ITestAccessor)siloMonitor).MissedProbes}");
Assert.Equal(0, ((SiloHealthMonitor.ITestAccessor)siloMonitor).MissedProbes);
@@ -355,9 +355,134 @@ await UntilEqual(testAccessor.MonitoredSilos.Count, () =>
await StopLifecycle();
}
+ private async Task ClusterHealthMonitor_StaleJoinOrCreatedSilos_Runner(bool evictWhenMaxJoinAttemptTimeExceeded = true, int? numVotesForDeathDeclaration = default)
+ {
+ var clusterMembershipOptions = new ClusterMembershipOptions
+ {
+ EvictWhenMaxJoinAttemptTimeExceeded = evictWhenMaxJoinAttemptTimeExceeded
+ };
+
+ if (numVotesForDeathDeclaration.HasValue)
+ {
+ clusterMembershipOptions.NumVotesForDeathDeclaration = numVotesForDeathDeclaration.Value;
+ }
+
+ var testRig = CreateClusterHealthMonitorTestRig(clusterMembershipOptions);
+
+ var otherSilos = new[]
+ {
+ Entry(Silo("127.0.0.200:100@100"), SiloStatus.Active),
+ Entry(Silo("127.0.0.200:200@100"), SiloStatus.Active),
+ Entry(Silo("127.0.0.200:300@100"), SiloStatus.Active),
+ Entry(Silo("127.0.0.200:400@100"), SiloStatus.Active),
+ Entry(Silo("127.0.0.200:500@100"), SiloStatus.Active),
+ Entry(Silo("127.0.0.200:600@100"), SiloStatus.Active),
+ Entry(Silo("127.0.0.200:700@100"), SiloStatus.Active),
+ Entry(Silo("127.0.0.200:800@100"), SiloStatus.Active),
+ Entry(Silo("127.0.0.200:900@100"), SiloStatus.Active)
+ };
+
+ var joiningSilo = "127.0.0.200:111@100";
+ var createdSilo = "127.0.0.200:112@100";
+
+ // default MaxJoinAttemptTime is 5 minutes, setting it to 6 minutes ago will make sure they are flagged immediately
+ var staleCreatedOrJoiningSilos = new[]
+ {
+ Entry(Silo(joiningSilo), SiloStatus.Joining, DateTime.UtcNow.AddMinutes(-6)),
+ Entry(Silo(createdSilo), SiloStatus.Created, DateTime.UtcNow.AddMinutes(-6)),
+ };
+
+ otherSilos = [.. otherSilos, .. staleCreatedOrJoiningSilos];
+
+ var lastVersion = testRig.TestAccessor.ObservedVersion;
+
+ // Add the new silos
+ var table = await this.membershipTable.ReadAll();
+ foreach (var entry in otherSilos)
+ {
+ table = await this.membershipTable.ReadAll();
+ Assert.True(await this.membershipTable.InsertRow(entry, table.Version.Next()));
+ }
+
+ table = await this.membershipTable.ReadAll();
+ var joiningEntry = GetEntryFromTable(table, joiningSilo);
+ var createdEntry = GetEntryFromTable(table, createdSilo);
+
+ Assert.NotNull(joiningEntry);
+ Assert.NotNull(createdEntry);
+
+ Assert.Equal(expected: SiloStatus.Joining, actual: joiningEntry.Item1.Status);
+ Assert.Equal(expected: SiloStatus.Created, actual: createdEntry.Item1.Status);
+
+ // We are going to add numVotesForDeathDeclaration - 1 votes to the created or joining silos
+ var totalRequiredVotes = clusterMembershipOptions.NumVotesForDeathDeclaration;
+
+ var votesNeeded = totalRequiredVotes - 1;
+
+ // the joining and created silos should not be declared dead until the required number of votes.
+ while (votesNeeded > 0)
+ {
+ table = await this.membershipTable.ReadAll();
+ joiningEntry = GetEntryFromTable(table, joiningSilo);
+ joiningEntry.Item1.AddSuspector(otherSilos[0].SiloAddress, DateTime.UtcNow);
+ Assert.True(await this.membershipTable.UpdateRow(joiningEntry.Item1, joiningEntry.Item2, table.Version.Next()));
+
+ table = await this.membershipTable.ReadAll();
+ createdEntry = GetEntryFromTable(table, createdSilo);
+ createdEntry.Item1.AddSuspector(otherSilos[0].SiloAddress, DateTime.UtcNow);
+ Assert.True(await this.membershipTable.UpdateRow(createdEntry.Item1, createdEntry.Item2, table.Version.Next()));
+
+ votesNeeded--;
+ }
+
+ table = await this.membershipTable.ReadAll();
+ joiningEntry = GetEntryFromTable(table, joiningSilo);
+ createdEntry = GetEntryFromTable(table, createdSilo);
+
+ // Suspect time will be null if numVotesForDeathDeclaration == 1
+ if (totalRequiredVotes > 1 && evictWhenMaxJoinAttemptTimeExceeded)
+ {
+ Assert.Equal(totalRequiredVotes - 1, joiningEntry.Item1.SuspectTimes.Count);
+ Assert.Equal(totalRequiredVotes - 1, createdEntry.Item1.SuspectTimes.Count);
+ }
+
+ // now we start the lifecycle and let the local silo add the final vote.
+ await this.lifecycle.OnStart();
+
+ await testRig.Manager.Refresh();
+
+ await Until(() => testRig.TestAccessor.ObservedVersion > lastVersion);
+ lastVersion = testRig.TestAccessor.ObservedVersion;
+
+ table = await this.membershipTable.ReadAll();
+ joiningEntry = GetEntryFromTable(table, joiningSilo);
+ createdEntry = GetEntryFromTable(table, createdSilo);
+
+ var expectedVotes = totalRequiredVotes == 1
+ ? 2
+ : totalRequiredVotes;
+
+ expectedVotes = evictWhenMaxJoinAttemptTimeExceeded
+ ? totalRequiredVotes
+ : totalRequiredVotes - 1;
+
+ Assert.True(expectedVotes <= joiningEntry.Item1.SuspectTimes.Count);
+ Assert.True(expectedVotes <= createdEntry.Item1.SuspectTimes.Count);
+
+ Assert.Equal(expected: evictWhenMaxJoinAttemptTimeExceeded ? SiloStatus.Dead : SiloStatus.Joining, actual: joiningEntry.Item1.Status);
+ Assert.Equal(expected: evictWhenMaxJoinAttemptTimeExceeded ? SiloStatus.Dead : SiloStatus.Created, actual: createdEntry.Item1.Status);
+
+ await StopLifecycle();
+
+ static Tuple GetEntryFromTable(MembershipTableData table, string siloAddress)
+ {
+ return table.Members.FirstOrDefault(entry => entry.Item1.SiloAddress.ToParsableString() == siloAddress);
+ }
+ }
+
private static SiloAddress Silo(string value) => SiloAddress.FromParsableString(value);
- private static MembershipEntry Entry(SiloAddress address, SiloStatus status) => new MembershipEntry { SiloAddress = address, Status = status };
+ private static MembershipEntry Entry(SiloAddress address, SiloStatus status, DateTime startTime = default) => new MembershipEntry { SiloAddress = address, Status = status, StartTime = startTime };
private static async Task UntilEqual(T expected, Func getActual)
{
@@ -393,5 +518,66 @@ private async Task StopLifecycle(CancellationToken cancellation = default)
await stopped;
}
+
+ private class ClusterHealthMonitorTestRig(
+ MembershipTableManager manager,
+ IClusterMembershipService membershipService,
+ IOptionsMonitor optionsMonitor,
+ ClusterHealthMonitor.ITestAccessor testAccessor)
+ {
+ public readonly MembershipTableManager Manager = manager;
+ public readonly IClusterMembershipService MembershipService = membershipService;
+ public readonly IOptionsMonitor OptionsMonitor = optionsMonitor;
+ public readonly ClusterHealthMonitor.ITestAccessor TestAccessor = testAccessor;
+ }
+
+ private ClusterHealthMonitorTestRig CreateClusterHealthMonitorTestRig(ClusterMembershipOptions clusterMembershipOptions)
+ {
+ var manager = new MembershipTableManager(
+ localSiloDetails: this.localSiloDetails,
+ clusterMembershipOptions: Options.Create(clusterMembershipOptions),
+ membershipTable: membershipTable,
+ fatalErrorHandler: this.fatalErrorHandler,
+ gossiper: this.membershipGossiper,
+ log: this.loggerFactory.CreateLogger(),
+ timerFactory: new AsyncTimerFactory(this.loggerFactory),
+ this.lifecycle);
+
+ ((ILifecycleParticipant)manager).Participate(this.lifecycle);
+
+ var membershipService = Substitute.For();
+ membershipService.CurrentSnapshot.ReturnsForAnyArgs(info => manager.MembershipTableSnapshot.CreateClusterMembershipSnapshot());
+
+ var optionsMonitor = Substitute.For>();
+ optionsMonitor.CurrentValue.ReturnsForAnyArgs(clusterMembershipOptions);
+
+ var monitor = new ClusterHealthMonitor(
+ this.localSiloDetails,
+ manager,
+ this.loggerFactory.CreateLogger(),
+ optionsMonitor,
+ this.fatalErrorHandler,
+ null);
+
+ ((ILifecycleParticipant)monitor).Participate(this.lifecycle);
+
+ var testAccessor = (ClusterHealthMonitor.ITestAccessor)monitor;
+ testAccessor.CreateMonitor = s => new SiloHealthMonitor(
+ s,
+ testAccessor.OnProbeResult,
+ optionsMonitor,
+ this.loggerFactory,
+ this.prober,
+ this.timerFactory,
+ this.localSiloHealthMonitor,
+ membershipService,
+ this.localSiloDetails);
+
+ return new(
+ manager: manager,
+ membershipService: membershipService,
+ optionsMonitor: optionsMonitor,
+ testAccessor: testAccessor);
+ }
}
}
diff --git a/test/NonSilo.Tests/Membership/MembershipTableManagerTests.cs b/test/NonSilo.Tests/Membership/MembershipTableManagerTests.cs
index 59f4b10bdd..31a97fbc76 100644
--- a/test/NonSilo.Tests/Membership/MembershipTableManagerTests.cs
+++ b/test/NonSilo.Tests/Membership/MembershipTableManagerTests.cs
@@ -288,7 +288,7 @@ public async Task MembershipTableManager_Restarted()
[Fact]
public async Task MembershipTableManager_Superseded()
{
- // The table includes a sucessor to this silo.
+ // The table includes a successor to this silo.
var successor = Entry(Silo("127.0.0.1:100@200"), SiloStatus.Active);
var otherSilos = new[]