-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Singleton class and settings based on current Akka Typed implementati…
…on (#6050) Co-authored-by: Aaron Stannard <[email protected]>
- Loading branch information
1 parent
207e7b8
commit 899c3a2
Showing
7 changed files
with
589 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
133 changes: 133 additions & 0 deletions
133
src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonApiSpec.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
//----------------------------------------------------------------------- | ||
// <copyright file="ClusterSingletonConfigSpec.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
//----------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Cluster.Tools.Singleton; | ||
using Akka.Configuration; | ||
using Akka.TestKit; | ||
using Akka.TestKit.Configs; | ||
using Akka.TestKit.Extensions; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
|
||
namespace Akka.Cluster.Tools.Tests.Singleton | ||
{ | ||
public class ClusterSingletonApiSpec : AkkaSpec | ||
{ | ||
#region Internal | ||
|
||
public sealed class Pong | ||
{ | ||
public static Pong Instance => new Pong(); | ||
private Pong() { } | ||
} | ||
|
||
public sealed class Ping | ||
{ | ||
public IActorRef RespondTo { get; } | ||
public Ping(IActorRef respondTo) => RespondTo = respondTo; | ||
} | ||
|
||
public sealed class Perish | ||
{ | ||
public static Perish Instance => new Perish(); | ||
private Perish() { } | ||
} | ||
|
||
public class PingPong : UntypedActor | ||
{ | ||
protected override void OnReceive(object message) | ||
{ | ||
switch (message) | ||
{ | ||
case Ping ping: | ||
ping.RespondTo.Tell(Pong.Instance); | ||
break; | ||
case Perish _: | ||
Context.Stop(Self); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
#endregion | ||
|
||
private readonly Cluster _clusterNode1; | ||
private readonly Cluster _clusterNode2; | ||
private readonly ActorSystem _system2; | ||
|
||
public static Config GetConfig() => ConfigurationFactory.ParseString(@" | ||
akka.loglevel = DEBUG | ||
akka.actor.provider = ""cluster"" | ||
akka.cluster.roles = [""singleton""] | ||
akka.remote { | ||
dot-netty.tcp { | ||
hostname = ""127.0.0.1"" | ||
port = 0 | ||
} | ||
}").WithFallback(TestConfigs.DefaultConfig); | ||
|
||
public ClusterSingletonApiSpec(ITestOutputHelper testOutput) | ||
: base(GetConfig(), testOutput) | ||
{ | ||
_clusterNode1 = Cluster.Get(Sys); | ||
|
||
_system2 = ActorSystem.Create( | ||
Sys.Name, | ||
ConfigurationFactory.ParseString("akka.cluster.roles = [\"singleton\"]").WithFallback(Sys.Settings.Config)); | ||
|
||
_clusterNode2 = Cluster.Get(_system2); | ||
} | ||
|
||
[Fact] | ||
public void A_cluster_singleton_must_be_accessible_from_two_nodes_in_a_cluster() | ||
{ | ||
var node1UpProbe = CreateTestProbe(Sys); | ||
var node2UpProbe = CreateTestProbe(Sys); | ||
|
||
_clusterNode1.Join(_clusterNode1.SelfAddress); | ||
node1UpProbe.AwaitAssert(() => _clusterNode1.SelfMember.Status.ShouldBe(MemberStatus.Up), TimeSpan.FromSeconds(3)); | ||
|
||
_clusterNode2.Join(_clusterNode2.SelfAddress); | ||
node2UpProbe.AwaitAssert(() => _clusterNode2.SelfMember.Status.ShouldBe(MemberStatus.Up), TimeSpan.FromSeconds(3)); | ||
|
||
var cs1 = ClusterSingleton.Get(Sys); | ||
var cs2 = ClusterSingleton.Get(_system2); | ||
|
||
var settings = ClusterSingletonSettings.Create(Sys).WithRole("singleton"); | ||
var node1ref = cs1.Init(SingletonActor.Create(Props.Create<PingPong>(), "ping-pong").WithStopMessage(Perish.Instance).WithSettings(settings)); | ||
var node2ref = cs2.Init(SingletonActor.Create(Props.Create<PingPong>(), "ping-pong").WithStopMessage(Perish.Instance).WithSettings(settings)); | ||
|
||
// subsequent spawning returns the same refs | ||
cs1.Init(SingletonActor.Create(Props.Create<PingPong>(), "ping-pong").WithStopMessage(Perish.Instance).WithSettings(settings)).ShouldBe(node1ref); | ||
cs2.Init(SingletonActor.Create(Props.Create<PingPong>(), "ping-pong").WithStopMessage(Perish.Instance).WithSettings(settings)).ShouldBe(node2ref); | ||
|
||
var node1PongProbe = CreateTestProbe(Sys); | ||
var node2PongProbe = CreateTestProbe(_system2); | ||
|
||
node1PongProbe.AwaitAssert(() => | ||
{ | ||
node1ref.Tell(new Ping(node1PongProbe.Ref)); | ||
node1PongProbe.ExpectMsg<Pong>(); | ||
}, TimeSpan.FromSeconds(3)); | ||
|
||
node2PongProbe.AwaitAssert(() => | ||
{ | ||
node2ref.Tell(new Ping(node2PongProbe.Ref)); | ||
node2PongProbe.ExpectMsg<Pong>(); | ||
}, TimeSpan.FromSeconds(3)); | ||
} | ||
|
||
protected override async Task AfterAllAsync() | ||
{ | ||
await base.AfterAllAsync(); | ||
await _system2.Terminate().AwaitWithTimeout(TimeSpan.FromSeconds(3)); | ||
} | ||
} | ||
} |
131 changes: 131 additions & 0 deletions
131
src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingleton.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
//----------------------------------------------------------------------- | ||
// <copyright file="ClusterSingletonManager.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
//----------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Collections.Concurrent; | ||
using Akka.Actor; | ||
using Akka.Annotations; | ||
using Akka.Util; | ||
|
||
namespace Akka.Cluster.Tools.Singleton | ||
{ | ||
/// <summary> | ||
/// This class is not intended for user extension other than for test purposes (e.g. stub implementation). | ||
/// More methods may be added in the future and that may break such implementations. | ||
/// </summary> | ||
[DoNotInherit] | ||
public class ClusterSingleton : IExtension | ||
{ | ||
private readonly ActorSystem _system; | ||
private readonly Lazy<Cluster> _cluster; | ||
private readonly ConcurrentDictionary<string, IActorRef> _proxies = new ConcurrentDictionary<string, IActorRef>(); | ||
|
||
public static ClusterSingleton Get(ActorSystem system) => | ||
system.WithExtension<ClusterSingleton, ClusterSingletonProvider>(); | ||
|
||
public ClusterSingleton(ExtendedActorSystem system) | ||
{ | ||
_system = system; | ||
_cluster = new Lazy<Cluster>(() => Cluster.Get(system)); | ||
} | ||
|
||
/// <summary> | ||
/// Start if needed and provide a proxy to a named singleton. | ||
/// | ||
/// <para>If there already is a manager running for the given `singletonName` on this node, no additional manager is started.</para> | ||
/// <para>If there already is a proxy running for the given `singletonName` on this node, an <see cref="IActorRef"/> to that is returned.</para> | ||
/// </summary> | ||
/// <returns>A proxy actor that can be used to communicate with the singleton in the cluster</returns> | ||
public IActorRef Init(SingletonActor singleton) | ||
{ | ||
var settings = singleton.Settings.GetOrElse(ClusterSingletonSettings.Create(_system)); | ||
if (settings.ShouldRunManager(_cluster.Value)) | ||
{ | ||
var managerName = ManagerNameFor(singleton.Name); | ||
try | ||
{ | ||
_system.ActorOf(ClusterSingletonManager.Props( | ||
singletonProps: singleton.Props, | ||
terminationMessage: singleton.StopMessage.GetOrElse(PoisonPill.Instance), | ||
settings: settings.ToManagerSettings(singleton.Name)), | ||
managerName); | ||
} | ||
catch (InvalidActorNameException ex) when (ex.Message.EndsWith("is not unique!")) | ||
{ | ||
// This is fine. We just wanted to make sure it is running and it already is | ||
} | ||
} | ||
|
||
return GetProxy(singleton.Name, settings); | ||
} | ||
|
||
private IActorRef GetProxy(string name, ClusterSingletonSettings settings) | ||
{ | ||
IActorRef ProxyCreator() | ||
{ | ||
var proxyName = $"singletonProxy{name}"; | ||
return _system.ActorOf(ClusterSingletonProxy.Props( | ||
singletonManagerPath: $"/user/{ManagerNameFor(name)}", | ||
settings: settings.ToProxySettings(name)), | ||
proxyName); | ||
} | ||
|
||
return _proxies.GetOrAdd(name, _ => ProxyCreator()); | ||
} | ||
|
||
|
||
private string ManagerNameFor(string singletonName) => $"singletonManager{singletonName}"; | ||
} | ||
|
||
public class ClusterSingletonProvider : ExtensionIdProvider<ClusterSingleton> | ||
{ | ||
public override ClusterSingleton CreateExtension(ExtendedActorSystem system) => new ClusterSingleton(system); | ||
} | ||
|
||
public class SingletonActor | ||
{ | ||
public string Name { get; } | ||
|
||
public Props Props { get; } | ||
|
||
public Option<object> StopMessage { get; } | ||
|
||
public Option<ClusterSingletonSettings> Settings { get; } | ||
|
||
public static SingletonActor Create(Props props, string name) => | ||
new SingletonActor(name, props, Option<object>.None, Option<ClusterSingletonSettings>.None); | ||
|
||
private SingletonActor(string name, Props props, Option<object> stopMessage, Option<ClusterSingletonSettings> settings) | ||
{ | ||
Name = name; | ||
Props = props; | ||
StopMessage = stopMessage; | ||
Settings = settings; | ||
} | ||
|
||
/// <summary> | ||
/// <see cref="Props"/> of the singleton actor, such as dispatcher settings. | ||
/// </summary> | ||
public SingletonActor WithProps(Props props) => Copy(props: props); | ||
|
||
/// <summary> | ||
/// Message sent to the singleton to tell it to stop, e.g. when being migrated. | ||
/// If this is not defined, a <see cref="PoisonPill"/> will be used instead. | ||
/// It can be useful to define a custom stop message if the singleton needs to | ||
/// perform some asynchronous cleanup or interactions before stopping. | ||
/// </summary> | ||
public SingletonActor WithStopMessage(object stopMessage) => Copy(stopMessage: stopMessage); | ||
|
||
/// <summary> | ||
/// Additional settings, typically loaded from configuration. | ||
/// </summary> | ||
public SingletonActor WithSettings(ClusterSingletonSettings settings) => Copy(settings: settings); | ||
|
||
private SingletonActor Copy(string name = null, Props props = null, Option<object> stopMessage = default, Option<ClusterSingletonSettings> settings = default) => | ||
new SingletonActor(name ?? Name, props ?? Props, stopMessage.HasValue ? stopMessage : StopMessage, settings.HasValue ? settings : Settings); | ||
} | ||
} |
Oops, something went wrong.