Skip to content

Commit

Permalink
Merge pull request #21 from Aaronontheweb/cluster-shard-region-proxy
Browse files Browse the repository at this point in the history
Cluster `ShardRegionProxy` and `DistributedPubSub` support
  • Loading branch information
Aaronontheweb authored Apr 1, 2022
2 parents e18db7c + fe60570 commit 7ae8ece
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 4 deletions.
5 changes: 3 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
## [0.1.2] / 31 March 2022
- Removed all `Cluster.Sharding` methods that rely on `ClusterShardingSettings`, since it's not practical to create those prior to starting the `ActorSystem`.
## [0.1.3] / 31 March 2022
- Added `ShardRegionProxy` support to Akka.Cluster.Hosting
- Added `DistributedPubSub` support to Akka.Cluster.Hosting
80 changes: 80 additions & 0 deletions src/Akka.Cluster.Hosting/AkkaClusterHostingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,5 +164,85 @@ public static AkkaConfigurationBuilder WithShardRegion<TKey>(this AkkaConfigurat
registry.TryRegister<TKey>(shardRegion);
});
}

/// <summary>
/// Starts a ShardRegionProxy that points to a <see cref="ShardRegion"/> hosted on a different role inside the cluster
/// and registers the <see cref="IActorRef"/> with <see cref="TKey"/> in the
/// <see cref="ActorRegistry"/> for this <see cref="ActorSystem"/>.
/// </summary>
/// <param name="builder">The builder instance being configured.</param>
/// <param name="typeName">The name of the entity type</param>
/// <param name="roleName">The role of the Akka.Cluster member that is hosting this <see cref="ShardRegion"/>.</param>
/// <param name="extractEntityId">
/// Partial function to extract the entity id and the message to send to the entity from the incoming message,
/// if the partial function does not match the message will be `unhandled`,
/// i.e.posted as `Unhandled` messages on the event stream
/// </param>
/// <param name="extractShardId">
/// Function to determine the shard id for an incoming message, only messages that passed the `extractEntityId` will be used
/// </param>
/// <typeparam name="TKey">The type key to use to retrieve the <see cref="IActorRef"/> for this <see cref="ShardRegion"/>.</typeparam>
/// <returns>The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.</returns>
public static AkkaConfigurationBuilder WithShardRegionProxy<TKey>(this AkkaConfigurationBuilder builder,
string typeName, string roleName, ExtractEntityId extractEntityId, ExtractShardId extractShardId)
{
return builder.WithActors(async (system, registry) =>
{
var shardRegionProxy = await ClusterSharding.Get(system)
.StartProxyAsync(typeName, roleName, extractEntityId, extractShardId);
registry.TryRegister<TKey>(shardRegionProxy);
});
}

/// <summary>
/// Starts a ShardRegionProxy that points to a <see cref="ShardRegion"/> hosted on a different role inside the cluster
/// and registers the <see cref="IActorRef"/> with <see cref="TKey"/> in the
/// <see cref="ActorRegistry"/> for this <see cref="ActorSystem"/>.
/// </summary>
/// <param name="builder">The builder instance being configured.</param>
/// <param name="typeName">The name of the entity type</param>
/// <param name="roleName">The role of the Akka.Cluster member that is hosting this <see cref="ShardRegion"/>.</param>
/// <param name="messageExtractor">
/// Functions to extract the entity id, shard id, and the message to send to the entity from the incoming message.
/// </param>
/// <typeparam name="TKey">The type key to use to retrieve the <see cref="IActorRef"/> for this <see cref="ShardRegion"/>.</typeparam>
/// <returns>The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.</returns>
public static AkkaConfigurationBuilder WithShardRegionProxy<TKey>(this AkkaConfigurationBuilder builder,
string typeName, string roleName, IMessageExtractor messageExtractor)
{
return builder.WithActors(async (system, registry) =>
{
var shardRegionProxy = await ClusterSharding.Get(system)
.StartProxyAsync(typeName, roleName, messageExtractor);
registry.TryRegister<TKey>(shardRegionProxy);
});
}

/// <summary>
/// Starts <see cref="DistributedPubSub"/> on this node immediately upon <see cref="ActorSystem"/> startup.
/// </summary>
/// <param name="builder">The builder instance being configured.</param>
/// <param name="role">Specifies which role <see cref="DistributedPubSub"/> will broadcast gossip to. If this value
/// is left blank then ALL roles will be targeted.</param>
/// <returns>The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.</returns>
/// <remarks>
/// Stores the mediator <see cref="IActorRef"/> in the registry using the <see cref="DistributedPubSub"/> key.
/// </remarks>
public static AkkaConfigurationBuilder WithDistributedPubSub(this AkkaConfigurationBuilder builder,
string role)
{
var middle = builder.AddHocon(DistributedPubSub.DefaultConfig());
if (!string.IsNullOrEmpty(role)) // add role config
{
middle = middle.AddHocon($"akka.cluster.pub-sub = \"{role}\"");
}

return middle.WithActors((system, registry) =>
{
// force the initialization
var mediator = DistributedPubSub.Get(system).Mediator;
registry.TryRegister<DistributedPubSub>(mediator);
});
}
}
}
5 changes: 3 additions & 2 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
<PropertyGroup>
<Copyright>Copyright © 2013-2022 Akka.NET Team</Copyright>
<Authors>Akka.NET Team</Authors>
<VersionPrefix>0.1.2</VersionPrefix>
<PackageReleaseNotes>• Removed all Cluster.Sharding methods that rely on ClusterShardingSettings%2C since it's not practical to create those prior to starting the ActorSystem.</PackageReleaseNotes>
<VersionPrefix>0.1.3</VersionPrefix>
<PackageReleaseNotes>• Added ShardRegionProxy support to Akka.Cluster.Hosting
• Added DistributedPubSub support to Akka.Cluster.Hosting</PackageReleaseNotes>
<PackageIconUrl>
</PackageIconUrl>
<PackageProjectUrl>
Expand Down

0 comments on commit 7ae8ece

Please sign in to comment.