diff --git a/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs b/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs index f9978c6bcbd..afaffaf22d7 100644 --- a/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs @@ -7,6 +7,8 @@ using System; using System.Collections.Immutable; +using System.Linq; +using System.Threading.Tasks; using Akka.Actor; using Akka.TestKit; using FluentAssertions; @@ -71,138 +73,138 @@ public ClusterDomainEventPublisherSpec() : base(Config) } [Fact] - public void ClusterDomainEventPublisher_must_publish_MemberJoined() + public async Task ClusterDomainEventPublisher_must_publish_MemberJoined() { _publisher.Tell(new InternalClusterAction.PublishChanges(state1)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberJoined(cJoining)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberJoined(cJoining)); } [Fact] - public void ClusterDomainEventPublisher_must_publish_MemberUp() + public async Task ClusterDomainEventPublisher_must_publish_MemberUp() { _publisher.Tell(new InternalClusterAction.PublishChanges(state2)); _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp)); } [Fact] - public void ClusterDomainEventPublisher_must_publish_leader_changed() + public async Task ClusterDomainEventPublisher_must_publish_leader_changed() { _publisher.Tell(new InternalClusterAction.PublishChanges(state4)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); - _memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(a51Up.Address)); - _memberSubscriber.ExpectNoMsg(500.Milliseconds()); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(a51Up)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(a51Up.Address)); + await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds()); } [Fact] - public void ClusterDomainEventPublisher_must_publish_leader_changed_when_old_leader_leaves_and_is_removed() + public async Task ClusterDomainEventPublisher_must_publish_leader_changed_when_old_leader_leaves_and_is_removed() { _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp)); _publisher.Tell(new InternalClusterAction.PublishChanges(state6)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberLeft(aLeaving)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberLeft(aLeaving)); _publisher.Tell(new InternalClusterAction.PublishChanges(state7)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(aExiting)); - _memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(cUp.Address)); - _memberSubscriber.ExpectNoMsg(500.Milliseconds()); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(aExiting)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(cUp.Address)); + await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds()); // at the removed member a an empty gossip is the last thing _publisher.Tell(new InternalClusterAction.PublishChanges(_emptyMembershipState)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Exiting)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(bRemoved, MemberStatus.Exiting)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(cRemoved, MemberStatus.Up)); - _memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(null)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Exiting)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(bRemoved, MemberStatus.Exiting)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(cRemoved, MemberStatus.Up)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(null)); } [Fact] - public void ClusterDomainEventPublisher_must_not_publish_leader_changed_when_same_leader() + public async Task ClusterDomainEventPublisher_must_not_publish_leader_changed_when_same_leader() { _publisher.Tell(new InternalClusterAction.PublishChanges(state4)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); - _memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(a51Up.Address)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(a51Up)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.LeaderChanged(a51Up.Address)); _publisher.Tell(new InternalClusterAction.PublishChanges(state5)); - _memberSubscriber.ExpectNoMsg(500.Milliseconds()); + await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds()); } [Fact] - public void ClusterDomainEventPublisher_must_publish_role_leader_changed() + public async Task ClusterDomainEventPublisher_must_publish_role_leader_changed() { var subscriber = CreateTestProbe(); _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.RoleLeaderChanged)))); - subscriber.ExpectMsg(); + await subscriber.ExpectMsgAsync(); _publisher.Tell(new InternalClusterAction.PublishChanges(new MembershipState(new Gossip(ImmutableSortedSet.Create(cJoining, dUp)), dUp.UniqueAddress))); - subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", dUp.Address)); + await subscriber.ExpectMsgAsync(new ClusterEvent.RoleLeaderChanged("GRP", dUp.Address)); _publisher.Tell(new InternalClusterAction.PublishChanges(new MembershipState(new Gossip(ImmutableSortedSet.Create(cUp, dUp)), dUp.UniqueAddress))); - subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", cUp.Address)); + await subscriber.ExpectMsgAsync(new ClusterEvent.RoleLeaderChanged("GRP", cUp.Address)); } [Fact] - public void ClusterDomainEventPublisher_must_send_CurrentClusterState_when_subscribe() + public async Task ClusterDomainEventPublisher_must_send_CurrentClusterState_when_subscribe() { var subscriber = CreateTestProbe(); _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.IClusterDomainEvent)))); - subscriber.ExpectMsg(); + await subscriber.ExpectMsgAsync(); // but only to the new subscriber - _memberSubscriber.ExpectNoMsg(500.Milliseconds()); + await _memberSubscriber.ExpectNoMsgAsync(500.Milliseconds()); } [Fact] - public void ClusterDomainEventPublisher_must_send_events_corresponding_to_current_state_when_subscribe() + public async Task ClusterDomainEventPublisher_must_send_events_corresponding_to_current_state_when_subscribe() { var subscriber = CreateTestProbe(); _publisher.Tell(new InternalClusterAction.PublishChanges(state8)); _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.ReachabilityEvent)))); - subscriber.ReceiveN(4).Should().BeEquivalentTo( + (await subscriber.ReceiveNAsync(4).ToListAsync()).Should().BeEquivalentTo( new ClusterEvent.MemberUp(aUp), new ClusterEvent.MemberUp(cUp), new ClusterEvent.MemberUp(dUp), new ClusterEvent.MemberExited(bExiting)); - subscriber.ExpectMsg(new ClusterEvent.UnreachableMember(dUp)); - subscriber.ExpectNoMsg(500.Milliseconds()); + await subscriber.ExpectMsgAsync(new ClusterEvent.UnreachableMember(dUp)); + await subscriber.ExpectNoMsgAsync(500.Milliseconds()); } [Fact] - public void ClusterDomainEventPublisher_should_support_unsubscribe() + public async Task ClusterDomainEventPublisher_should_support_unsubscribe() { var subscriber = CreateTestProbe(); _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent)))); - subscriber.ExpectMsg(); + await subscriber.ExpectMsgAsync(); _publisher.Tell(new InternalClusterAction.Unsubscribe(subscriber.Ref, typeof(ClusterEvent.IMemberEvent))); _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); - subscriber.ExpectNoMsg(500.Milliseconds()); + await subscriber.ExpectNoMsgAsync(500.Milliseconds()); // but memberSubscriber is still subscriber - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberExited(bExiting)); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberUp(cUp)); } [Fact] - public void ClusterDomainEventPublisher_must_publish_seen_changed() + public async Task ClusterDomainEventPublisher_must_publish_seen_changed() { var subscriber = CreateTestProbe(); _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.SeenChanged)))); - subscriber.ExpectMsg(); + await subscriber.ExpectMsgAsync(); _publisher.Tell(new InternalClusterAction.PublishChanges(state2)); - subscriber.ExpectMsg(); - subscriber.ExpectNoMsg(500.Milliseconds()); + await subscriber.ExpectMsgAsync(); + await subscriber.ExpectNoMsgAsync(500.Milliseconds()); _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); - subscriber.ExpectMsg(); - subscriber.ExpectNoMsg(500.Milliseconds()); + await subscriber.ExpectMsgAsync(); + await subscriber.ExpectNoMsgAsync(500.Milliseconds()); } [Fact] - public void ClusterDomainEventPublisher_must_publish_removed_when_stopped() + public async Task ClusterDomainEventPublisher_must_publish_removed_when_stopped() { _publisher.Tell(PoisonPill.Instance); - _memberSubscriber.ExpectMsg(ClusterEvent.ClusterShuttingDown.Instance); - _memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Up)); + await _memberSubscriber.ExpectMsgAsync(ClusterEvent.ClusterShuttingDown.Instance); + await _memberSubscriber.ExpectMsgAsync(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Up)); } }