Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added telemetry injection point for Ask<T> #5297

Merged
merged 1 commit into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System;
using Akka.Actor;
using Akka.Configuration;
using Akka.Configuration;

namespace Akka.Cluster.Metrics
{
Expand Down
3 changes: 3 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,8 @@ namespace Akka.Actor
Akka.Actor.IInternalActorRef TempContainer { get; }
System.Threading.Tasks.Task TerminationTask { get; }
Akka.Actor.IInternalActorRef ActorOf(Akka.Actor.Internal.ActorSystemImpl system, Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.ActorPath path, bool systemService, Akka.Actor.Deploy deploy, bool lookupDeploy, bool async);
[Akka.Annotations.InternalApiAttribute()]
Akka.Actor.FutureActorRef<T> CreateFutureRef<T>(System.Threading.Tasks.TaskCompletionSource<T> tcs);
Akka.Actor.Address GetExternalAddressFor(Akka.Actor.Address address);
void Init(Akka.Actor.Internal.ActorSystemImpl system);
void RegisterTempActor(Akka.Actor.IInternalActorRef actorRef, Akka.Actor.ActorPath path);
Expand Down Expand Up @@ -1279,6 +1281,7 @@ namespace Akka.Actor
public Akka.Actor.IInternalActorRef TempContainer { get; }
public System.Threading.Tasks.Task TerminationTask { get; }
public Akka.Actor.IInternalActorRef ActorOf(Akka.Actor.Internal.ActorSystemImpl system, Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.ActorPath path, bool systemService, Akka.Actor.Deploy deploy, bool lookupDeploy, bool async) { }
public Akka.Actor.FutureActorRef<T> CreateFutureRef<T>(System.Threading.Tasks.TaskCompletionSource<T> tcs) { }
public Akka.Actor.Address GetExternalAddressFor(Akka.Actor.Address address) { }
public void Init(Akka.Actor.Internal.ActorSystemImpl system) { }
public void RegisterExtraName(string name, Akka.Actor.IInternalActorRef actor) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ namespace Akka.Remote
public System.Threading.Tasks.Task TerminationTask { get; }
public Akka.Remote.RemoteTransport Transport { get; }
public Akka.Actor.IInternalActorRef ActorOf(Akka.Actor.Internal.ActorSystemImpl system, Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.ActorPath path, bool systemService, Akka.Actor.Deploy deploy, bool lookupDeploy, bool async) { }
public Akka.Actor.FutureActorRef<T> CreateFutureRef<T>(System.Threading.Tasks.TaskCompletionSource<T> tcs) { }
protected virtual Akka.Actor.IActorRef CreateRemoteDeploymentWatcher(Akka.Actor.Internal.ActorSystemImpl system) { }
protected virtual Akka.Actor.IInternalActorRef CreateRemoteRef(Akka.Actor.ActorPath actorPath, Akka.Actor.Address localAddress) { }
protected virtual Akka.Actor.IInternalActorRef CreateRemoteRef(Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.Address localAddress, Akka.Actor.ActorPath rpath, Akka.Actor.Deploy deployment) { }
Expand Down
5 changes: 3 additions & 2 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1561,8 +1561,9 @@ public void StopSeedNodeProcess()
/// Received `Join` message and replies with `Welcome` message, containing
/// current gossip state, including the new joining member.
/// </summary>
/// <param name="node">TBD</param>
/// <param name="roles">TBD</param>
/// <param name="node">The unique address of the joining node.</param>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated XML-DOC fix I made to resolve a compiler warning.

/// <param name="roles">The roles, if any, of the joining node.</param>
/// <param name="appVersion">The software version of the joining node.</param>
public void Joining(UniqueAddress node, ImmutableHashSet<string> roles, AppVersion appVersion)
{
var selfStatus = LatestGossip.GetMember(SelfUniqueAddress).Status;
Expand Down
6 changes: 6 additions & 0 deletions src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ public void UnregisterTempActor(ActorPath path)
_local.UnregisterTempActor(path);
}

/// <inheritdoc/>
public FutureActorRef<T> CreateFutureRef<T>(TaskCompletionSource<T> tcs)
{
return _local.CreateFutureRef(tcs);
}

private IActorRef _remotingTerminator;
private IActorRef _remoteWatcher;

Expand Down
22 changes: 22 additions & 0 deletions src/core/Akka/Actor/ActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ public interface IActorRefProvider
/// <param name="path">A path returned by <see cref="TempPath"/>. Do NOT pass in any other path!</param>
void UnregisterTempActor(ActorPath path);

/// <summary>
/// Automatically generates a <see cref="FutureActorRef{T}"/> with a temporary path.
/// </summary>
/// <remarks>
/// Does not call <see cref="RegisterTempActor"/> or <see cref="UnregisterTempActor"/>.
/// </remarks>
/// <param name="tcs">A typed <see cref="TaskCompletionSource{T}"/></param>
/// <typeparam name="T">The type of output this <see cref="FutureActorRef{T}"/> expects.</typeparam>
/// <returns>A new, single-use <see cref="FutureActorRef{T}"/> instance.</returns>
[InternalApi]
FutureActorRef<T> CreateFutureRef<T>(TaskCompletionSource<T> tcs);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New InternalApi, but still public member on the IActorRefProvider interface. This is our injection point.


/// <summary>
/// Actor factory with create-only semantics: will create an actor as
/// described by <paramref name="props"/> with the given <paramref name="supervisor"/> and <paramref name="path"/> (may be different
Expand Down Expand Up @@ -386,6 +398,16 @@ public void UnregisterTempActor(ActorPath path)
_tempContainer.RemoveChild(path.Name);
}

/// <inheritdoc cref="IActorRefProvider.CreateFutureRef{T}"/>
public FutureActorRef<T> CreateFutureRef<T>(TaskCompletionSource<T> tcs)
{
//create a new tempcontainer path
var path = TempPath();

var future = new FutureActorRef<T>(tcs, path, this);
return future;
}

/// <summary>
/// Initializes the ActorRefProvider
/// </summary>
Expand Down
6 changes: 2 additions & 4 deletions src/core/Akka/Actor/Futures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,8 @@ public static Task<T> Ask<T>(this ICanTell self, Func<IActorRef, object> message
ctr2 = cancellationToken.Register(() => result.TrySetCanceled());
}

//create a new tempcontainer path
var path = provider.TempPath();

var future = new FutureActorRef<T>(result, path, provider);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved these two lines to the factory method - all of the TCS setup is left as-is.

var future = provider.CreateFutureRef(result);
var path = future.Path;

//The future actor needs to be unregistered in the temp container
_ = result.Task.ContinueWith(t =>
Expand Down