Skip to content

Commit

Permalink
fix double return of recovery permit (#3201)
Browse files Browse the repository at this point in the history
  • Loading branch information
zbynek001 authored and Aaronontheweb committed Nov 27, 2017
1 parent 8010d5f commit 34ce5a3
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 21 deletions.
58 changes: 54 additions & 4 deletions src/core/Akka.Persistence.Tests/RecoveryPermitterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,37 @@
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.TestActors;
using System;
using Xunit;

namespace Akka.Persistence.Tests
{
public class RecoveryPermitterSpec : PersistenceSpec
{

public class TestExc : Exception
{
public TestExc()
: base("simulated exc")
{
}
}

public class TestPersistentActor : UntypedPersistentActor
{
public override string PersistenceId { get; }
public IActorRef Probe { get; }

public static Props Props(string name, IActorRef probe) =>
Actor.Props.Create(() => new TestPersistentActor(name, probe));
public bool ThrowFromRecoveryCompleted { get; }

public static Props Props(string name, IActorRef probe, Boolean throwFromRecoveryCompleted = false) =>
Actor.Props.Create(() => new TestPersistentActor(name, probe, throwFromRecoveryCompleted));

public TestPersistentActor(string name, IActorRef probe)
public TestPersistentActor(string name, IActorRef probe, bool throwFromRecoveryCompleted)
{
PersistenceId = name;
Probe = probe;
ThrowFromRecoveryCompleted = throwFromRecoveryCompleted;
}

protected override void PostStop()
Expand All @@ -37,7 +50,11 @@ protected override void PostStop()
protected override void OnRecover(object message)
{
if (message is RecoveryCompleted)
{
Probe.Tell(message);
if (ThrowFromRecoveryCompleted)
throw new TestExc();
}
}

protected override void OnCommand(object message)
Expand Down Expand Up @@ -70,7 +87,7 @@ private void RequestPermit(TestProbe probe)
[Fact]
public void RecoveryPermitter_must_grant_permits_up_to_the_limit()
{
var p1 = CreateTestProbe();
var p1 = CreateTestProbe();
var p2 = CreateTestProbe();
var p3 = CreateTestProbe();
var p4 = CreateTestProbe();
Expand Down Expand Up @@ -196,5 +213,38 @@ public void RecoveryPermitter_must_return_permit_when_actor_is_prematurely_termi
permitter.Tell(ReturnRecoveryPermit.Instance, p3.Ref);
permitter.Tell(ReturnRecoveryPermit.Instance, p4.Ref);
}

[Fact]
public void RecoveryPermitter_must_return_permit_when_actor_throws_from_RecoveryCompleted()
{
var p1 = CreateTestProbe();
var p2 = CreateTestProbe();
var p3 = CreateTestProbe();
var p4 = CreateTestProbe();

RequestPermit(p1);
RequestPermit(p2);

var persistentActor = Sys.ActorOf(TestPersistentActor.Props("p3", p3.Ref, throwFromRecoveryCompleted: true));
p3.ExpectMsg<RecoveryCompleted>();
p3.ExpectMsg("postStop");
// it's restarting
for (int i = 1; i < 5; i++)
{
p3.ExpectMsg<RecoveryCompleted>();
p3.ExpectMsg("postStop");
}
// stop it
var stopProbe = CreateTestProbe();
stopProbe.Watch(persistentActor);
Sys.Stop(persistentActor);
stopProbe.ExpectTerminated(persistentActor);

RequestPermit(p4);

permitter.Tell(ReturnRecoveryPermit.Instance, p1.Ref);
permitter.Tell(ReturnRecoveryPermit.Instance, p2.Ref);
permitter.Tell(ReturnRecoveryPermit.Instance, p4.Ref);
}
}
}
28 changes: 11 additions & 17 deletions src/core/Akka.Persistence/Eventsourced.Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public EventsourcedState(string name, bool isRecoveryRunning, StateReceive state
public abstract partial class Eventsourced
{
/// <summary>
/// Initial state. Before starting the actual recovery it must get a permit from the `RecoveryPermitter`.
/// When starting many persistent actors at the same time the journal and its data store is protected from
/// being overloaded by limiting number of recoveries that can be in progress at the same time.
/// Initial state. Before starting the actual recovery it must get a permit from the `RecoveryPermitter`.
/// When starting many persistent actors at the same time the journal and its data store is protected from
/// being overloaded by limiting number of recoveries that can be in progress at the same time.
/// When receiving `RecoveryPermitGranted` it switches to `recoveryStarted` state.
/// All incoming messages are stashed.
/// </summary>
Expand All @@ -57,9 +57,9 @@ private EventsourcedState WaitingRecoveryPermit(Recovery recovery)
}

/// <summary>
/// Processes a loaded snapshot, if any. A loaded snapshot is offered with a <see cref="SnapshotOffer"/>
/// message to the actor's <see cref="ReceiveRecover"/>. Then initiates a message replay, either starting
/// from the loaded snapshot or from scratch, and switches to <see cref="ReplayStarted"/> state.
/// Processes a loaded snapshot, if any. A loaded snapshot is offered with a <see cref="SnapshotOffer"/>
/// message to the actor's <see cref="ReceiveRecover"/>. Then initiates a message replay, either starting
/// from the loaded snapshot or from scratch, and switches to <see cref="ReplayStarted"/> state.
/// All incoming messages are stashed.
/// </summary>
/// <param name="maxReplays">Maximum number of messages to replay</param>
Expand Down Expand Up @@ -144,12 +144,12 @@ private void ReturnRecoveryPermit()

/// <summary>
/// Processes replayed messages, if any. The actor's <see cref="ReceiveRecover"/> is invoked with the replayed events.
///
///
/// If replay succeeds it got highest stored sequence number response from the journal and then switches
/// to <see cref="ProcessingCommands"/> state.
/// If replay succeeds the <see cref="OnReplaySuccess"/> callback method is called, otherwise
/// <see cref="OnRecoveryFailure"/>.
///
///
/// All incoming messages are stashed.
/// </summary>
private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
Expand Down Expand Up @@ -195,14 +195,8 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
LastSequenceNr = m.HighestSequenceNr;
_internalStash.UnstashAll();

try
{
base.AroundReceive(recoveryBehavior, RecoveryCompleted.Instance);
}
finally
{
ReturnRecoveryPermit();
}
base.AroundReceive(recoveryBehavior, RecoveryCompleted.Instance);
ReturnRecoveryPermit();
}
else if (message is ReplayMessagesFailure)
{
Expand Down Expand Up @@ -250,7 +244,7 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
}

/// <summary>
/// If event persistence is pending after processing a command, event persistence
/// If event persistence is pending after processing a command, event persistence
/// is triggered and the state changes to <see cref="PersistingEvents"/>.
/// </summary>
private EventsourcedState ProcessingCommands()
Expand Down

0 comments on commit 34ce5a3

Please sign in to comment.