Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RemoteActorRefProvider address paring, caching and resolving improvements #5273

Merged
merged 41 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f25de15
refactor remote-actorref-provider and add tests for cache entries
Zetanova Sep 7, 2021
3fc0168
replace address-cache with actorpath-cache
Zetanova Sep 7, 2021
9a67f1c
refactor resolve with local address
Zetanova Sep 7, 2021
746f76b
refactor and cleanup
Zetanova Sep 7, 2021
bff01bb
remove volatile from fields
Zetanova Sep 7, 2021
d0f232e
Merge branch 'dev' into perf-remote-actorref-provider
Zetanova Sep 7, 2021
3dc67a5
merge upstream
Zetanova Sep 8, 2021
b458ae4
Merge branch 'perf-remote-actorref-provider' of https://github.com/Ze…
Zetanova Sep 8, 2021
2a55502
remove double equals
Zetanova Sep 8, 2021
14e88e2
cleanup
Zetanova Sep 9, 2021
762ec30
refactor to base
Zetanova Sep 9, 2021
aea2166
optimize elements list
Zetanova Sep 9, 2021
a36170f
improve actor path join
Zetanova Sep 10, 2021
aa211c7
improve actor path equals and compare
Zetanova Sep 10, 2021
1842505
cleanup
Zetanova Sep 10, 2021
f388cef
protect stack and use moveto of arraybuilder
Zetanova Sep 11, 2021
baaeea2
update api spec
Zetanova Sep 11, 2021
f86dc4b
test for jumbo actor path name support
Zetanova Sep 11, 2021
4427087
small refactors
Zetanova Sep 12, 2021
93f5d9c
add ActorPath.ParentOf(depth)
Zetanova Sep 12, 2021
d74deaa
dont copy actorpath
Zetanova Sep 12, 2021
a7a525e
use actorpath-cache and remove cache entry test
Zetanova Sep 12, 2021
d3c33e8
refactor fill array
Zetanova Sep 12, 2021
4512aec
prepair actor path cache for better deduplication
Zetanova Sep 12, 2021
ad3ca76
update api
Zetanova Sep 12, 2021
026a6fc
cache root actor path
Zetanova Sep 15, 2021
b29e242
update api
Zetanova Sep 15, 2021
9d9a2d7
remove obsolete code
Zetanova Sep 15, 2021
29eaff2
cleanup code
Zetanova Sep 15, 2021
f33929e
Merge branch 'dev' into perf-remote-actorref-provider
Aaronontheweb Sep 15, 2021
1051ed3
Merge remote-tracking branch 'upstream/dev' into perf-remote-actorref…
Zetanova Sep 15, 2021
2564415
Merge branch 'perf-remote-actorref-provider' of https://github.com/Ze…
Zetanova Sep 15, 2021
d3a0ae0
removed commented cache tests
Zetanova Sep 15, 2021
217485c
refactor span to string bulder
Zetanova Sep 15, 2021
409485f
use internal fields and ref equals
Zetanova Sep 17, 2021
9eba407
add rebase path test
Zetanova Sep 17, 2021
cc30aa1
fix possible NRE
Zetanova Sep 18, 2021
73d26ff
extend and test address parsing
Zetanova Sep 18, 2021
c13ad50
update api
Zetanova Sep 18, 2021
793b8b5
Merge branch 'dev' into perf-remote-actorref-provider
Zetanova Sep 29, 2021
8dd6e0c
Merge branch 'dev' into perf-remote-actorref-provider
Aaronontheweb Oct 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/core/Akka.Remote.Tests/RemotingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using Xunit.Abstractions;
using Nito.AsyncEx;
using ThreadLocalRandom = Akka.Util.ThreadLocalRandom;
using Akka.Remote.Serialization;

namespace Akka.Remote.Tests
{
Expand Down Expand Up @@ -177,6 +178,34 @@ public async Task Remoting_must_support_Ask()
Assert.IsType<FutureActorRef<(string, IActorRef)>>(actorRef);
}

[Fact]
public async Task Remoting_should_not_cache_ref_of_local_ask()
{
var localActorRefResolveCache = ActorRefResolveThreadLocalCache.For(Sys);
var localActorPathCache = ActorPathThreadLocalCache.For(Sys);

var (msg, actorRef) = await _here.Ask<(string, IActorRef)>("ping", DefaultTimeout);
Assert.Equal("pong", msg);
Assert.IsType<FutureActorRef<(string, IActorRef)>>(actorRef);

Assert.Equal(0, localActorRefResolveCache.All.Sum(n => n.Stats.Entries));
Assert.Equal(2, localActorPathCache.All.Sum(n => n.Stats.Entries));
}

[Fact]
public async Task Remoting_should_not_cache_ref_of_remote_ask()
{
var remoteActorRefResolveCache = ActorRefResolveThreadLocalCache.For(_remoteSystem);
var remoteActorPathCache = ActorPathThreadLocalCache.For(_remoteSystem);

var (msg, actorRef) = await _here.Ask<(string, IActorRef)>("ping", DefaultTimeout);
Assert.Equal("pong", msg);
Assert.IsType<FutureActorRef<(string, IActorRef)>>(actorRef);

Assert.Equal(0, remoteActorRefResolveCache.All.Sum(n => n.Stats.Entries));
Assert.Equal(2, remoteActorPathCache.All.Sum(n => n.Stats.Entries)); //should be 1
}

[Fact(Skip = "Racy")]
public async Task Ask_does_not_deadlock()
{
Expand Down
97 changes: 48 additions & 49 deletions src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public interface IRemoteActorRefProvider : IActorRefProvider
/// <see cref="IActorRefProvider.ResolveActorRef(string)"/> method.
/// </summary>
/// <param name="path">The path of the actor we intend to resolve.</param>
/// <returns>An <see cref="IActorRef"/> if a match was found. Otherwise nobody.</returns>
/// <returns>An <see cref="IActorRef"/> if a match was found. Otherwise deadletters.</returns>
IActorRef InternalResolveActorRef(string path);

/// <summary>
Expand Down Expand Up @@ -128,7 +128,7 @@ public RemoteActorRefProvider(string systemName, Settings settings, EventStream
}

private readonly LocalActorRefProvider _local;
private volatile Internals _internals;
Zetanova marked this conversation as resolved.
Show resolved Hide resolved
private Internals _internals;
private ActorSystemImpl _system;

private Internals RemoteInternals
Expand Down Expand Up @@ -235,34 +235,34 @@ public void UnregisterTempActor(ActorPath path)
_local.UnregisterTempActor(path);
}

private volatile IActorRef _remotingTerminator;
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
private volatile IActorRef _remoteWatcher;
private IActorRef _remotingTerminator;
private IActorRef _remoteWatcher;

private volatile ActorRefResolveThreadLocalCache _actorRefResolveThreadLocalCache;
private volatile ActorPathThreadLocalCache _actorPathThreadLocalCache;
private ActorRefResolveThreadLocalCache _actorRefResolveThreadLocalCache;
private ActorPathThreadLocalCache _actorPathThreadLocalCache;

/// <summary>
/// The remote death watcher.
/// </summary>
public IActorRef RemoteWatcher => _remoteWatcher;
private volatile IActorRef _remoteDeploymentWatcher;
private IActorRef _remoteDeploymentWatcher;

/// <inheritdoc/>
public virtual void Init(ActorSystemImpl system)
{
_system = system;

_local.Init(system);

_actorRefResolveThreadLocalCache = ActorRefResolveThreadLocalCache.For(system);
_actorPathThreadLocalCache = ActorPathThreadLocalCache.For(system);

_local.Init(system);

_remotingTerminator =
_system.SystemActorOf(
RemoteSettings.ConfigureDispatcher(Props.Create(() => new RemotingTerminator(_local.SystemGuardian))),
"remoting-terminator");

_internals = CreateInternals();
_internals = CreateInternals();

_remotingTerminator.Tell(RemoteInternals);

Expand Down Expand Up @@ -433,9 +433,10 @@ public Deploy LookUpRemotes(IEnumerable<string> p)
return Deploy.None;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool HasAddress(Address address)
{
return address.Equals(_local.RootPath.Address) || address.Equals(RootPath.Address) || Transport.Addresses.Contains(address);
return address.Equals(RootPath.Address) || Transport.Addresses.Contains(address);
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand All @@ -458,21 +459,6 @@ private IInternalActorRef LocalActorOf(ActorSystemImpl system, Props props, IInt
return _local.ActorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool TryParseCachedPath(string actorPath, out ActorPath path)
{
if (_actorPathThreadLocalCache != null)
{
path = _actorPathThreadLocalCache.Cache.GetOrCompute(actorPath);
return path != null;
}
else // cache not initialized yet
{
return ActorPath.TryParse(actorPath, out path);
}
}


/// <summary>
/// INTERNAL API.
///
Expand All @@ -483,20 +469,31 @@ private bool TryParseCachedPath(string actorPath, out ActorPath path)
/// <returns>TBD</returns>
public IInternalActorRef ResolveActorRefWithLocalAddress(string path, Address localAddress)
{
if (TryParseCachedPath(path, out var actorPath))
ActorPath actorPath;
if (_actorPathThreadLocalCache != null)
{
//the actor's local address was already included in the ActorPath
if (HasAddress(actorPath.Address))
{
if (actorPath is RootActorPath)
return RootGuardian;
return (IInternalActorRef)ResolveActorRef(path); // so we can use caching
}
actorPath = _actorPathThreadLocalCache.Cache.GetOrCompute(path);
}
else // cache not initialized yet
{
ActorPath.TryParse(path, out actorPath);
}

return CreateRemoteRef(new RootActorPath(actorPath.Address) / actorPath.ElementsWithUid, localAddress);
if (path is null)
{
_log.Debug("resolve of unknown path [{0}] failed", path);
return InternalDeadLetters;
}
_log.Debug("resolve of unknown path [{0}] failed", path);
return InternalDeadLetters;

if (!HasAddress(actorPath.Address))
return CreateRemoteRef(new RootActorPath(actorPath.Address) / actorPath.ElementsWithUid, localAddress);

//the actor's local address was already included in the ActorPath

if (actorPath is RootActorPath)
return RootGuardian;

return (IInternalActorRef)ResolveActorRef(path); // so we can use caching
}


Expand Down Expand Up @@ -539,7 +536,8 @@ public IActorRef ResolveActorRef(string path)
// if the value is not cached
if (_actorRefResolveThreadLocalCache == null)
{
return InternalResolveActorRef(path); // cache not initialized yet
// cache not initialized yet, should never happen
return InternalResolveActorRef(path);
}
return _actorRefResolveThreadLocalCache.Cache.GetOrCompute(path);
}
Expand Down Expand Up @@ -592,19 +590,20 @@ public IActorRef ResolveActorRef(ActorPath actorPath)
/// <returns>The remote Address, if applicable. If not applicable <c>null</c> may be returned.</returns>
public Address GetExternalAddressFor(Address address)
{
if (HasAddress(address)) { return _local.RootPath.Address; }
if (!string.IsNullOrEmpty(address.Host) && address.Port.HasValue)
if (HasAddress(address))
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
return _local.RootPath.Address;

if (string.IsNullOrEmpty(address.Host) || !address.Port.HasValue)
return null;

try
{
try
{
return Transport.LocalAddressForRemote(address);
}
catch
{
return null;
}
return Transport.LocalAddressForRemote(address);
}
catch
{
return null;
}
return null;
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Remote/RemoteSystemDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal interface IDaemonMsg { }
/// <summary>
/// INTERNAL API
/// </summary>
internal class DaemonMsgCreate : IDaemonMsg
internal sealed class DaemonMsgCreate : IDaemonMsg
{
/// <summary>
/// Initializes a new instance of the <see cref="DaemonMsgCreate" /> class.
Expand Down Expand Up @@ -77,7 +77,7 @@ public DaemonMsgCreate(Props props, Deploy deploy, string path, IActorRef superv
///
/// It acts as the brain of the remote that responds to system remote messages and executes actions accordingly.
/// </summary>
internal class RemoteSystemDaemon : VirtualPathContainer
internal sealed class RemoteSystemDaemon : VirtualPathContainer
{
private readonly ActorSystemImpl _system;
private readonly Switch _terminating = new Switch(false);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Remote/Remoting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ internal interface IPriorityMessage { }
/// <summary>
/// INTERNAL API
/// </summary>
internal class Remoting : RemoteTransport
internal sealed class Remoting : RemoteTransport
{
private readonly ILoggingAdapter _log;
private volatile IDictionary<string, HashSet<ProtocolTransportAddressPair>> _transportMapping;
Expand Down
5 changes: 4 additions & 1 deletion src/core/Akka.Remote/Serialization/ActorPathCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using Akka.Actor;
using System.Threading;
using System.Collections.Generic;

namespace Akka.Remote.Serialization
{
Expand All @@ -16,10 +17,12 @@ namespace Akka.Remote.Serialization
/// </summary>
internal sealed class ActorPathThreadLocalCache : ExtensionIdProvider<ActorPathThreadLocalCache>, IExtension
{
private readonly ThreadLocal<ActorPathCache> _current = new ThreadLocal<ActorPathCache>(() => new ActorPathCache());
private readonly ThreadLocal<ActorPathCache> _current = new ThreadLocal<ActorPathCache>(() => new ActorPathCache(), true);

public ActorPathCache Cache => _current.Value;

internal IList<ActorPathCache> All => _current.Values;

public override ActorPathThreadLocalCache CreateExtension(ExtendedActorSystem system)
{
return new ActorPathThreadLocalCache();
Expand Down
5 changes: 4 additions & 1 deletion src/core/Akka.Remote/Serialization/ActorRefResolveCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System.Collections.Generic;
using System.Threading;
using Akka.Actor;
using Akka.Util.Internal;
Expand All @@ -23,7 +24,7 @@ public ActorRefResolveThreadLocalCache() { }
public ActorRefResolveThreadLocalCache(IRemoteActorRefProvider provider)
{
_provider = provider;
_current = new ThreadLocal<ActorRefResolveCache>(() => new ActorRefResolveCache(_provider));
_current = new ThreadLocal<ActorRefResolveCache>(() => new ActorRefResolveCache(_provider), true);
}

public override ActorRefResolveThreadLocalCache CreateExtension(ExtendedActorSystem system)
Expand All @@ -35,6 +36,8 @@ public override ActorRefResolveThreadLocalCache CreateExtension(ExtendedActorSys

public ActorRefResolveCache Cache => _current.Value;

internal IList<ActorRefResolveCache> All => _current.Values;

public static ActorRefResolveThreadLocalCache For(ActorSystem system)
{
return system.WithExtension<ActorRefResolveThreadLocalCache, ActorRefResolveThreadLocalCache>();
Expand Down
20 changes: 7 additions & 13 deletions src/core/Akka.Remote/Transport/AkkaPduCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,12 @@ public AckAndMessage(Ack ackOption, Message messageOption)
internal abstract class AkkaPduCodec
{
protected readonly ActorSystem System;
protected readonly AddressThreadLocalCache AddressCache;
protected readonly AddressThreadLocalCache ActorPathCache;

protected AkkaPduCodec(ActorSystem system)
{
System = system;
AddressCache = AddressThreadLocalCache.For(system);
ActorPathCache = AddressThreadLocalCache.For(system);
}

/// <summary>
Expand Down Expand Up @@ -426,22 +426,15 @@ public override AckAndMessage DecodeMessage(ByteString raw, IRemoteActorRefProvi
if (envelopeContainer != null)
{
var recipient = provider.ResolveActorRefWithLocalAddress(envelopeContainer.Recipient.Path, localAddress);
Address recipientAddress;
if (AddressCache != null)
{
recipientAddress = AddressCache.Cache.GetOrCompute(envelopeContainer.Recipient.Path);
}
else
{
ActorPath.TryParseAddress(envelopeContainer.Recipient.Path, out recipientAddress);
}

//todo get parsed address from provider
var recipientAddress = ActorPathCache.Cache.GetOrCompute(envelopeContainer.Recipient.Path);

var serializedMessage = envelopeContainer.Message;
IActorRef senderOption = null;
if (envelopeContainer.Sender != null)
{
senderOption = provider.ResolveActorRefWithLocalAddress(envelopeContainer.Sender.Path, localAddress);
}

SeqNo seqOption = null;
if (envelopeContainer.Seq != SeqUndefined)
{
Expand All @@ -450,6 +443,7 @@ public override AckAndMessage DecodeMessage(ByteString raw, IRemoteActorRefProvi
seqOption = new SeqNo((long)envelopeContainer.Seq); //proto takes a ulong
}
}

messageOption = new Message(recipient, recipientAddress, serializedMessage, senderOption, seqOption);
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/core/Akka.Tests/Actor/ActorPathSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ public void Supports_parsing_remote_FQDN_paths()
[Fact]
public void Return_false_upon_malformed_path()
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like mostly formatting changes - LGTM.

ActorPath ignored;
ActorPath.TryParse("", out ignored).ShouldBe(false);
ActorPath.TryParse("://hallo", out ignored).ShouldBe(false);
ActorPath.TryParse("s://dd@:12", out ignored).ShouldBe(false);
ActorPath.TryParse("s://dd@h:hd", out ignored).ShouldBe(false);
ActorPath.TryParse("a://l:1/b", out ignored).ShouldBe(false);
ActorPath.TryParse("", out _).ShouldBe(false);
ActorPath.TryParse("://hallo", out _).ShouldBe(false);
ActorPath.TryParse("s://dd@:12", out _).ShouldBe(false);
ActorPath.TryParse("s://dd@h:hd", out _).ShouldBe(false);
ActorPath.TryParse("a://l:1/b", out _).ShouldBe(false);
ActorPath.TryParse("akka:/", out _).ShouldBe(false);
}

[Fact]
Expand Down
Loading