Skip to content

Commit

Permalink
Akka.Cluster.Tools.Singleton / Akka.Cluster.Sharding: fix duplicate s…
Browse files Browse the repository at this point in the history
…hards caused by incorrect `ClusterSingletonManager` `HandOver` (#7297)

* close #6973 - eliminate duplicate shards

Eliminates the source of #6793, which was caused by using the incorrect ordering methodology when it came to determining which `ClusterSingletonManager` to hand-over to during member state transitions.

close #6973
close #7196

* fixed build warnings
  • Loading branch information
Aaronontheweb authored Jul 24, 2024
1 parent f4501e8 commit f2e81c5
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 310 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ public TestException(string message, Exception innerEx)
: base(message, innerEx)
{
}

protected TestException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
}

private readonly Cluster _cluster;
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static Props Props(string singletonManagerPath, ClusterSingletonProxySett
.WithDeploy(Deploy.Local);
}

private readonly MemberAgeOrdering _memberAgeComparer;
private readonly IComparer<Member> _memberAgeComparer;
private readonly ClusterSingletonProxySettings _settings;
private readonly Cluster _cluster = Cluster.Get(Context.System);
private readonly Queue<KeyValuePair<object, IActorRef>> _buffer = new(); // queue seems to fit better
Expand All @@ -99,9 +99,7 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
_singletonPath = (singletonManagerPath + "/" + settings.SingletonName).Split('/');
_identityId = CreateIdentifyId(_identityCounter);

_memberAgeComparer = settings.ConsiderAppVersion
? MemberAgeOrdering.DescendingWithAppVersion
: MemberAgeOrdering.Descending;
_memberAgeComparer = Member.AgeOrdering;
_membersByAge = ImmutableSortedSet<Member>.Empty.WithComparer(_memberAgeComparer);

Receive<ClusterEvent.CurrentClusterState>(s => HandleInitial(s));
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public OldestChanged(UniqueAddress oldest)

#endregion

private readonly MemberAgeOrdering _memberAgeComparer;
private readonly IComparer<Member> _memberAgeComparer;
private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System);

/// <summary>
Expand All @@ -103,9 +103,7 @@ public OldestChanged(UniqueAddress oldest)
public OldestChangedBuffer(string role, bool considerAppVersion)
{
_role = role;
_memberAgeComparer = considerAppVersion
? MemberAgeOrdering.DescendingWithAppVersion
: MemberAgeOrdering.Descending;
_memberAgeComparer = Member.AgeOrdering;
_membersByAge = ImmutableSortedSet<Member>.Empty.WithComparer(_memberAgeComparer);

SetupCoordinatedShutdown();
Expand Down Expand Up @@ -147,8 +145,7 @@ private void TrackChanges(Action block)
var before = _membersByAge.FirstOrDefault();
block();
var after = _membersByAge.FirstOrDefault();

// todo: fix neq comparison

if (!Equals(before, after))
_changes = _changes.Enqueue(new OldestChanged(after?.UniqueAddress));
}
Expand Down Expand Up @@ -221,21 +218,33 @@ protected override void PostStop()
/// <inheritdoc cref="UntypedActor.OnReceive"/>
protected override void OnReceive(object message)
{
if (message is ClusterEvent.CurrentClusterState state) HandleInitial(state);
else if (message is ClusterEvent.MemberUp up) Add(up.Member);
else if (message is ClusterEvent.MemberRemoved removed) Remove(removed.Member);
else if (message is ClusterEvent.MemberExited exited && exited.Member.UniqueAddress != _cluster.SelfUniqueAddress)
Remove(exited.Member);
else if (message is SelfExiting)
switch (message)
{
Remove(_cluster.ReadView.Self);
Sender.Tell(Done.Instance); // reply to ask
}
else if (message is GetNext && _changes.IsEmpty) Context.BecomeStacked(OnDeliverNext);
else if (message is GetNext) SendFirstChange();
else
{
Unhandled(message);
case ClusterEvent.CurrentClusterState state:
HandleInitial(state);
break;
case ClusterEvent.MemberUp up:
Add(up.Member);
break;
case ClusterEvent.MemberRemoved removed:
Remove(removed.Member);
break;
case ClusterEvent.MemberExited exited when exited.Member.UniqueAddress != _cluster.SelfUniqueAddress:
Remove(exited.Member);
break;
case SelfExiting:
Remove(_cluster.ReadView.Self);
Sender.Tell(Done.Instance); // reply to ask
break;
case GetNext when _changes.IsEmpty:
Context.BecomeStacked(OnDeliverNext);
break;
case GetNext:
SendFirstChange();
break;
default:
Unhandled(message);
break;
}
}

Expand Down

0 comments on commit f2e81c5

Please sign in to comment.