Skip to content

Commit

Permalink
Cluster Domain Even Publisher (#5939)
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba authored May 10, 2022
1 parent 37a2262 commit ebd694b
Showing 1 changed file with 55 additions and 53 deletions.
108 changes: 55 additions & 53 deletions src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit;
using FluentAssertions;
Expand Down Expand Up @@ -71,138 +73,138 @@ public ClusterDomainEventPublisherSpec() : base(Config)
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_MemberJoined()
public async Task ClusterDomainEventPublisher_must_publish_MemberJoined()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(state1));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberJoined(cJoining));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberJoined(cJoining));
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_MemberUp()
public async Task ClusterDomainEventPublisher_must_publish_MemberUp()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(state2));
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp));
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_leader_changed()
public async Task ClusterDomainEventPublisher_must_publish_leader_changed()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(state4));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(a51Up.Address));
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(a51Up));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(a51Up.Address));
await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds());
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_leader_changed_when_old_leader_leaves_and_is_removed()
public async Task ClusterDomainEventPublisher_must_publish_leader_changed_when_old_leader_leaves_and_is_removed()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp));
_publisher.Tell(new InternalClusterAction.PublishChanges(state6));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberLeft(aLeaving));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberLeft(aLeaving));
_publisher.Tell(new InternalClusterAction.PublishChanges(state7));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(aExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(cUp.Address));
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(aExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(cUp.Address));
await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds());
// at the removed member a an empty gossip is the last thing
_publisher.Tell(new InternalClusterAction.PublishChanges(_emptyMembershipState));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Exiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(bRemoved, MemberStatus.Exiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(cRemoved, MemberStatus.Up));
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(null));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Exiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(bRemoved, MemberStatus.Exiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(cRemoved, MemberStatus.Up));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(null));
}

[Fact]
public void ClusterDomainEventPublisher_must_not_publish_leader_changed_when_same_leader()
public async Task ClusterDomainEventPublisher_must_not_publish_leader_changed_when_same_leader()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(state4));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(a51Up.Address));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(a51Up));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(a51Up.Address));

_publisher.Tell(new InternalClusterAction.PublishChanges(state5));
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds());
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_role_leader_changed()
public async Task ClusterDomainEventPublisher_must_publish_role_leader_changed()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.RoleLeaderChanged))));
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
await subscriber.ExpectMsgAsync<ClusterEvent.CurrentClusterState>();
_publisher.Tell(new InternalClusterAction.PublishChanges(new MembershipState(new Gossip(ImmutableSortedSet.Create(cJoining, dUp)), dUp.UniqueAddress)));
subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", dUp.Address));
await subscriber.ExpectMsgAsync(new ClusterEvent.RoleLeaderChanged("GRP", dUp.Address));
_publisher.Tell(new InternalClusterAction.PublishChanges(new MembershipState(new Gossip(ImmutableSortedSet.Create(cUp, dUp)), dUp.UniqueAddress)));
subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", cUp.Address));
await subscriber.ExpectMsgAsync(new ClusterEvent.RoleLeaderChanged("GRP", cUp.Address));
}

[Fact]
public void ClusterDomainEventPublisher_must_send_CurrentClusterState_when_subscribe()
public async Task ClusterDomainEventPublisher_must_send_CurrentClusterState_when_subscribe()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.IClusterDomainEvent))));
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
await subscriber.ExpectMsgAsync<ClusterEvent.CurrentClusterState>();
// but only to the new subscriber
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds());
}

[Fact]
public void ClusterDomainEventPublisher_must_send_events_corresponding_to_current_state_when_subscribe()
public async Task ClusterDomainEventPublisher_must_send_events_corresponding_to_current_state_when_subscribe()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.PublishChanges(state8));
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.ReachabilityEvent))));

subscriber.ReceiveN(4).Should().BeEquivalentTo(
(await subscriber.ReceiveNAsync(4).ToListAsync()).Should().BeEquivalentTo(
new ClusterEvent.MemberUp(aUp),
new ClusterEvent.MemberUp(cUp),
new ClusterEvent.MemberUp(dUp),
new ClusterEvent.MemberExited(bExiting));

subscriber.ExpectMsg(new ClusterEvent.UnreachableMember(dUp));
subscriber.ExpectNoMsg(500.Milliseconds());
await subscriber.ExpectMsgAsync(new ClusterEvent.UnreachableMember(dUp));
await subscriber.ExpectNoMsgAsync(500.Milliseconds());
}

[Fact]
public void ClusterDomainEventPublisher_should_support_unsubscribe()
public async Task ClusterDomainEventPublisher_should_support_unsubscribe()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent))));
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
await subscriber.ExpectMsgAsync<ClusterEvent.CurrentClusterState>();
_publisher.Tell(new InternalClusterAction.Unsubscribe(subscriber.Ref, typeof(ClusterEvent.IMemberEvent)));
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
subscriber.ExpectNoMsg(500.Milliseconds());
await subscriber.ExpectNoMsgAsync(500.Milliseconds());
// but memberSubscriber is still subscriber
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting));
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp));
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_seen_changed()
public async Task ClusterDomainEventPublisher_must_publish_seen_changed()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.SeenChanged))));
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
await subscriber.ExpectMsgAsync<ClusterEvent.CurrentClusterState>();
_publisher.Tell(new InternalClusterAction.PublishChanges(state2));
subscriber.ExpectMsg<ClusterEvent.SeenChanged>();
subscriber.ExpectNoMsg(500.Milliseconds());
await subscriber.ExpectMsgAsync<ClusterEvent.SeenChanged>();
await subscriber.ExpectNoMsgAsync(500.Milliseconds());
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
subscriber.ExpectMsg<ClusterEvent.SeenChanged>();
subscriber.ExpectNoMsg(500.Milliseconds());
await subscriber.ExpectMsgAsync<ClusterEvent.SeenChanged>();
await subscriber.ExpectNoMsgAsync(500.Milliseconds());
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_removed_when_stopped()
public async Task ClusterDomainEventPublisher_must_publish_removed_when_stopped()
{
_publisher.Tell(PoisonPill.Instance);
_memberSubscriber.ExpectMsg(ClusterEvent.ClusterShuttingDown.Instance);
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Up));
await _memberSubscriber.ExpectMsgAsync(ClusterEvent.ClusterShuttingDown.Instance);
await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Up));
}

}
Expand Down

0 comments on commit ebd694b

Please sign in to comment.