Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cluster.Sharding ActorInitializationException spec #5999

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.Util.Extensions;
using Akka.Util;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -36,6 +39,52 @@ protected override void OnReceive(object message)
{
}
}


internal class ConstructorFailActor : ActorBase
{
private static bool _thrown;
private readonly ILoggingAdapter _log = Context.GetLogger();

public ConstructorFailActor()
{
if (!_thrown)
{
_thrown = true;
throw new Exception("EXPLODING CONSTRUCTOR!");
}
}

protected override bool Receive(object message)
{
_log.Info("Msg {0}", message);
Sender.Tell($"ack {message}");
return true;
}
}

internal class PreStartFailActor : ActorBase
{
private static bool _thrown;
private readonly ILoggingAdapter _log = Context.GetLogger();

protected override void PreStart()
{
base.PreStart();
if (!_thrown)
{
_thrown = true;
throw new Exception("EXPLODING PRE-START!");
}
}

protected override bool Receive(object message)
{
_log.Info("Msg {0}", message);
Sender.Tell($"ack {message}");
return true;
}
}

static PersistentShardSpec()
{
Expand Down Expand Up @@ -87,5 +136,98 @@ public void Persistent_Shard_must_remember_entities_started_with_StartEntity()
ExpectMsgAllOf(new Shard.ShardStats("shard-1", 1));
});
}

[Theory(DisplayName = "Persistent shard must recover from transient failures inside sharding entity constructor and PreStart method")]
[MemberData(nameof(PropsFactory))]
public async Task Persistent_Shard_must_recover_from_failing_entity(Props entityProp)
{
ExtractEntityId extractEntityId = message =>
{
switch (message)
{
case ShardSpec.EntityEnvelope env:
return (env.Id.ToString(), env.Payload);
}
return Option<(string, object)>.None;
};

ExtractShardId extractShardId = message =>
{
switch (message)
{
case ShardSpec.EntityEnvelope msg:
return msg.Id.ToString();
}
return null;
};

var settings = ClusterShardingSettings.Create(Sys);
var tuning = settings.TuningParameters;
settings = settings.WithTuningParameters(new TuningParameters
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very bad coding experience. We need to at least add a Copy() method to TuningParameter class so no one should be forced to write this. Or use the record feature in .NET 5.0

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree. Can't go record until we drop .NET Standard 2.0 support, which isn't slated until 2.0.

(
coordinatorFailureBackoff: tuning.CoordinatorFailureBackoff,
retryInterval: tuning.RetryInterval,
bufferSize: tuning.BufferSize,
handOffTimeout: tuning.HandOffTimeout,
shardStartTimeout: tuning.ShardStartTimeout,
shardFailureBackoff: tuning.ShardFailureBackoff,
entityRestartBackoff: 1.Seconds(),
rebalanceInterval: tuning.RebalanceInterval,
snapshotAfter: tuning.SnapshotAfter,
keepNrOfBatches: tuning.KeepNrOfBatches,
leastShardAllocationRebalanceThreshold: tuning.LeastShardAllocationRebalanceThreshold,
leastShardAllocationMaxSimultaneousRebalance: tuning.LeastShardAllocationMaxSimultaneousRebalance,
waitingForStateTimeout: tuning.WaitingForStateTimeout,
updatingStateTimeout: tuning.UpdatingStateTimeout,
entityRecoveryStrategy: tuning.EntityRecoveryStrategy,
entityRecoveryConstantRateStrategyFrequency: tuning.EntityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities: tuning.EntityRecoveryConstantRateStrategyNumberOfEntities,
leastShardAllocationAbsoluteLimit: tuning.LeastShardAllocationAbsoluteLimit,
leastShardAllocationRelativeLimit: tuning.LeastShardAllocationRelativeLimit
));

var props = Props.Create(() => new PersistentShard(
"cats",
"shard-1",
_ => entityProp,
settings,
extractEntityId,
extractShardId,
PoisonPill.Instance
));

Sys.EventStream.Subscribe<Error>(TestActor);

var persistentShard = Sys.ActorOf(props);

persistentShard.Tell(new ShardRegion.StartEntity("1"));
ExpectMsg(new ShardRegion.StartEntityAck("1", "shard-1"));

// entity died here
var err = ExpectMsg<Error>();
err.Cause.Should().BeOfType<ActorInitializationException>();

await AwaitConditionAsync(() =>
{
persistentShard.Tell(Shard.GetCurrentShardState.Instance);
var failedState = ExpectMsg<Shard.CurrentShardState>();
return failedState.EntityIds.Count == 0;
});

// entity should be restarted when it received this message
persistentShard.Tell(new ShardSpec.EntityEnvelope(1, "Restarted"));
ExpectMsg("ack Restarted");

persistentShard.Tell(Shard.GetCurrentShardState.Instance);
var state = ExpectMsg<Shard.CurrentShardState>();
state.EntityIds.Count.Should().Be(1);
state.EntityIds.First().Should().Be("1");
}

public static IEnumerable<object[]> PropsFactory()
{
yield return new object[] { Props.Create(() => new PreStartFailActor()) };
yield return new object[] { Props.Create(() => new ConstructorFailActor()) };
}
}
}