Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify SuicideActor to make it perform 0 writes #289

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

<ItemGroup>
<PackageReference Include="akka.persistence.sqlite" />
<PackageReference Include="Akka.Persistence.TestKit.Xunit2" />
<PackageReference Include="Akka.TestKit.Xunit2" />
<PackageReference Include="FluentAssertions" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
Expand All @@ -18,6 +17,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Akka.HealthCheck.Persistence.TestKit\Akka.HealthCheck.Persistence.TestKit.csproj" />
<ProjectReference Include="..\Akka.HealthCheck.Persistence\Akka.HealthCheck.Persistence.csproj" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

namespace Akka.HealthCheck.Persistence.Tests
{
public class AkkaPersistenceLivenessProbeNotAvailableDueToSnapshotStoreSpecs : TestKit.Xunit2.TestKit
public class AkkaPersistenceLivenessProbeNotAvailableDueToSnapshotStoreSpecs : Akka.TestKit.Xunit2.TestKit
{
public AkkaPersistenceLivenessProbeNotAvailableDueToSnapshotStoreSpecs(ITestOutputHelper helper)
: base(TestConfig.BadSnapshotConfig, output: helper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

namespace Akka.HealthCheck.Persistence.Tests
{
public class AkkaPersistenceLivenessProbeNotAvailableDueToJournalSpecs : TestKit.Xunit2.TestKit
public class AkkaPersistenceLivenessProbeNotAvailableDueToJournalSpecs : Akka.TestKit.Xunit2.TestKit
{
public AkkaPersistenceLivenessProbeNotAvailableDueToJournalSpecs(ITestOutputHelper helper)
: base(TestConfig.BadJournalConfig, output: helper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

namespace Akka.HealthCheck.Persistence.Tests
{
public class AkkaPersistenceLivenessProbeSubscriptionTest : TestKit.Xunit2.TestKit
public class AkkaPersistenceLivenessProbeSubscriptionTest : Akka.TestKit.Xunit2.TestKit
{

public AkkaPersistenceLivenessProbeSubscriptionTest(ITestOutputHelper helper)
Expand Down
58 changes: 0 additions & 58 deletions src/Akka.HealthCheck.Persistence.Tests/JournalInterceptors.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
using System.Threading.Tasks;
using Akka.Actor;
using Akka.HealthCheck.Liveness;
using Akka.Persistence.TestKit;
using Akka.HealthCheck.Persistence.TestKit;
using Akka.HealthCheck.Persistence.TestKit.Journal;
using Akka.HealthCheck.Persistence.TestKit.SnapshotStore;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
Expand All @@ -27,22 +29,18 @@ public LivenessProbeTimeoutSpec(ITestOutputHelper output) : base(nameof(Liveness
public async Task SnapshotLoadTimeoutTest()
{
using var cts = new CancellationTokenSource();
var delay = new SnapshotInterceptors.CancelableDelay(30.Minutes(), SnapshotInterceptors.Noop.Instance, cts.Token);
var delay = new ConnectionInterceptors.CancelableDelay(30.Minutes(), ConnectionInterceptors.Noop.Instance, cts.Token);

await WithSnapshotLoad(
save => save.SetInterceptorAsync(delay),
() => TestTimeout(cts));
await WithSnapshotConnection(connect => connect.SetInterceptorAsync(delay), () => TestTimeout(cts));
}

[Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if journal recovery does not respond")]
public async Task JournalRecoveryTimeoutTest()
{
using var cts = new CancellationTokenSource();
var delay = new JournalInterceptors.CancelableDelay(30.Minutes(), JournalInterceptors.Noop.Instance, cts.Token);
var delay = new ConnectionInterceptors.CancelableDelay(30.Minutes(), ConnectionInterceptors.Noop.Instance, cts.Token);

await WithJournalRecovery(
save => save.SetInterceptorAsync(delay),
() => TestTimeout(cts));
await WithJournalConnection(connect => connect.SetInterceptorAsync(delay), () => TestTimeout(cts));
}

private async Task TestTimeout(CancellationTokenSource cts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// </copyright>
// -----------------------------------------------------------------------

using Akka.Persistence.TestKit;
using Akka.HealthCheck.Persistence.TestKit.Journal;
using FluentAssertions;
using Xunit;

Expand Down
25 changes: 6 additions & 19 deletions src/Akka.HealthCheck.Persistence.Tests/ProbeFailureSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
using System.Threading.Tasks;
using Akka.Actor;
using Akka.HealthCheck.Liveness;
using Akka.Persistence.TestKit;
using Akka.HealthCheck.Persistence.TestKit;
using Akka.HealthCheck.Persistence.TestKit.Journal;
using Akka.HealthCheck.Persistence.TestKit.SnapshotStore;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
Expand All @@ -25,22 +27,20 @@ public ProbeFailureSpec(ITestOutputHelper output) : base("akka.loglevel = DEBUG"
[Fact(DisplayName = "Status should reflect successful probe")]
public void SuccessfulProbeTest()
{
PerformWarmup();
var status = PerformProbe();
status.IsLive.Should().BeTrue();
status.Failure.Should().BeNull();
}

[Fact(DisplayName = "Journal recovery failed, probe should fail")]
[Fact(DisplayName = "Journal connection failed, probe should fail")]
public async Task JournalRecoverFailTest()
{
await WithJournalRecovery(recover => recover.Fail(), () =>
await WithJournalConnection(connect => connect.Fail(), async () =>
{
PerformWarmup();
var status = PerformProbe();
status.IsLive.Should().BeFalse();
var e = status.Failure;
e.Should().NotBeNull().And.BeOfType<TestJournalFailureException>();
e.Should().NotBeNull().And.BeOfType<TestConnectionException>();
});
}

Expand All @@ -49,26 +49,13 @@ public async Task SnapshotRecoverFailTest()
{
await WithSnapshotLoad(load => load.Fail(), () =>
{
PerformWarmup(true);
var status = PerformProbe();
status.IsLive.Should().BeFalse();
var e = status.Failure!;
e.Should().NotBeNull().And.BeOfType<TestSnapshotStoreFailureException>();
});
}

private void PerformWarmup(bool expectFailed = false)
{
var warmupProbe = ActorOf(SuicideWarmupProbe.Props(TestActor, AkkaPersistenceLivenessProbe.PersistenceId, true));
Watch(warmupProbe);
if(expectFailed)
ExpectMsg<WarmupFailed>();
else
ExpectMsg<WarmupComplete>();
ExpectTerminated(warmupProbe);
Unwatch(warmupProbe);
}

private PersistenceLivenessStatus PerformProbe()
{
var liveProbe = ActorOf(SuicideProbe.Props(TestActor, AkkaPersistenceLivenessProbe.PersistenceId, true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Persistence.TestKit;
using Akka.HealthCheck.Persistence.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
Expand All @@ -27,19 +27,19 @@ public RegressionProbeFailureSpec(ITestOutputHelper output) : base("akka.logleve
[Fact(DisplayName = "Probe should be performed in proper interval with snapshot recovery failure")]
public async Task IntervalTest()
{
await WithSnapshotLoad(load => load.Fail(), async () =>
await WithSnapshotConnection(load => load.Fail(), async () =>
{
Sys.EventStream.Subscribe(TestActor, typeof(LogEvent));
var probe = Sys.ActorOf(Props.Create(() =>
new AkkaPersistenceLivenessProbe(true, 400.Milliseconds(), 3.Seconds(), 0)));
await FishForMessageAsync<LogEvent>(e => e.Message.ToString() is "Recreating persistence warmup probe.");
await FishForMessageAsync<LogEvent>(e => e.Message.ToString() is "Recreating persistence probe.");

var stopwatch = Stopwatch.StartNew();
// Default circuit breaker max-failures is 10
foreach (var _ in Enumerable.Range(0, 15))
{
stopwatch.Restart();
await FishForMessageAsync<LogEvent>(e => e.Message.ToString() is "Recreating persistence warmup probe.");
await FishForMessageAsync<LogEvent>(e => e.Message.ToString() is "Recreating persistence probe.");
stopwatch.Stop();
// In the original issue, suicide probe is being recreated immediately after failure without waiting
stopwatch.Elapsed.Should().BeGreaterThan(300.Milliseconds());
Expand Down
107 changes: 0 additions & 107 deletions src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs

This file was deleted.

Loading