diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardSpec.cs index 55406a48924..8dfc5bc2195 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardSpec.cs @@ -6,14 +6,17 @@ //----------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Akka.Actor; using Akka.Cluster.Tools.Singleton; using Akka.Configuration; +using Akka.Event; using Akka.TestKit; -using Akka.Util.Extensions; +using Akka.Util; using FluentAssertions; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; @@ -36,6 +39,52 @@ protected override void OnReceive(object message) { } } + + + internal class ConstructorFailActor : ActorBase + { + private static bool _thrown; + private readonly ILoggingAdapter _log = Context.GetLogger(); + + public ConstructorFailActor() + { + if (!_thrown) + { + _thrown = true; + throw new Exception("EXPLODING CONSTRUCTOR!"); + } + } + + protected override bool Receive(object message) + { + _log.Info("Msg {0}", message); + Sender.Tell($"ack {message}"); + return true; + } + } + + internal class PreStartFailActor : ActorBase + { + private static bool _thrown; + private readonly ILoggingAdapter _log = Context.GetLogger(); + + protected override void PreStart() + { + base.PreStart(); + if (!_thrown) + { + _thrown = true; + throw new Exception("EXPLODING PRE-START!"); + } + } + + protected override bool Receive(object message) + { + _log.Info("Msg {0}", message); + Sender.Tell($"ack {message}"); + return true; + } + } static PersistentShardSpec() { @@ -87,5 +136,98 @@ public void Persistent_Shard_must_remember_entities_started_with_StartEntity() ExpectMsgAllOf(new Shard.ShardStats("shard-1", 1)); }); } + + [Theory(DisplayName = "Persistent shard must recover from transient failures inside sharding entity constructor and PreStart method")] + [MemberData(nameof(PropsFactory))] + public async Task Persistent_Shard_must_recover_from_failing_entity(Props entityProp) + { + ExtractEntityId extractEntityId = message => + { + switch (message) + { + case ShardSpec.EntityEnvelope env: + return (env.Id.ToString(), env.Payload); + } + return Option<(string, object)>.None; + }; + + ExtractShardId extractShardId = message => + { + switch (message) + { + case ShardSpec.EntityEnvelope msg: + return msg.Id.ToString(); + } + return null; + }; + + var settings = ClusterShardingSettings.Create(Sys); + var tuning = settings.TuningParameters; + settings = settings.WithTuningParameters(new TuningParameters + ( + coordinatorFailureBackoff: tuning.CoordinatorFailureBackoff, + retryInterval: tuning.RetryInterval, + bufferSize: tuning.BufferSize, + handOffTimeout: tuning.HandOffTimeout, + shardStartTimeout: tuning.ShardStartTimeout, + shardFailureBackoff: tuning.ShardFailureBackoff, + entityRestartBackoff: 1.Seconds(), + rebalanceInterval: tuning.RebalanceInterval, + snapshotAfter: tuning.SnapshotAfter, + keepNrOfBatches: tuning.KeepNrOfBatches, + leastShardAllocationRebalanceThreshold: tuning.LeastShardAllocationRebalanceThreshold, + leastShardAllocationMaxSimultaneousRebalance: tuning.LeastShardAllocationMaxSimultaneousRebalance, + waitingForStateTimeout: tuning.WaitingForStateTimeout, + updatingStateTimeout: tuning.UpdatingStateTimeout, + entityRecoveryStrategy: tuning.EntityRecoveryStrategy, + entityRecoveryConstantRateStrategyFrequency: tuning.EntityRecoveryConstantRateStrategyFrequency, + entityRecoveryConstantRateStrategyNumberOfEntities: tuning.EntityRecoveryConstantRateStrategyNumberOfEntities, + leastShardAllocationAbsoluteLimit: tuning.LeastShardAllocationAbsoluteLimit, + leastShardAllocationRelativeLimit: tuning.LeastShardAllocationRelativeLimit + )); + + var props = Props.Create(() => new PersistentShard( + "cats", + "shard-1", + _ => entityProp, + settings, + extractEntityId, + extractShardId, + PoisonPill.Instance + )); + + Sys.EventStream.Subscribe(TestActor); + + var persistentShard = Sys.ActorOf(props); + + persistentShard.Tell(new ShardRegion.StartEntity("1")); + ExpectMsg(new ShardRegion.StartEntityAck("1", "shard-1")); + + // entity died here + var err = ExpectMsg(); + err.Cause.Should().BeOfType(); + + await AwaitConditionAsync(() => + { + persistentShard.Tell(Shard.GetCurrentShardState.Instance); + var failedState = ExpectMsg(); + return failedState.EntityIds.Count == 0; + }); + + // entity should be restarted when it received this message + persistentShard.Tell(new ShardSpec.EntityEnvelope(1, "Restarted")); + ExpectMsg("ack Restarted"); + + persistentShard.Tell(Shard.GetCurrentShardState.Instance); + var state = ExpectMsg(); + state.EntityIds.Count.Should().Be(1); + state.EntityIds.First().Should().Be("1"); + } + + public static IEnumerable PropsFactory() + { + yield return new object[] { Props.Create(() => new PreStartFailActor()) }; + yield return new object[] { Props.Create(() => new ConstructorFailActor()) }; + } } }