Skip to content

Commit

Permalink
Merge pull request akka#25692 from akka/wip-25482-then-reply-patriknw
Browse files Browse the repository at this point in the history
thenReply Effect, akka#25482
  • Loading branch information
patriknw authored Oct 17, 2018
2 parents 49f6622 + 81c7adf commit 58ec80d
Show file tree
Hide file tree
Showing 13 changed files with 452 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package akka.cluster.sharding.typed.scaladsl

import scala.concurrent.Future

import akka.Done
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
Expand All @@ -13,6 +16,7 @@ import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior }
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.{ WordSpec, WordSpecLike }
Expand Down Expand Up @@ -41,6 +45,7 @@ object ClusterShardingPersistenceSpec {

sealed trait Command
final case class Add(s: String) extends Command
final case class AddWithConfirmation(s: String)(override val replyTo: ActorRef[Done]) extends Command with ExpectingReply[Done]
final case class Get(replyTo: ActorRef[String]) extends Command
final case object StopPlz extends Command

Expand All @@ -51,7 +56,13 @@ object ClusterShardingPersistenceSpec {
persistenceId = typeKey.persistenceIdFrom(entityId),
emptyState = "",
commandHandler = (state, cmd) cmd match {
case Add(s) Effect.persist(s)
case Add(s)
Effect.persist(s)

case cmd @ AddWithConfirmation(s)
Effect.persist(s)
.thenReply(cmd)(newState Done)

case Get(replyTo)
replyTo ! s"$entityId:$state"
Effect.none
Expand All @@ -64,19 +75,17 @@ object ClusterShardingPersistenceSpec {
class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterShardingPersistenceSpec.config) with WordSpecLike {
import ClusterShardingPersistenceSpec._

val sharding = ClusterSharding(system)

"Typed cluster sharding with persistent actor" must {

ClusterSharding(system).start(ShardedEntity(
entityId persistentActor(entityId),
typeKey,
StopPlz
))

Cluster(system).manager ! Join(Cluster(system).selfMember.address)

"start persistent actor" in {
ClusterSharding(system).start(ShardedEntity(
entityId persistentActor(entityId),
typeKey,
StopPlz
))

val p = TestProbe[String]()

val ref = ClusterSharding(system).entityRefFor(typeKey, "123")
Expand All @@ -86,5 +95,19 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
ref ! Get(p.ref)
p.expectMessage("123:a|b|c")
}

"support ask with thenReply" in {
val p = TestProbe[String]()

val ref = ClusterSharding(system).entityRefFor(typeKey, "456")
val done1 = ref ? AddWithConfirmation("a")
done1.futureValue should ===(Done)

val done2: Future[Done] = ref ? AddWithConfirmation("b")
done2.futureValue should ===(Done)

ref ! Get(p.ref)
p.expectMessage("456:a|b")
}
}
}
30 changes: 30 additions & 0 deletions akka-docs/src/main/paradox/typed/persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,36 @@ Java
Any `SideEffect`s are executed on an at-once basis and will not be executed if the persist fails.
The `SideEffect`s are executed sequentially, it is not possible to execute `SideEffect`s in parallel.

## Replies

The @ref:[Request-Response interaction pattern](interaction-patterns.md#request-response) is very common for
persistent actors, because you typically want to know if the command was rejected due to validation errors and
when accepted you want a confirmation when the events have been successfully stored.

Therefore you typically include a @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef<ReplyMessageType>`] in the
commands. After validation errors or after persisting events, using a `thenRun` side effect, the reply message can
be sent to the `ActorRef`.

TODO example of thenRun reply

Since this is such a common pattern there is a reply effect for this purpose. It has the nice property that
it can be used to enforce that replies are not forgotten when implementing the `PersistentBehavior`.
If it's defined with @scala[`PersistentBehavior.withEnforcedReplies`]@java[`PersistentBehaviorWithEnforcedReplies`]
there will be compilation errors if the returned effect isn't a `ReplyEffect`, which can be
created with @scala[`Effect.reply`]@java[`Effects().reply`], @scala[`Effect.noReply`]@java[`Effects().noReply`],
@scala[`Effect.thenReply`]@java[`Effects().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effects().thenNoReply`].

These effects will send the reply message even when @scala[`PersistentBehavior.withEnforcedReplies`]@java[`PersistentBehaviorWithEnforcedReplies`]
is not used, but then there will be no compilation errors if the reply decision is left out.

Note that the `noReply` is a way of making conscious decision that a reply shouldn't be sent for a specific
command or the reply will be sent later, perhaps after some asynchronous interaction with other actors or services.

TODO example of thenReply

When using the reply effect the commands must implement `ExpectingReply` to include the @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef<ReplyMessageType>`]
in a standardized way.

## Serialization

The same @ref:[serialization](../serialization.md) mechanism as for untyped
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.typed

import akka.actor.typed.ActorRef

/**
* Commands may implement this trait to facilitate sending reply messages via `Effect.thenReply`.
*
* @tparam ReplyMessage The type of the reply message
*/
trait ExpectingReply[ReplyMessage] {
def replyTo: ActorRef[ReplyMessage]
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.persistence.typed

import akka.actor.typed.ActorRef
import akka.japi.function
import akka.annotation.{ DoNotInherit, InternalApi }

Expand All @@ -19,7 +20,23 @@ sealed abstract class SideEffect[State]

/** INTERNAL API */
@InternalApi
final private[akka] case class Callback[State](effect: State Unit) extends SideEffect[State]
private[akka] class Callback[State](val sideEffect: State Unit) extends SideEffect[State] {
override def toString: String = "Callback"
}

/** INTERNAL API */
@InternalApi
final private[akka] class ReplyEffectImpl[ReplyMessage, State](replyTo: ActorRef[ReplyMessage], replyWithMessage: State ReplyMessage)
extends Callback[State](state replyTo ! replyWithMessage(state)) {
override def toString: String = "Reply"
}

/** INTERNAL API */
@InternalApi
final private[akka] class NoReplyEffectImpl[State]
extends Callback[State](_ ()) {
override def toString: String = "NoReply"
}

/** INTERNAL API */
@InternalApi
Expand All @@ -30,15 +47,15 @@ object SideEffect {
* Create a ChainedEffect that can be run after Effects
*/
def apply[State](callback: State Unit): SideEffect[State] =
Callback(callback)
new Callback(callback)

/**
* Java API
*
* Create a ChainedEffect that can be run after Effects
*/
def create[State](callback: function.Procedure[State]): SideEffect[State] =
Callback(s callback.apply(s))
new Callback(s callback.apply(s))

def stop[State](): SideEffect[State] = Stop.asInstanceOf[SideEffect[State]]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,41 @@

package akka.persistence.typed.internal

import scala.collection.immutable

import akka.persistence.typed.{ SideEffect, javadsl, scaladsl }

import scala.collection.{ immutable im }
import akka.annotation.InternalApi
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.NoReplyEffectImpl

/** INTERNAL API */
@InternalApi
private[akka] abstract class EffectImpl[+Event, State] extends javadsl.Effect[Event, State] with scaladsl.Effect[Event, State] {
private[akka] abstract class EffectImpl[+Event, State] extends javadsl.ReplyEffect[Event, State] with scaladsl.ReplyEffect[Event, State] {
/* All events that will be persisted in this effect */
override def events: im.Seq[Event] = Nil
override def events: immutable.Seq[Event] = Nil

override def andThen(chainedEffect: SideEffect[State]): EffectImpl[Event, State] =
CompositeEffect(this, chainedEffect)

override def thenNoReply(): EffectImpl[Event, State] =
CompositeEffect(this, new NoReplyEffectImpl[State])

}

/** INTERNAL API */
@InternalApi
private[akka] object CompositeEffect {
def apply[Event, State](effect: Effect[Event, State], sideEffects: SideEffect[State]): EffectImpl[Event, State] =
def apply[Event, State](effect: scaladsl.Effect[Event, State], sideEffects: SideEffect[State]): CompositeEffect[Event, State] =
CompositeEffect[Event, State](effect, sideEffects :: Nil)
}

/** INTERNAL API */
@InternalApi
private[akka] final case class CompositeEffect[Event, State](
persistingEffect: Effect[Event, State],
_sideEffects: im.Seq[SideEffect[State]]) extends EffectImpl[Event, State] {
persistingEffect: scaladsl.Effect[Event, State],
_sideEffects: immutable.Seq[SideEffect[State]]) extends EffectImpl[Event, State] {

override val events = persistingEffect.events
override val events: immutable.Seq[Event] = persistingEffect.events

override def toString: String =
s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})"
Expand All @@ -52,7 +56,7 @@ private[akka] case class Persist[Event, State](event: Event) extends EffectImpl[

/** INTERNAL API */
@InternalApi
private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends EffectImpl[Event, State]
private[akka] case class PersistAll[Event, State](override val events: immutable.Seq[Event]) extends EffectImpl[Event, State]

/** INTERNAL API */
@InternalApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ private[akka] object EventsourcedRunning {
case _: Stop.type @unchecked
Behaviors.stopped

case Callback(sideEffects)
sideEffects(state.state)
case callback: Callback[_]
callback.sideEffect(state.state)
Behaviors.same

case _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import akka.annotation.DoNotInherit
import akka.japi.function
import akka.persistence.typed.internal._
import akka.persistence.typed.{ SideEffect, Stop }

import scala.collection.JavaConverters._

import akka.persistence.typed.ExpectingReply

object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]

@DoNotInherit sealed class EffectFactories[Command, Event, State] {
Expand Down Expand Up @@ -40,6 +41,31 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
* This command is not handled, but it is not an error that it isn't.
*/
def unhandled: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]]

/**
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
* reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`.
*
* This has the same semantics as `cmd.replyTo.tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
*/
def reply[ReplyMessage](cmd: ExpectingReply[ReplyMessage], replyWithMessage: ReplyMessage): ReplyEffect[Event, State] =
none.thenReply[ReplyMessage](cmd, new function.Function[State, ReplyMessage] {
override def apply(param: State): ReplyMessage = replyWithMessage
})

/**
* When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
def noReply(): ReplyEffect[Event, State] =
none.thenNoReply()
}

/**
Expand All @@ -66,4 +92,35 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
final def thenStop(): Effect[Event, State] =
CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])

/**
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
* reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`.
*
* This has the same semantics as `cmd.replyTo().tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
*/
def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage], replyWithMessage: function.Function[State, ReplyMessage]): ReplyEffect[Event, State] =
CompositeEffect(this, SideEffect[State](newState cmd.replyTo ! replyWithMessage(newState)))

/**
* When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `thenNoReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
def thenNoReply(): ReplyEffect[Event, State]

}

/**
* [[PersistentBehaviorWithEnforcedReplies]] can be used to enforce that replies are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]].
*/
@DoNotInherit abstract class ReplyEffect[+Event, State] extends Effect[Event, State] {
self: EffectImpl[Event, State]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ import akka.persistence.typed.{ EventAdapter, _ }
import akka.persistence.typed.internal._
import scala.util.{ Failure, Success }

/** Java API */
@ApiMayChange
abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: PersistenceId, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {
abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (val persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {

def this(persistenceId: PersistenceId) = {
this(persistenceId, None)
this(persistenceId, Optional.empty[BackoffSupervisorStrategy])
}

def this(persistenceId: PersistenceId, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
this(persistenceId, Some(backoffSupervisorStrategy))
this(persistenceId, Optional.ofNullable(backoffSupervisorStrategy))
}

/**
Expand Down Expand Up @@ -169,11 +168,33 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private (val pe
})
}).eventAdapter(eventAdapter())

if (supervisorStrategy.isDefined)
if (supervisorStrategy.isPresent)
behavior.onPersistFailure(supervisorStrategy.get)
else
behavior
}

}

/**
* FIXME This is not completed for javadsl yet. The compiler is not enforcing the replies yet.
*
* A [[PersistentBehavior]] that is enforcing that replies to commands are not forgotten.
* There will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]].
*/
@ApiMayChange
abstract class PersistentBehaviorWithEnforcedReplies[Command, Event, State >: Null](persistenceId: PersistenceId, backoffSupervisorStrategy: Optional[BackoffSupervisorStrategy])
extends PersistentBehavior[Command, Event, State](persistenceId, backoffSupervisorStrategy) {

def this(persistenceId: PersistenceId) = {
this(persistenceId, Optional.empty[BackoffSupervisorStrategy])
}

def this(persistenceId: PersistenceId, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
this(persistenceId, Optional.ofNullable(backoffSupervisorStrategy))
}

// FIXME override commandHandler and commandHandlerBuilder to require the ReplyEffect return type,
// which is unfortunately intrusive to the CommandHandlerBuilder
}
Loading

0 comments on commit 58ec80d

Please sign in to comment.