Skip to content

Commit

Permalink
null as empty state, in javadsl, akka#25768
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Oct 18, 2018
1 parent 1691961 commit bed17cc
Show file tree
Hide file tree
Showing 9 changed files with 568 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
}

protected def internalSaveSnapshot(state: EventsourcedRunning.EventsourcedState[S]): Unit = {
setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId.id, state.seqNr), state.state), setup.selfUntyped)
// don't store null state
if (state.state != null)
setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(
SnapshotMetadata(setup.persistenceId.id, state.seqNr),
state.state), setup.selfUntyped)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package akka.persistence.typed.javadsl

import java.util.function.BiFunction
import java.util.function.Predicate
import java.util.function.{ Function JFunction }

import akka.annotation.InternalApi
import akka.persistence.typed.internal._
Expand All @@ -23,26 +24,22 @@ trait CommandHandler[Command, Event, State] {

object CommandHandlerBuilder {

private val _trueStatePredicate: Predicate[Any] = new Predicate[Any] {
override def test(t: Any): Boolean = true
}

private def trueStatePredicate[S]: Predicate[S] = _trueStatePredicate.asInstanceOf[Predicate[S]]

/**
* @param stateClass The handlers defined by this builder are used when the state is an instance of the `stateClass`
* @return A new, mutable, command handler builder
*/
def builder[Command, Event, S <: State, State](stateClass: Class[S]): CommandHandlerBuilder[Command, Event, S, State] =
new CommandHandlerBuilder(stateClass, statePredicate = trueStatePredicate)
new CommandHandlerBuilder(statePredicate = new Predicate[S] {
override def test(state: S): Boolean = state != null && stateClass.isAssignableFrom(state.getClass)
})

/**
* @param statePredicate The handlers defined by this builder are used when the `statePredicate` is `true`,
* useful for example when state type is an Optional
* @return A new, mutable, command handler builder
*/
def builder[Command, Event, State](statePredicate: Predicate[State]): CommandHandlerBuilder[Command, Event, State, State] =
new CommandHandlerBuilder(classOf[Any].asInstanceOf[Class[State]], statePredicate)
new CommandHandlerBuilder(statePredicate)

/**
* INTERNAL API
Expand All @@ -54,15 +51,15 @@ object CommandHandlerBuilder {
}

final class CommandHandlerBuilder[Command, Event, S <: State, State] @InternalApi private[persistence] (
val stateClass: Class[S], val statePredicate: Predicate[S]) {
val statePredicate: Predicate[S]) {
import CommandHandlerBuilder.CommandHandlerCase

private var cases: List[CommandHandlerCase[Command, Event, State]] = Nil

private def addCase(predicate: Command Boolean, handler: BiFunction[S, Command, Effect[Event, State]]): Unit = {
cases = CommandHandlerCase[Command, Event, State](
commandPredicate = predicate,
statePredicate = state stateClass.isAssignableFrom(state.getClass) && statePredicate.test(state.asInstanceOf[S]),
statePredicate = state statePredicate.test(state.asInstanceOf[S]),
handler.asInstanceOf[BiFunction[State, Command, Effect[Event, State]]]) :: cases
}

Expand All @@ -74,17 +71,45 @@ final class CommandHandlerBuilder[Command, Event, S <: State, State] @InternalAp
this
}

/**
* Match any command which the given `predicate` returns true for.
*
* Use this when then `State` is not needed in the `handler`, otherwise there is an overloaded method that pass
* the state in a `BiFunction`.
*/
def matchCommand(predicate: Predicate[Command], handler: JFunction[Command, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = {
addCase(cmd predicate.test(cmd), new BiFunction[S, Command, Effect[Event, State]] {
override def apply(state: S, cmd: Command): Effect[Event, State] = handler(cmd)
})
this
}

/**
* Match commands that are of the given `commandClass` or subclass thereof
*/
def matchCommand[C <: Command](commandClass: Class[C], handler: BiFunction[S, C, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = {
addCase(cmd commandClass.isAssignableFrom(cmd.getClass), handler.asInstanceOf[BiFunction[S, Command, Effect[Event, State]]])
this
}

/**
* Match commands that are of the given `commandClass` or subclass thereof.
*
* Use this when then `State` is not needed in the `handler`, otherwise there is an overloaded method that pass
* the state in a `BiFunction`.
*/
def matchCommand[C <: Command](commandClass: Class[C], handler: JFunction[C, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = {
matchCommand[C](commandClass, new BiFunction[S, C, Effect[Event, State]] {
override def apply(state: S, cmd: C): Effect[Event, State] = handler(cmd)
})
}

/**
* Compose this builder with another builder. The handlers in this builder will be tried first followed
* by the handlers in `other`.
*/
def orElse[S2 <: State](other: CommandHandlerBuilder[Command, Event, S2, State]): CommandHandlerBuilder[Command, Event, S2, State] = {
val newBuilder = new CommandHandlerBuilder[Command, Event, S2, State](other.stateClass, other.statePredicate)
val newBuilder = new CommandHandlerBuilder[Command, Event, S2, State](other.statePredicate)
// problem with overloaded constructor with `cases` as parameter
newBuilder.cases = other.cases ::: cases
newBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.persistence.typed.javadsl

import java.util.function.BiFunction
import java.util.function.{ Function JFunction }

import akka.annotation.InternalApi
import akka.util.OptionVal
Expand Down Expand Up @@ -49,11 +50,23 @@ final class EventHandlerBuilder[State >: Null, Event]() {
this
}

/**
* Match any event which is an instance of `E` or a subtype of `E`.
*
* Use this when then `State` is not needed in the `handler`, otherwise there is an overloaded method that pass
* the state in a `BiFunction`.
*/
def matchEvent[E <: Event](eventClass: Class[E], f: JFunction[E, State]): EventHandlerBuilder[State, Event] = {
matchEvent[E](eventClass, new BiFunction[State, E, State] {
override def apply(state: State, event: E): State = f(event)
})
}

def matchEvent[E <: Event, S <: State](eventClass: Class[E], stateClass: Class[S],
biFunction: BiFunction[S, E, State]): EventHandlerBuilder[State, Event] = {

cases = EventHandlerCase[State, Event](
statePredicate = s stateClass.isAssignableFrom(s.getClass),
statePredicate = s s != null && stateClass.isAssignableFrom(s.getClass),
eventPredicate = e eventClass.isAssignableFrom(e.getClass),
biFunction.asInstanceOf[BiFunction[State, Event, State]]) :: cases
this
Expand Down Expand Up @@ -102,7 +115,9 @@ final class EventHandlerBuilder[State >: Null, Event]() {
}

result match {
case OptionVal.None throw new MatchError(s"No match found for event [${event.getClass}] and state [${state.getClass}]. Has this event been stored using an EventAdapter?")
case OptionVal.None
val stateClass = if (state == null) "null" else state.getClass.getName
throw new MatchError(s"No match found for event [${event.getClass}] and state [$stateClass]. Has this event been stored using an EventAdapter?")
case OptionVal.Some(s) s
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.typed.javadsl;

import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import java.util.Objects;

public class NullEmptyStateTest extends JUnitSuite {

private static final Config config = ConfigFactory.parseString(
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n");

@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);

static class NullEmptyState extends PersistentBehavior<String, String, String> {

private final ActorRef<String> probe;

NullEmptyState(PersistenceId persistenceId, ActorRef<String> probe) {
super(persistenceId);
this.probe = probe;
}

@Override
public String emptyState() {
return null;
}

@Override
public void onRecoveryCompleted(String s) {
probe.tell("onRecoveryCompleted:" + s);
}

@Override
public CommandHandler<String, String, String> commandHandler() {
CommandHandlerBuilder<String, String, String, String> b1 =
commandHandlerBuilder(Objects::isNull)
.matchCommand("stop"::equals, command -> Effect().stop())
.matchCommand(String.class, this::persistCommand);

CommandHandlerBuilder<String, String, String, String> b2 =
commandHandlerBuilder(String.class)
.matchCommand("stop"::equals, command -> Effect().stop())
.matchCommand(String.class, this::persistCommand);

return b1.orElse(b2).build();
}

private Effect<String, String> persistCommand(String command) {
return Effect().persist(command);
}

@Override
public EventHandler<String, String> eventHandler() {
return eventHandlerBuilder()
.matchEvent(String.class, this::applyEvent)
.build();
}

private String applyEvent(String state, String event) {
probe.tell("eventHandler:" + state + ":" + event);
if (state == null)
return event;
else
return state + event;
}
}

@Test
public void handleNullState() throws Exception {
TestProbe<String> probe = testKit.createTestProbe();
Behavior<String> b = Behaviors.setup(ctx -> new NullEmptyState(new PersistenceId("a"), probe.ref()));

ActorRef<String> ref1 = testKit.spawn(b);
probe.expectMessage("onRecoveryCompleted:null");
ref1.tell("stop");

ActorRef<String> ref2 = testKit.spawn(b);
probe.expectMessage("onRecoveryCompleted:null");
ref2.tell("one");
probe.expectMessage("eventHandler:null:one");
ref2.tell("two");
probe.expectMessage("eventHandler:one:two");

ref2.tell("stop");
ActorRef<String> ref3 = testKit.spawn(b);
// eventHandler from reply
probe.expectMessage("eventHandler:null:one");
probe.expectMessage("eventHandler:one:two");
probe.expectMessage("onRecoveryCompleted:onetwo");
ref3.tell("three");
probe.expectMessage("eventHandler:onetwo:three");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.typed.javadsl;

import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

public class PrimitiveStateTest extends JUnitSuite {

private static final Config config = ConfigFactory.parseString(
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n");

@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);

static class PrimitiveState extends PersistentBehavior<Integer, Integer, Integer> {

private final ActorRef<String> probe;

PrimitiveState(PersistenceId persistenceId, ActorRef<String> probe) {
super(persistenceId);
this.probe = probe;
}

@Override
public Integer emptyState() {
return 0;
}

@Override
public void onRecoveryCompleted(Integer n) {
probe.tell("onRecoveryCompleted:" + n);
}

@Override
public CommandHandler<Integer, Integer, Integer> commandHandler() {
return (state, command) -> {
if (command < 0)
return Effect().stop();
else
return Effect().persist(command);
};
}

@Override
public EventHandler<Integer, Integer> eventHandler() {
return (state, event) -> {
probe.tell("eventHandler:" + state + ":" + event);
return state + event;
};
}
}

@Test
public void handleIntegerState() throws Exception {
TestProbe<String> probe = testKit.createTestProbe();
Behavior<Integer> b = Behaviors.setup(ctx -> new PrimitiveState(new PersistenceId("a"), probe.ref()));
ActorRef<Integer> ref1 = testKit.spawn(b);
probe.expectMessage("onRecoveryCompleted:0");
ref1.tell(1);
probe.expectMessage("eventHandler:0:1");
ref1.tell(2);
probe.expectMessage("eventHandler:1:2");

ref1.tell(-1);
ActorRef<Integer> ref2 = testKit.spawn(b);
// eventHandler from reply
probe.expectMessage("eventHandler:0:1");
probe.expectMessage("eventHandler:1:2");
probe.expectMessage("onRecoveryCompleted:3");
ref2.tell(3);
probe.expectMessage("eventHandler:3:3");
}
}
Loading

0 comments on commit bed17cc

Please sign in to comment.