diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index ec7d0cd0949..b861ed19eea 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -4,6 +4,9 @@ package akka.cluster.sharding.typed.scaladsl +import scala.concurrent.Future + +import akka.Done import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorRef import akka.actor.typed.Behavior @@ -13,6 +16,7 @@ import akka.cluster.typed.Cluster import akka.cluster.typed.Join import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior } import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import com.typesafe.config.ConfigFactory import org.scalatest.{ WordSpec, WordSpecLike } @@ -41,6 +45,7 @@ object ClusterShardingPersistenceSpec { sealed trait Command final case class Add(s: String) extends Command + final case class AddWithConfirmation(s: String)(override val replyTo: ActorRef[Done]) extends Command with ExpectingReply[Done] final case class Get(replyTo: ActorRef[String]) extends Command final case object StopPlz extends Command @@ -51,7 +56,13 @@ object ClusterShardingPersistenceSpec { persistenceId = typeKey.persistenceIdFrom(entityId), emptyState = "", commandHandler = (state, cmd) ⇒ cmd match { - case Add(s) ⇒ Effect.persist(s) + case Add(s) ⇒ + Effect.persist(s) + + case cmd @ AddWithConfirmation(s) ⇒ + Effect.persist(s) + .thenReply(cmd)(newState ⇒ Done) + case Get(replyTo) ⇒ replyTo ! s"$entityId:$state" Effect.none @@ -64,19 +75,17 @@ object ClusterShardingPersistenceSpec { class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterShardingPersistenceSpec.config) with WordSpecLike { import ClusterShardingPersistenceSpec._ - val sharding = ClusterSharding(system) - "Typed cluster sharding with persistent actor" must { + ClusterSharding(system).start(ShardedEntity( + entityId ⇒ persistentActor(entityId), + typeKey, + StopPlz + )) + Cluster(system).manager ! Join(Cluster(system).selfMember.address) "start persistent actor" in { - ClusterSharding(system).start(ShardedEntity( - entityId ⇒ persistentActor(entityId), - typeKey, - StopPlz - )) - val p = TestProbe[String]() val ref = ClusterSharding(system).entityRefFor(typeKey, "123") @@ -86,5 +95,19 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh ref ! Get(p.ref) p.expectMessage("123:a|b|c") } + + "support ask with thenReply" in { + val p = TestProbe[String]() + + val ref = ClusterSharding(system).entityRefFor(typeKey, "456") + val done1 = ref ? AddWithConfirmation("a") + done1.futureValue should ===(Done) + + val done2: Future[Done] = ref ? AddWithConfirmation("b") + done2.futureValue should ===(Done) + + ref ! Get(p.ref) + p.expectMessage("456:a|b") + } } } diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index 3707721f074..0e229939f17 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -248,6 +248,36 @@ Java Any `SideEffect`s are executed on an at-once basis and will not be executed if the persist fails. The `SideEffect`s are executed sequentially, it is not possible to execute `SideEffect`s in parallel. +## Replies + +The @ref:[Request-Response interaction pattern](interaction-patterns.md#request-response) is very common for +persistent actors, because you typically want to know if the command was rejected due to validation errors and +when accepted you want a confirmation when the events have been successfully stored. + +Therefore you typically include a @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef`] in the +commands. After validation errors or after persisting events, using a `thenRun` side effect, the reply message can +be sent to the `ActorRef`. + +TODO example of thenRun reply + +Since this is such a common pattern there is a reply effect for this purpose. It has the nice property that +it can be used to enforce that replies are not forgotten when implementing the `PersistentBehavior`. +If it's defined with @scala[`PersistentBehavior.withEnforcedReplies`]@java[`PersistentBehaviorWithEnforcedReplies`] +there will be compilation errors if the returned effect isn't a `ReplyEffect`, which can be +created with @scala[`Effect.reply`]@java[`Effects().reply`], @scala[`Effect.noReply`]@java[`Effects().noReply`], +@scala[`Effect.thenReply`]@java[`Effects().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effects().thenNoReply`]. + +These effects will send the reply message even when @scala[`PersistentBehavior.withEnforcedReplies`]@java[`PersistentBehaviorWithEnforcedReplies`] +is not used, but then there will be no compilation errors if the reply decision is left out. + +Note that the `noReply` is a way of making conscious decision that a reply shouldn't be sent for a specific +command or the reply will be sent later, perhaps after some asynchronous interaction with other actors or services. + +TODO example of thenReply + +When using the reply effect the commands must implement `ExpectingReply` to include the @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef`] +in a standardized way. + ## Serialization The same @ref:[serialization](../serialization.md) mechanism as for untyped diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/ExpectingReply.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/ExpectingReply.scala new file mode 100644 index 00000000000..350e21e7ade --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/ExpectingReply.scala @@ -0,0 +1,16 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed + +import akka.actor.typed.ActorRef + +/** + * Commands may implement this trait to facilitate sending reply messages via `Effect.thenReply`. + * + * @tparam ReplyMessage The type of the reply message + */ +trait ExpectingReply[ReplyMessage] { + def replyTo: ActorRef[ReplyMessage] +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/SideEffect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/SideEffect.scala index 40fecd3243d..b64a15f2b1b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/SideEffect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/SideEffect.scala @@ -4,6 +4,7 @@ package akka.persistence.typed +import akka.actor.typed.ActorRef import akka.japi.function import akka.annotation.{ DoNotInherit, InternalApi } @@ -19,7 +20,23 @@ sealed abstract class SideEffect[State] /** INTERNAL API */ @InternalApi -final private[akka] case class Callback[State](effect: State ⇒ Unit) extends SideEffect[State] +private[akka] class Callback[State](val sideEffect: State ⇒ Unit) extends SideEffect[State] { + override def toString: String = "Callback" +} + +/** INTERNAL API */ +@InternalApi +final private[akka] class ReplyEffectImpl[ReplyMessage, State](replyTo: ActorRef[ReplyMessage], replyWithMessage: State ⇒ ReplyMessage) + extends Callback[State](state ⇒ replyTo ! replyWithMessage(state)) { + override def toString: String = "Reply" +} + +/** INTERNAL API */ +@InternalApi +final private[akka] class NoReplyEffectImpl[State] + extends Callback[State](_ ⇒ ()) { + override def toString: String = "NoReply" +} /** INTERNAL API */ @InternalApi @@ -30,7 +47,7 @@ object SideEffect { * Create a ChainedEffect that can be run after Effects */ def apply[State](callback: State ⇒ Unit): SideEffect[State] = - Callback(callback) + new Callback(callback) /** * Java API @@ -38,7 +55,7 @@ object SideEffect { * Create a ChainedEffect that can be run after Effects */ def create[State](callback: function.Procedure[State]): SideEffect[State] = - Callback(s ⇒ callback.apply(s)) + new Callback(s ⇒ callback.apply(s)) def stop[State](): SideEffect[State] = Stop.asInstanceOf[SideEffect[State]] } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala index 4358db72f3c..d6dc57727a6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala @@ -4,37 +4,41 @@ package akka.persistence.typed.internal +import scala.collection.immutable + import akka.persistence.typed.{ SideEffect, javadsl, scaladsl } -import scala.collection.{ immutable ⇒ im } import akka.annotation.InternalApi -import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.NoReplyEffectImpl /** INTERNAL API */ @InternalApi -private[akka] abstract class EffectImpl[+Event, State] extends javadsl.Effect[Event, State] with scaladsl.Effect[Event, State] { +private[akka] abstract class EffectImpl[+Event, State] extends javadsl.ReplyEffect[Event, State] with scaladsl.ReplyEffect[Event, State] { /* All events that will be persisted in this effect */ - override def events: im.Seq[Event] = Nil + override def events: immutable.Seq[Event] = Nil override def andThen(chainedEffect: SideEffect[State]): EffectImpl[Event, State] = CompositeEffect(this, chainedEffect) + override def thenNoReply(): EffectImpl[Event, State] = + CompositeEffect(this, new NoReplyEffectImpl[State]) + } /** INTERNAL API */ @InternalApi private[akka] object CompositeEffect { - def apply[Event, State](effect: Effect[Event, State], sideEffects: SideEffect[State]): EffectImpl[Event, State] = + def apply[Event, State](effect: scaladsl.Effect[Event, State], sideEffects: SideEffect[State]): CompositeEffect[Event, State] = CompositeEffect[Event, State](effect, sideEffects :: Nil) } /** INTERNAL API */ @InternalApi private[akka] final case class CompositeEffect[Event, State]( - persistingEffect: Effect[Event, State], - _sideEffects: im.Seq[SideEffect[State]]) extends EffectImpl[Event, State] { + persistingEffect: scaladsl.Effect[Event, State], + _sideEffects: immutable.Seq[SideEffect[State]]) extends EffectImpl[Event, State] { - override val events = persistingEffect.events + override val events: immutable.Seq[Event] = persistingEffect.events override def toString: String = s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})" @@ -52,7 +56,7 @@ private[akka] case class Persist[Event, State](event: Event) extends EffectImpl[ /** INTERNAL API */ @InternalApi -private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends EffectImpl[Event, State] +private[akka] case class PersistAll[Event, State](override val events: immutable.Seq[Event]) extends EffectImpl[Event, State] /** INTERNAL API */ @InternalApi diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index bf0b31e123b..d8828529e93 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -292,8 +292,8 @@ private[akka] object EventsourcedRunning { case _: Stop.type @unchecked ⇒ Behaviors.stopped - case Callback(sideEffects) ⇒ - sideEffects(state.state) + case callback: Callback[_] ⇒ + callback.sideEffect(state.state) Behaviors.same case _ ⇒ diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala index 751d0087fd5..459a373df5f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala @@ -8,9 +8,10 @@ import akka.annotation.DoNotInherit import akka.japi.function import akka.persistence.typed.internal._ import akka.persistence.typed.{ SideEffect, Stop } - import scala.collection.JavaConverters._ +import akka.persistence.typed.ExpectingReply + object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing] @DoNotInherit sealed class EffectFactories[Command, Event, State] { @@ -40,6 +41,31 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing] * This command is not handled, but it is not an error that it isn't. */ def unhandled: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]] + + /** + * Send a reply message to the command, which implements [[ExpectingReply]]. The type of the + * reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`. + * + * This has the same semantics as `cmd.replyTo.tell`. + * + * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten + * when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When + * `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]]. + * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help + * finding mistakes. + */ + def reply[ReplyMessage](cmd: ExpectingReply[ReplyMessage], replyWithMessage: ReplyMessage): ReplyEffect[Event, State] = + none.thenReply[ReplyMessage](cmd, new function.Function[State, ReplyMessage] { + override def apply(param: State): ReplyMessage = replyWithMessage + }) + + /** + * When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect + * isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be + * sent for a specific command or the reply will be sent later. + */ + def noReply(): ReplyEffect[Event, State] = + none.thenNoReply() } /** @@ -66,4 +92,35 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing] final def thenStop(): Effect[Event, State] = CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]]) + /** + * Send a reply message to the command, which implements [[ExpectingReply]]. The type of the + * reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`. + * + * This has the same semantics as `cmd.replyTo().tell`. + * + * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten + * when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When + * `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]]. + * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help + * finding mistakes. + */ + def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage], replyWithMessage: function.Function[State, ReplyMessage]): ReplyEffect[Event, State] = + CompositeEffect(this, SideEffect[State](newState ⇒ cmd.replyTo ! replyWithMessage(newState))) + + /** + * When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect + * isn't a [[ReplyEffect]]. This `thenNoReply` can be used as a conscious decision that a reply shouldn't be + * sent for a specific command or the reply will be sent later. + */ + def thenNoReply(): ReplyEffect[Event, State] + +} + +/** + * [[PersistentBehaviorWithEnforcedReplies]] can be used to enforce that replies are not forgotten. + * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be + * created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]]. + */ +@DoNotInherit abstract class ReplyEffect[+Event, State] extends Effect[Event, State] { + self: EffectImpl[Event, State] ⇒ } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala index a763eaa7dd1..40d2f5a33aa 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala @@ -16,16 +16,15 @@ import akka.persistence.typed.{ EventAdapter, _ } import akka.persistence.typed.internal._ import scala.util.{ Failure, Success } -/** Java API */ @ApiMayChange -abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: PersistenceId, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] { +abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (val persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] { def this(persistenceId: PersistenceId) = { - this(persistenceId, None) + this(persistenceId, Optional.empty[BackoffSupervisorStrategy]) } def this(persistenceId: PersistenceId, backoffSupervisorStrategy: BackoffSupervisorStrategy) = { - this(persistenceId, Some(backoffSupervisorStrategy)) + this(persistenceId, Optional.ofNullable(backoffSupervisorStrategy)) } /** @@ -169,7 +168,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private (val pe }) }).eventAdapter(eventAdapter()) - if (supervisorStrategy.isDefined) + if (supervisorStrategy.isPresent) behavior.onPersistFailure(supervisorStrategy.get) else behavior @@ -177,3 +176,25 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private (val pe } +/** + * FIXME This is not completed for javadsl yet. The compiler is not enforcing the replies yet. + * + * A [[PersistentBehavior]] that is enforcing that replies to commands are not forgotten. + * There will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be + * created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]]. + */ +@ApiMayChange +abstract class PersistentBehaviorWithEnforcedReplies[Command, Event, State >: Null](persistenceId: PersistenceId, backoffSupervisorStrategy: Optional[BackoffSupervisorStrategy]) + extends PersistentBehavior[Command, Event, State](persistenceId, backoffSupervisorStrategy) { + + def this(persistenceId: PersistenceId) = { + this(persistenceId, Optional.empty[BackoffSupervisorStrategy]) + } + + def this(persistenceId: PersistenceId, backoffSupervisorStrategy: BackoffSupervisorStrategy) = { + this(persistenceId, Optional.ofNullable(backoffSupervisorStrategy)) + } + + // FIXME override commandHandler and commandHandlerBuilder to require the ReplyEffect return type, + // which is unfortunately intrusive to the CommandHandlerBuilder +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala index c636f0f346b..f6af1837541 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala @@ -4,13 +4,14 @@ package akka.persistence.typed.scaladsl -import akka.japi.function import akka.annotation.DoNotInherit import akka.persistence.typed.{ SideEffect, Stop } import akka.persistence.typed.internal._ - import scala.collection.{ immutable ⇒ im } +import akka.persistence.typed.ExpectingReply +import akka.persistence.typed.ReplyEffectImpl + /** * Factories for effects - how a persistent actor reacts on a command */ @@ -58,6 +59,30 @@ object Effect { * Side effects can be chained with `andThen` */ def stop[Event, State]: Effect[Event, State] = none.andThenStop() + + /** + * Send a reply message to the command, which implements [[ExpectingReply]]. The type of the + * reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`. + * + * This has the same semantics as `cmd.replyTo.tell`. + * + * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten + * when the `PersistentBehavior` is created with [[PersistentBehavior.withEnforcedReplies]]. When + * `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]]. + * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help + * finding mistakes. + */ + def reply[ReplyMessage, Event, State](cmd: ExpectingReply[ReplyMessage])(replyWithMessage: ReplyMessage): ReplyEffect[Event, State] = + none[Event, State].thenReply[ReplyMessage](cmd)(_ ⇒ replyWithMessage) + + /** + * When [[PersistentBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect + * isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be + * sent for a specific command or the reply will be sent later. + */ + def noReply[Event, State]: ReplyEffect[Event, State] = + none.thenNoReply() + } /** @@ -91,5 +116,37 @@ trait Effect[+Event, State] { def andThenStop(): Effect[Event, State] = { CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]]) } + + /** + * Send a reply message to the command, which implements [[ExpectingReply]]. The type of the + * reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`. + * + * This has the same semantics as `cmd.replyTo.tell`. + * + * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten + * when the `PersistentBehavior` is created with [[PersistentBehavior.withEnforcedReplies]]. When + * `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]]. + * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help + * finding mistakes. + */ + def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage])(replyWithMessage: State ⇒ ReplyMessage): ReplyEffect[Event, State] = + CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](cmd.replyTo, replyWithMessage)) + + /** + * When [[PersistentBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect + * isn't a [[ReplyEffect]]. This `thenNoReply` can be used as a conscious decision that a reply shouldn't be + * sent for a specific command or the reply will be sent later. + */ + def thenNoReply(): ReplyEffect[Event, State] + } +/** + * [[PersistentBehavior.withEnforcedReplies]] can be used to enforce that replies are not forgotten. + * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be + * created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]]. + * + * Not intended for user extension. + */ +@DoNotInherit trait ReplyEffect[+Event, State] extends Effect[Event, State] + diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala index 71ee26b629c..aa5fc9fe5d8 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala @@ -13,6 +13,8 @@ import akka.persistence.typed.EventAdapter import akka.persistence.typed.internal._ import scala.util.Try +import akka.annotation.DoNotInherit +import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId object PersistentBehavior { @@ -45,6 +47,18 @@ object PersistentBehavior { eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = PersistentBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler) + /** + * Create a `Behavior` for a persistent actor that is enforcing that replies to commands are not forgotten. + * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be + * created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]]. + */ + def withEnforcedReplies[Command <: ExpectingReply[_], Event, State]( + persistenceId: PersistenceId, + emptyState: State, + commandHandler: (State, Command) ⇒ ReplyEffect[Event, State], + eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = + PersistentBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler) + /** * The `CommandHandler` defines how to act on commands. A `CommandHandler` is * a function: @@ -70,7 +84,10 @@ object PersistentBehavior { } -trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command] { +/** + * Not intended for user extension. + */ +@DoNotInherit trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command] { /** * The `callback` function is called to notify the actor that the recovery process * is finished. diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 7645312c741..1b0dfa7d37a 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -4,6 +4,7 @@ package akka.persistence.typed.javadsl; +import akka.Done; import akka.actor.typed.*; import akka.actor.typed.javadsl.Adapter; import akka.actor.typed.javadsl.Behaviors; @@ -17,6 +18,7 @@ import akka.persistence.query.Sequence; import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal; import akka.persistence.typed.EventAdapter; +import akka.persistence.typed.ExpectingReply; import akka.persistence.typed.NoOpEventAdapter; import akka.persistence.typed.PersistenceId; import akka.stream.ActorMaterializer; @@ -85,6 +87,20 @@ static class EmptyEventsListAndThenLog implements Command { static class IncrementTwiceAndLog implements Command { } + public static class IncrementWithConfirmation implements Command, ExpectingReply { + + private final ActorRef replyTo; + + public IncrementWithConfirmation(ActorRef replyTo) { + this.replyTo = replyTo; + } + + @Override + public ActorRef replyTo() { + return replyTo; + } + } + static class StopThenLog implements Command { } @@ -251,7 +267,11 @@ private static Behavior counter( @Override public CommandHandler commandHandler() { return commandHandlerBuilder(State.class) - .matchCommand(Increment.class, (state, command) -> Effect().persist(new Incremented(1))) + .matchCommand(Increment.class, (state, command) -> + Effect().persist(new Incremented(1))) + .matchCommand(IncrementWithConfirmation.class, (state, command) -> + Effect().persist(new Incremented(1)) + .thenReply(command, newState -> Done.getInstance())) .matchCommand(GetValue.class, (state, command) -> { command.replyTo.tell(state); return Effect().none(); @@ -361,6 +381,14 @@ public void replyStoredEvents() { probe.expectMessage(new State(4, Arrays.asList(0, 1, 2, 3))); } + @Test + public void thenReplyEffect() { + ActorRef c = testKit.spawn(counter(new PersistenceId("c1b"))); + TestProbe probe = testKit.createTestProbe(); + c.tell(new IncrementWithConfirmation(probe.ref())); + probe.expectMessage(Done.getInstance()); + } + @Test public void handleTerminatedSignal() { TestProbe> eventHandlerProbe = testKit.createTestProbe(); diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorReplySpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorReplySpec.scala new file mode 100644 index 00000000000..6bc43f19500 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorReplySpec.scala @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import java.util.UUID +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.duration._ +import scala.util.Success +import scala.util.Try + +import akka.Done +import akka.actor.testkit.typed.TestKitSettings +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.SupervisorStrategy +import akka.actor.typed.Terminated +import akka.actor.typed.scaladsl.ActorContext +import akka.persistence.journal.inmem.InmemJournal +import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal +import akka.persistence.query.EventEnvelope +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.Sequence +import akka.persistence.typed.ExpectingReply +import akka.persistence.typed.PersistenceId +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Sink +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object PersistentBehaviorReplySpec { + def conf: Config = ConfigFactory.parseString( + s""" + akka.loglevel = INFO + # akka.persistence.typed.log-stashing = INFO + akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + """) + + sealed trait Command[ReplyMessage] extends ExpectingReply[ReplyMessage] + final case class IncrementWithConfirmation(override val replyTo: ActorRef[Done]) extends Command[Done] + final case class IncrementReplyLater(override val replyTo: ActorRef[Done]) extends Command[Done] + final case class ReplyNow(override val replyTo: ActorRef[Done]) extends Command[Done] + final case class GetValue(replyTo: ActorRef[State]) extends Command[State] + + sealed trait Event + final case class Incremented(delta: Int) extends Event + + final case class State(value: Int, history: Vector[Int]) + + def counter(persistenceId: PersistenceId)(implicit system: ActorSystem[_]): Behavior[Command[_]] = + Behaviors.setup(ctx ⇒ counter(ctx, persistenceId)) + + def counter( + ctx: ActorContext[Command[_]], + persistenceId: PersistenceId): PersistentBehavior[Command[_], Event, State] = { + PersistentBehavior.withEnforcedReplies[Command[_], Event, State]( + persistenceId, + emptyState = State(0, Vector.empty), + commandHandler = (state, cmd) ⇒ cmd match { + + case cmd: IncrementWithConfirmation ⇒ + Effect.persist(Incremented(1)) + .thenReply(cmd)(_ ⇒ Done) + + case cmd: IncrementReplyLater ⇒ + Effect.persist(Incremented(1)) + .thenRun((_: State) ⇒ ctx.self ! ReplyNow(cmd.replyTo)) + .thenNoReply() + + case cmd: ReplyNow ⇒ + Effect.reply(cmd)(Done) + + case query: GetValue ⇒ + Effect.reply(query)(state) + + }, + eventHandler = (state, evt) ⇒ evt match { + case Incremented(delta) ⇒ + State(state.value + delta, state.history :+ state.value) + }) + } +} + +class PersistentBehaviorReplySpec extends ScalaTestWithActorTestKit(PersistentBehaviorSpec.conf) with WordSpecLike { + + import PersistentBehaviorReplySpec._ + + val pidCounter = new AtomicInteger(0) + private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})") + + "A typed persistent actor with commands that are expecting replies" must { + + "persist an event thenReply" in { + val c = spawn(counter(nextPid())) + val probe = TestProbe[Done] + c ! IncrementWithConfirmation(probe.ref) + probe.expectMessage(Done) + + c ! IncrementWithConfirmation(probe.ref) + c ! IncrementWithConfirmation(probe.ref) + probe.expectMessage(Done) + probe.expectMessage(Done) + } + + "persist an event thenReply later" in { + val c = spawn(counter(nextPid())) + val probe = TestProbe[Done] + c ! IncrementReplyLater(probe.ref) + probe.expectMessage(Done) + } + + "reply to query command" in { + val c = spawn(counter(nextPid())) + val updateProbe = TestProbe[Done] + c ! IncrementWithConfirmation(updateProbe.ref) + + val queryProbe = TestProbe[State] + c ! GetValue(queryProbe.ref) + queryProbe.expectMessage(State(1, Vector(0))) + } + } +} diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index 8e2a562154c..8fb87fc2957 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -26,6 +26,7 @@ import scala.concurrent.duration._ import scala.util.{ Success, Try } import akka.persistence.journal.inmem.InmemJournal +import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import org.scalatest.WordSpecLike @@ -81,6 +82,7 @@ object PersistentBehaviorSpec { final case object IncrementLater extends Command final case object IncrementAfterReceiveTimeout extends Command final case object IncrementTwiceAndThenLog extends Command + final case class IncrementWithConfirmation(override val replyTo: ActorRef[Done]) extends Command with ExpectingReply[Done] final case object DoNothingAndThenLog extends Command final case object EmptyEventsListAndThenLog extends Command final case class GetValue(replyTo: ActorRef[State]) extends Command @@ -149,6 +151,10 @@ object PersistentBehaviorSpec { case IncrementWithPersistAll(n) ⇒ Effect.persist((0 until n).map(_ ⇒ Incremented(1))) + case cmd: IncrementWithConfirmation ⇒ + Effect.persist(Incremented(1)) + .thenReply(cmd)(newState ⇒ Done) + case GetValue(replyTo) ⇒ replyTo ! state Effect.none @@ -327,6 +333,18 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio } + "persist an event thenReply" in { + val c = spawn(counter(nextPid)) + val probe = TestProbe[Done] + c ! IncrementWithConfirmation(probe.ref) + probe.expectMessage(Done) + + c ! IncrementWithConfirmation(probe.ref) + c ! IncrementWithConfirmation(probe.ref) + probe.expectMessage(Done) + probe.expectMessage(Done) + } + /** Proves that side-effects are called when emitting an empty list of events */ "chainable side effects without events" in { val loggingProbe = TestProbe[String]