Skip to content

Commit

Permalink
Akka.Cluster.Sharding GetEntityLocation Query (#6101)
Browse files Browse the repository at this point in the history
* adding new `GetEntityLocation` query to `ShardRegion`

designed to make it easier for testing and telemetry, the new `GetEntityLocation` is an `IShardRegionQuery` that checks for the presence of an entity in a given `ShardRegion` and reports back if this entity is able to be located.

This query is not designed to be supported over the network and is meant to be local-only.

* added spec and bugfixes

* added API approval

* added documentation and more detailed error messages
  • Loading branch information
Aaronontheweb authored Sep 17, 2022
1 parent e2b1d73 commit 231c8c9
Show file tree
Hide file tree
Showing 5 changed files with 557 additions and 118 deletions.
20 changes: 13 additions & 7 deletions docs/articles/clustering/cluster-sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,6 @@ Possible reasons for disabling remember entity storage are:

For supporting remembered entities in an environment without disk storage but with access to a database, use persistence mode instead.

> [!NOTE]
> Currently, Lightning.NET library, the storage solution used to store DData in disk, is having problem
> deploying native library files in [Linux operating system operating in x64 and ARM platforms]
> (<https://github.com/CoreyKaylor/Lightning.NET/issues/141>).
>
> You will need to install LightningDB in your Linux distribution manually if you wanted to use the durable DData feature.
### Terminating Remembered Entities

One complication that `akka.cluster.sharding.remember-entities = true` introduces is that your sharded entity actors can no longer be terminated through the normal Akka.NET channels, i.e. `Context.Stop(Self)`, `PoisonPill.Instance`, and the like. This is because as part of the `remember-entities` contract - the sharding system is going to insist on keeping all remembered entities alive until explicitly told to stop.
Expand Down Expand Up @@ -217,6 +210,19 @@ You can inspect current sharding stats by using following messages:
* On `GetShardRegionState` shard region will reply with `ShardRegionState` containing data about shards living in the current actor system and what entities are alive on each one of them.
* On `GetClusterShardingStats` shard region will reply with `ClusterShardingStats` having information about shards living in the whole cluster and how many entities alive in each one of them.

### Querying for the Location of Specific Entities

It's possible to query a `ShardRegion` or a `ShardRegionProxy` using a `GetEntityLocation` query:

[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs?name=GetEntityLocationQuery)]

A `GetEntityLocation` query will always return an `EntityLocation` response - even if the query could not be executed.

> [!IMPORTANT]
> One major caveat is that in order for the `GetEntityLocation` to execute your `IMessageExtractor` or `ShardExtractor` delegate will need to support the `ShardRegion.StartEntity` message - just like you'd have to use in order to support `remember-entities=on`:
[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs?name=GetEntityLocationExtractor)]

## Integrating Cluster Sharding with Persistent Actors

One of the most common scenarios, where cluster sharding is used, is to combine them with event-sourced persistent actors from [Akka.Persistence](xref:persistence-architecture) module.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
//-----------------------------------------------------------------------
// <copyright file="ShardRegionQueriesSpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.TestActors;
using Akka.Util;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;

namespace Akka.Cluster.Sharding.Tests
{
public class ShardRegionQueriesSpecs : AkkaSpec
{
private Cluster _cluster;
private ClusterSharding _clusterSharding;
private IActorRef _shardRegion;

private ActorSystem _proxySys;

public ShardRegionQueriesSpecs(ITestOutputHelper outputHelper) : base(GetConfig(), outputHelper)
{
_clusterSharding = ClusterSharding.Get(Sys);
_cluster = Cluster.Get(Sys);
_shardRegion = _clusterSharding.Start("entity", s => EchoActor.Props(this, true),
ClusterShardingSettings.Create(Sys).WithRole("shard"), ExtractEntityId, ExtractShardId);

var proxySysConfig = ConfigurationFactory.ParseString("akka.cluster.roles = [proxy]")
.WithFallback(Sys.Settings.Config);
_proxySys = ActorSystem.Create(Sys.Name, proxySysConfig);

_cluster.Join(_cluster.SelfAddress);
AwaitAssert(() => { _cluster.SelfMember.Status.ShouldBe(MemberStatus.Up); });

// form a 2-node cluster
var proxyCluster = Cluster.Get(_proxySys);
proxyCluster.Join(_cluster.SelfAddress);
AwaitAssert(() => { proxyCluster.SelfMember.Status.ShouldBe(MemberStatus.Up); });
}

protected override void AfterAll()
{
Shutdown(_proxySys);
base.AfterAll();
}

private Option<(string, object)> ExtractEntityId(object message)
{
switch (message)
{
case int i:
return (i.ToString(), message);
}

throw new NotSupportedException();
}

// <GetEntityLocationExtractor>
private string ExtractShardId(object message)
{
switch (message)
{
case int i:
return (i % 10).ToString();
// must support ShardRegion.StartEntity in order for
// GetEntityLocation to work properly
case ShardRegion.StartEntity se:
return se.EntityId;
}

throw new NotSupportedException();
}
// </GetEntityLocationExtractor>

private static Config GetConfig()
{
return ConfigurationFactory.ParseString(@"
akka.loglevel = WARNING
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.cluster.roles = [shard]")
.WithFallback(Sharding.ClusterSharding.DefaultConfig())
.WithFallback(DistributedData.DistributedData.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig());
}

/// <summary>
/// DocFx material for demonstrating how this query type works
/// </summary>
[Fact]
public async Task ShardRegion_GetEntityLocation_DocumentationSpec()
{
// <GetEntityLocationQuery>
// creates an entity with entityId="1"
await _shardRegion.Ask<int>(1, TimeSpan.FromSeconds(3));

// determine where entity with "entityId=1" is located in cluster
var q1 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));

q1.EntityId.Should().Be("1");

// have a valid ShardId
q1.ShardId.Should().NotBeEmpty();

// have valid address for node that will / would host entity
q1.ShardRegion.Should().NotBe(Address.AllSystems); // has real address

// if entity actor is alive, will retrieve a reference to it
q1.EntityRef.HasValue.Should().BeTrue();
// </GetEntityLocationQuery>
}

[Fact(DisplayName = "ShardRegion should support GetEntityLocation queries locally")]
public async Task ShardRegion_should_support_GetEntityLocation_query_locally()
{
// arrange
await _shardRegion.Ask<int>(1, TimeSpan.FromSeconds(3));
await _shardRegion.Ask<int>(2, TimeSpan.FromSeconds(3));

// act
var q1 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));
var q2 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("2", TimeSpan.FromSeconds(1)));
var q3 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("3", TimeSpan.FromSeconds(1)));

// assert
void AssertValidEntityLocation(EntityLocation e, string entityId)
{
e.EntityId.Should().Be(entityId);
e.EntityRef.Should().NotBe(Option<IActorRef>.None);
e.ShardId.Should().NotBeNullOrEmpty();
e.ShardRegion.Should().Be(_cluster.SelfAddress);
}

AssertValidEntityLocation(q1, "1");
AssertValidEntityLocation(q2, "2");

q3.EntityRef.Should().Be(Option<IActorRef>.None);
q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard?
q3.ShardRegion.Should().Be(Address.AllSystems);
}

[Fact(DisplayName = "ShardRegion should support GetEntityLocation queries remotely")]
public async Task ShardRegion_should_support_GetEntityLocation_query_remotely()
{
// arrange
var sharding2 = ClusterSharding.Get(_proxySys);
var shardRegionProxy = await sharding2.StartProxyAsync("entity", "shard", ExtractEntityId, ExtractShardId);

await shardRegionProxy.Ask<int>(1, TimeSpan.FromSeconds(3));
await shardRegionProxy.Ask<int>(2, TimeSpan.FromSeconds(3));

// act
var q1 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));
var q2 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("2", TimeSpan.FromSeconds(1)));
var q3 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("3", TimeSpan.FromSeconds(1)));

// assert
void AssertValidEntityLocation(EntityLocation e, string entityId)
{
e.EntityId.Should().Be(entityId);
e.EntityRef.Should().NotBe(Option<IActorRef>.None);
e.ShardId.Should().NotBeNullOrEmpty();
e.ShardRegion.Should().Be(_cluster.SelfAddress);
}

AssertValidEntityLocation(q1, "1");
AssertValidEntityLocation(q2, "2");

q3.EntityRef.Should().Be(Option<IActorRef>.None);
q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard?
q3.ShardRegion.Should().Be(Address.AllSystems);
}
}
}
Loading

0 comments on commit 231c8c9

Please sign in to comment.