Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed Dec 17, 2021
1 parent 3aa8fd6 commit a9ea664
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 54 deletions.
8 changes: 4 additions & 4 deletions src/core/Akka.Cluster.Tests.MultiNode/TransitionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ private void AwaitMemberStatus(Address address, MemberStatus status)

private void LeaderActions()
{
Cluster.TellCoreSafe(InternalClusterAction.LeaderActionsTick.Instance);
Cluster.ClusterCore.Tell(InternalClusterAction.LeaderActionsTick.Instance);
}

private void ReapUnreachable()
{
Cluster.TellCoreSafe(InternalClusterAction.ReapUnreachableTick.Instance);
Cluster.ClusterCore.Tell(InternalClusterAction.ReapUnreachableTick.Instance);
}

private int _gossipBarrierCounter = 0;
Expand All @@ -148,7 +148,7 @@ private void GossipTo(RoleName fromRole, RoleName toRole)
{
EnterBarrier("before-gossip-" + _gossipBarrierCounter);
// send gossip
Cluster.TellCoreSafe(new InternalClusterAction.SendGossipTo(GetAddress(toRole)));
Cluster.ClusterCore.Tell(new InternalClusterAction.SendGossipTo(GetAddress(toRole)));
// gossip chat will synchronize the views
AwaitCondition(() => ImmutableHashSet.Create(fromRole, toRole).Except(SeenLatestGossip()).IsEmpty);
EnterBarrier("after-gossip-" + _gossipBarrierCounter);
Expand Down Expand Up @@ -286,7 +286,7 @@ private void A_Cluster_must_perform_correct_transitions_when_third_joins_second(
RunOn(() =>
{
// send gossip
Cluster.TellCoreSafe(new InternalClusterAction.SendGossipTo(GetAddress(other2)));
Cluster.ClusterCore.Tell(new InternalClusterAction.SendGossipTo(GetAddress(other2)));
}, other1);

RunOn(() =>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster.Tests/ClusterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ClusterSpec(ITestOutputHelper output)

internal void LeaderActions()
{
_cluster.TellCoreSafe(InternalClusterAction.LeaderActionsTick.Instance);
_cluster.ClusterCore.Tell(InternalClusterAction.LeaderActionsTick.Instance);
}

[Fact]
Expand Down
79 changes: 30 additions & 49 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,6 @@ public Cluster(ActorSystemImpl system)

LogInfo("Starting up...");

var clusterCoreTaskSource = new TaskCompletionSource<IActorRef>();
_clusterCoreTask = clusterCoreTaskSource.Task;

FailureDetector = new DefaultFailureDetectorRegistry<Address>(() => FailureDetectorLoader.Load(Settings.FailureDetectorImplementationClass, Settings.FailureDetectorConfig,
system));

Expand All @@ -143,26 +140,27 @@ public Cluster(ActorSystemImpl system)
_readView = new ClusterReadView(this);

// force the underlying system to start
_ = Task.Run(async () =>
{
try
{
_clusterCore = await _clusterDaemons.Ask<IActorRef>(new InternalClusterAction.GetClusterCoreRef(this), System.Settings.CreationTimeout).ConfigureAwait(false);
clusterCoreTaskSource.SetResult(_clusterCore);
_clusterCore = GetClusterCoreRef().Result;

system.RegisterOnTermination(Shutdown);
LogInfo("Started up successfully");
}
catch (Exception ex)
{
_log.Error(ex, "Failed to startup Cluster. You can try to increase 'akka.actor.creation-timeout'.");
Shutdown();
System.DeadLetters.Tell(ex); //don't re-throw the error. Just log it.
system.RegisterOnTermination(Shutdown);

_clusterCore = System.DeadLetters;
clusterCoreTaskSource.SetResult(_clusterCore);
}
});
LogInfo("Started up successfully");
}

private async Task<IActorRef> GetClusterCoreRef()
{
var timeout = System.Settings.CreationTimeout;
try
{
return await _clusterDaemons.Ask<IActorRef>(new InternalClusterAction.GetClusterCoreRef(this), timeout).ConfigureAwait(false);
}
catch (Exception ex)
{
_log.Error(ex, "Failed to startup Cluster. You can try to increase 'akka.actor.creation-timeout'.");
Shutdown();
System.DeadLetters.Tell(ex); //don't re-throw the error. Just log it.
return System.DeadLetters;
}
}

/// <summary>
Expand Down Expand Up @@ -199,7 +197,7 @@ public void Subscribe(IActorRef subscriber, ClusterEvent.SubscriptionInitialStat
if (!to.All(t => typeof(ClusterEvent.IClusterDomainEvent).IsAssignableFrom(t)))
throw new ArgumentException($"Subscribe to `IClusterDomainEvent` or subclasses, was [{string.Join(", ", to.Select(c => c.Name))}]", nameof(to));

TellCoreSafe(new InternalClusterAction.Subscribe(subscriber, initialStateMode, ImmutableHashSet.Create(to)));
ClusterCore.Tell(new InternalClusterAction.Subscribe(subscriber, initialStateMode, ImmutableHashSet.Create(to)));
}

/// <summary>
Expand All @@ -218,7 +216,7 @@ public void Unsubscribe(IActorRef subscriber)
/// <param name="to">The event type that the actor no longer receives.</param>
public void Unsubscribe(IActorRef subscriber, Type to)
{
TellCoreSafe(new InternalClusterAction.Unsubscribe(subscriber, to));
ClusterCore.Tell(new InternalClusterAction.Unsubscribe(subscriber, to));
}

/// <summary>
Expand All @@ -229,7 +227,7 @@ public void Unsubscribe(IActorRef subscriber, Type to)
/// <param name="receiver">The actor that receives the current cluster state.</param>
public void SendCurrentClusterState(IActorRef receiver)
{
TellCoreSafe(new InternalClusterAction.SendCurrentClusterState(receiver));
ClusterCore.Tell(new InternalClusterAction.SendCurrentClusterState(receiver));
}

/// <summary>
Expand All @@ -243,7 +241,7 @@ public void SendCurrentClusterState(IActorRef receiver)
/// <param name="address">The address of the node we want to join.</param>
public void Join(Address address)
{
TellCoreSafe(new ClusterUserAction.JoinTo(FillLocal(address)));
ClusterCore.Tell(new ClusterUserAction.JoinTo(FillLocal(address)));
}

/// <summary>
Expand Down Expand Up @@ -301,7 +299,8 @@ private Address FillLocal(Address address)
/// <param name="seedNodes">TBD</param>
public void JoinSeedNodes(IEnumerable<Address> seedNodes)
{
TellCoreSafe(new InternalClusterAction.JoinSeedNodes(seedNodes.Select(FillLocal).ToImmutableList()));
ClusterCore.Tell(
new InternalClusterAction.JoinSeedNodes(seedNodes.Select(FillLocal).ToImmutableList()));
}

/// <summary>
Expand Down Expand Up @@ -354,7 +353,7 @@ public void Leave(Address address)
LeaveSelf();
}
else
TellCoreSafe(new ClusterUserAction.Leave(FillLocal(address)));
ClusterCore.Tell(new ClusterUserAction.Leave(FillLocal(address)));
}

/// <summary>
Expand Down Expand Up @@ -401,7 +400,7 @@ private Task LeaveSelf()
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() => tcs.TrySetResult(null)));

// Send leave message
TellCoreSafe(new ClusterUserAction.Leave(SelfAddress));
ClusterCore.Tell(new ClusterUserAction.Leave(SelfAddress));

return tcs.Task;
}
Expand All @@ -417,7 +416,7 @@ private Task LeaveSelf()
/// <param name="address">The address of the node we're going to mark as <see cref="MemberStatus.Down"/></param>
public void Down(Address address)
{
TellCoreSafe(new ClusterUserAction.Down(FillLocal(address)));
ClusterCore.Tell(new ClusterUserAction.Down(FillLocal(address)));
}

/// <summary>
Expand Down Expand Up @@ -564,40 +563,22 @@ internal void Shutdown()

private readonly IActorRef _clusterDaemons;
private IActorRef _clusterCore;
private Task<IActorRef> _clusterCoreTask;

/// <summary>
/// TBD
/// </summary>
[Obsolete("use TellCoreSafe()")]
internal IActorRef ClusterCore
{
get
{
if (_clusterCore is null)
if (_clusterCore == null)
{
if (_clusterCoreTask is null)
throw new InvalidOperationException();

_clusterCore = _clusterCoreTask.Result;
_clusterCore = GetClusterCoreRef().Result;
}
return _clusterCore;
}
}

/// <summary>
/// INTERNAL API.
///
/// We have to wait for cluster core to startup before we can use it
/// </summary>
internal void TellCoreSafe(object message)
{
if (_clusterCore is null)
_ = _clusterCoreTask.ContinueWith((t, m) => t.Result.Tell(m), message);
else
_clusterCore.Tell(message);
}

/// <summary>
/// INTERNAL API.
///
Expand Down

0 comments on commit a9ea664

Please sign in to comment.