From 2672bd7a9524c5d59a322ca452d4ef28ef87164e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 28 Sep 2018 09:18:15 +0200 Subject: [PATCH] PersistentEntity to glue together Sharding and PersistentBehavior better * Makes the combination more visable * You don't have to worry about the persistenceId, only EntityTypeKey and entityId * The glue is stronger in the javadsl because of two reasons * Couldn't realisticly create a PersistentEntity class extending PersistenBehavior (which contains all the optional parameters and functions) since that would duplicate too much. * The ActorContext would be needed in the ShardedEntityContext parameter and because of the additional M type parameters the type inference breaks down when using the factory. Would require specifying the type of the ShardedEntityContex[M] parameter. That problem doesn't seem to exist in Java. renamed: s/ShardedEntityContext/EntityContext/ s/ShardedEntity/Entity/ --- .../typed/internal/ClusterShardingImpl.scala | 38 ++-- .../typed/javadsl/ClusterSharding.scala | 103 +++++++---- .../typed/javadsl/PersistentEntity.scala | 37 ++++ .../typed/scaladsl/ClusterSharding.scala | 75 ++++---- .../typed/scaladsl/PersistentEntity.scala | 30 ++++ .../typed/MultiDcClusterShardingSpec.scala | 10 +- .../HelloWorldPersistentEntityExample.java | 163 ++++++++++++++++++ ...HelloWorldPersistentEntityExampleTest.java | 69 ++++++++ .../typed/ShardingCompileOnlyTest.java | 14 +- .../ClusterShardingPersistenceSpec.scala | 32 ++-- .../typed/scaladsl/ClusterShardingSpec.scala | 28 +-- .../HelloWorldPersistentEntityExample.scala | 100 +++++++++++ ...elloWorldPersistentEntityExampleSpec.scala | 57 ++++++ .../typed/ShardingCompileOnlySpec.scala | 17 +- .../main/paradox/typed/cluster-sharding.md | 30 +++- .../paradox/typed/interaction-patterns.md | 4 +- .../src/main/paradox/typed/persistence.md | 15 +- .../persistence/typed/javadsl/Effect.scala | 5 + .../typed/scaladsl/PersistentBehavior.scala | 8 +- 19 files changed, 670 insertions(+), 165 deletions(-) create mode 100644 akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/PersistentEntity.scala create mode 100644 akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/PersistentEntity.scala create mode 100644 akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java create mode 100644 akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleTest.java create mode 100644 akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala create mode 100644 akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleSpec.scala diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index e2d35590b8e..962eaee1a71 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -30,6 +30,7 @@ import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion.{ StartEntity ⇒ UntypedStartEntity } +import akka.cluster.sharding.typed.scaladsl.EntityContext import akka.cluster.typed.Cluster import akka.event.Logging import akka.event.LoggingAdapter @@ -130,37 +131,40 @@ import akka.util.Timeout private val shardCommandActors: ConcurrentHashMap[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] = new ConcurrentHashMap // scaladsl impl - override def start[M, E](shardedEntity: scaladsl.ShardedEntity[M, E]): ActorRef[E] = { - val settings = shardedEntity.settings match { + override def start[M, E](entity: scaladsl.Entity[M, E]): ActorRef[E] = { + val settings = entity.settings match { case None ⇒ ClusterShardingSettings(system) case Some(s) ⇒ s } - val extractor = (shardedEntity.messageExtractor match { + val extractor = (entity.messageExtractor match { case None ⇒ new HashCodeMessageExtractor[M](settings.numberOfShards) case Some(e) ⇒ e }).asInstanceOf[ShardingMessageExtractor[E, M]] - internalStart(shardedEntity.create, shardedEntity.entityProps, shardedEntity.typeKey, - shardedEntity.stopMessage, settings, extractor, shardedEntity.allocationStrategy) + internalStart(entity.createBehavior, entity.entityProps, entity.typeKey, + entity.stopMessage, settings, extractor, entity.allocationStrategy) } // javadsl impl - override def start[M, E](shardedEntity: javadsl.ShardedEntity[M, E]): ActorRef[E] = { + override def start[M, E](entity: javadsl.Entity[M, E]): ActorRef[E] = { import scala.compat.java8.OptionConverters._ - start(new scaladsl.ShardedEntity( - create = (shard, entitityId) ⇒ shardedEntity.createBehavior.apply(shard, entitityId), - typeKey = shardedEntity.typeKey.asScala, - stopMessage = shardedEntity.stopMessage, - entityProps = shardedEntity.entityProps, - settings = shardedEntity.settings.asScala, - messageExtractor = shardedEntity.messageExtractor.asScala, - allocationStrategy = shardedEntity.allocationStrategy.asScala + start(new scaladsl.Entity( + createBehavior = (ctx: EntityContext) ⇒ Behaviors.setup[M] { actorContext ⇒ + entity.createBehavior( + new javadsl.EntityContext[M](ctx.entityId, ctx.shard, actorContext.asJava)) + }, + typeKey = entity.typeKey.asScala, + stopMessage = entity.stopMessage, + entityProps = entity.entityProps, + settings = entity.settings.asScala, + messageExtractor = entity.messageExtractor.asScala, + allocationStrategy = entity.allocationStrategy.asScala )) } private def internalStart[M, E]( - behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) ⇒ Behavior[M], + behavior: EntityContext ⇒ Behavior[M], entityProps: Props, typeKey: scaladsl.EntityTypeKey[M], stopMessage: M, @@ -185,7 +189,7 @@ import akka.util.Timeout if (settings.shouldHostShard(cluster)) { log.info("Starting Shard Region [{}]...", typeKey.name) - val shardCommandDelegator = + val shardCommandDelegator: ActorRef[scaladsl.ClusterSharding.ShardCommand] = shardCommandActors.computeIfAbsent( typeKey.name, new java.util.function.Function[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] { @@ -198,7 +202,7 @@ import akka.util.Timeout }) val untypedEntityPropsFactory: String ⇒ akka.actor.Props = { entityId ⇒ - PropsAdapter(behavior(shardCommandDelegator, entityId), entityProps) + PropsAdapter(behavior(new EntityContext(entityId, shardCommandDelegator)), entityProps) } untypedSharding.internalStart( diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 6f3d98568b4..d98a07fa607 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -15,6 +15,7 @@ import akka.actor.typed.Behavior import akka.actor.typed.RecipientRef import akka.actor.typed.Props import akka.actor.typed.internal.InternalRecipientRef +import akka.actor.typed.javadsl.ActorContext import akka.annotation.DoNotInherit import akka.annotation.InternalApi import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy @@ -167,7 +168,7 @@ object ClusterSharding { abstract class ClusterSharding { /** - * Initialize sharding for the given `shardedEntity` factory settings. + * Initialize sharding for the given `entity` factory settings. * * It will start a shard region or a proxy depending on if the settings require role and if this node has * such a role. @@ -175,7 +176,7 @@ abstract class ClusterSharding { * @tparam M The type of message the entity accepts * @tparam E A possible envelope around the message the entity accepts */ - def start[M, E](shardedEntity: ShardedEntity[M, E]): ActorRef[E] + def start[M, E](entity: Entity[M, E]): ActorRef[E] /** * Create an `ActorRef`-like reference to a specific sharded entity. @@ -195,49 +196,62 @@ abstract class ClusterSharding { def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy } -object ShardedEntity { +object Entity { /** * Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional - * settings can be defined using the `with` methods of the returned [[ShardedEntity]]. + * settings can be defined using the `with` methods of the returned [[Entity]]. + * + * Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors + * is very common and therefore the [[Entity.ofPersistentEntity]] is provided as convenience. * - * @param createBehavior Create the behavior for an entity given an entityId * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId) * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. - * * @tparam M The type of message the entity accepts */ - def create[M]( - createBehavior: JFunction[String, Behavior[M]], + def of[M]( typeKey: EntityTypeKey[M], - stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = { - create(new BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]] { - override def apply(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[M] = - createBehavior.apply(entityId) - }, typeKey, stopMessage) + createBehavior: JFunction[EntityContext[M], Behavior[M]], + stopMessage: M): Entity[M, ShardingEnvelope[M]] = { + new Entity(createBehavior, typeKey, stopMessage, Props.empty, Optional.empty(), Optional.empty(), Optional.empty()) } /** - * Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional - * settings can be defined using the `with` methods of the returned [[ShardedEntity]]. + * Defines how the [[PersistentEntity]] should be created. Used in [[ClusterSharding#start]]. Any [[Behavior]] can + * be used as a sharded entity actor, but the combination of sharding and persistent actors is very common + * and therefore this factory is provided as convenience. + * + * More optional settings can be defined using the `with` methods of the returned [[Entity]]. * - * @param createBehavior Create the behavior for an entity given `ShardCommand` ref and an entityId * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param createPersistentEntity Create the `PersistentEntity` for an entity given a [[EntityContext]] (includes entityId) * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. - * @tparam M The type of message the entity accepts + * @tparam Command The type of message the entity accepts */ - def create[M]( - createBehavior: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]], - typeKey: EntityTypeKey[M], - stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = - new ShardedEntity(createBehavior, typeKey, stopMessage, Props.empty, Optional.empty(), Optional.empty(), Optional.empty()) + def ofPersistentEntity[Command, Event, State >: Null]( + typeKey: EntityTypeKey[Command], + createPersistentEntity: JFunction[EntityContext[Command], PersistentEntity[Command, Event, State]], + stopMessage: Command): Entity[Command, ShardingEnvelope[Command]] = { + + of(typeKey, new JFunction[EntityContext[Command], Behavior[Command]] { + override def apply(ctx: EntityContext[Command]): Behavior[Command] = { + val persistentEntity = createPersistentEntity(ctx) + if (persistentEntity.entityTypeKey != typeKey) + throw new IllegalArgumentException(s"The [${persistentEntity.entityTypeKey}] of the PersistentEntity " + + s" [${persistentEntity.getClass.getName}] doesn't match expected $typeKey.") + persistentEntity + } + }, stopMessage) + } + } /** * Defines how the entity should be created. Used in [[ClusterSharding#start]]. */ -final class ShardedEntity[M, E] private[akka] ( - val createBehavior: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]], +final class Entity[M, E] private[akka] ( + val createBehavior: JFunction[EntityContext[M], Behavior[M]], val typeKey: EntityTypeKey[M], val stopMessage: M, val entityProps: Props, @@ -248,13 +262,13 @@ final class ShardedEntity[M, E] private[akka] ( /** * [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings. */ - def withEntityProps(newEntityProps: Props): ShardedEntity[M, E] = + def withEntityProps(newEntityProps: Props): Entity[M, E] = copy(entityProps = newEntityProps) /** * Additional settings, typically loaded from configuration. */ - def withSettings(newSettings: ClusterShardingSettings): ShardedEntity[M, E] = + def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] = copy(settings = Optional.ofNullable(newSettings)) /** @@ -265,29 +279,44 @@ final class ShardedEntity[M, E] private[akka] ( * shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default * is configured with `akka.cluster.sharding.number-of-shards`. */ - def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): ShardedEntity[M, Envelope] = - new ShardedEntity(createBehavior, typeKey, stopMessage, entityProps, settings, Optional.ofNullable(newExtractor), allocationStrategy) + def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): Entity[M, Envelope] = + new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, Optional.ofNullable(newExtractor), allocationStrategy) /** * Allocation strategy which decides on which nodes to allocate new shards, * [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified. */ - def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): ShardedEntity[M, E] = + def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): Entity[M, E] = copy(allocationStrategy = Optional.ofNullable(newAllocationStrategy)) private def copy( - create: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]] = createBehavior, - typeKey: EntityTypeKey[M] = typeKey, - stopMessage: M = stopMessage, - entityProps: Props = entityProps, - settings: Optional[ClusterShardingSettings] = settings, - allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy - ): ShardedEntity[M, E] = { - new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy) + createBehavior: JFunction[EntityContext[M], Behavior[M]] = createBehavior, + typeKey: EntityTypeKey[M] = typeKey, + stopMessage: M = stopMessage, + entityProps: Props = entityProps, + settings: Optional[ClusterShardingSettings] = settings, + allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy + ): Entity[M, E] = { + new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy) } } +/** + * Parameter to [[Entity.of]] + */ +final class EntityContext[M]( + entityId: String, + shard: ActorRef[ClusterSharding.ShardCommand], + actorContext: ActorContext[M]) { + + def getEntityId: String = entityId + + def getShard: ActorRef[ClusterSharding.ShardCommand] = shard + + def getActorContext: ActorContext[M] = actorContext +} + /** Allows starting a specific Sharded Entity by its entity identifier */ object StartEntity { diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/PersistentEntity.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/PersistentEntity.scala new file mode 100644 index 00000000000..cd037829687 --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/PersistentEntity.scala @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.javadsl + +import java.util.Optional + +import scala.compat.java8.OptionConverters._ + +import akka.actor.typed.BackoffSupervisorStrategy +import akka.actor.typed.Behavior +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.javadsl.PersistentBehavior + +/** + * Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent + * actors is very common and therefore this `PersistentEntity` class is provided as convenience. + * + * It is a [[PersistentBehavior]] and is implemented in the same way. It selects the `persistenceId` + * automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using + * [[EntityTypeKey.persistenceIdFrom]]. + */ +abstract class PersistentEntity[Command, Event, State >: Null] private ( + val entityTypeKey: EntityTypeKey[Command], + persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy]) + extends PersistentBehavior[Command, Event, State](persistenceId, supervisorStrategy) { + + def this(entityTypeKey: EntityTypeKey[Command], entityId: String) = { + this(entityTypeKey, persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.empty[BackoffSupervisorStrategy]) + } + + def this(entityTypeKey: EntityTypeKey[Command], entityId: String, backoffSupervisorStrategy: BackoffSupervisorStrategy) = { + this(entityTypeKey, persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.ofNullable(backoffSupervisorStrategy)) + } + +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index bf1d75395d1..a3ea35645cc 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -19,6 +19,7 @@ import akka.actor.typed.ExtensionSetup import akka.actor.typed.RecipientRef import akka.actor.typed.Props import akka.actor.typed.internal.InternalRecipientRef +import akka.actor.typed.scaladsl.ActorContext import akka.annotation.DoNotInherit import akka.annotation.InternalApi import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy @@ -26,6 +27,7 @@ import akka.cluster.sharding.typed.internal.ClusterShardingImpl import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl import akka.cluster.sharding.ShardRegion.{ StartEntity ⇒ UntypedStartEntity } import akka.persistence.typed.PersistenceId +import akka.persistence.typed.scaladsl.PersistentBehavior object ClusterSharding extends ExtensionId[ClusterSharding] { @@ -168,7 +170,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding ⇒ /** - * Initialize sharding for the given `shardedEntity` factory settings. + * Initialize sharding for the given `entity` factory settings. * * It will start a shard region or a proxy depending on if the settings require role and if this node has * such a role. @@ -176,7 +178,7 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding * @tparam M The type of message the entity accepts * @tparam E A possible envelope around the message the entity accepts */ - def start[M, E](shardedEntity: ShardedEntity[M, E]): ActorRef[E] + def start[M, E](entity: Entity[M, E]): ActorRef[E] /** * Create an `ActorRef`-like reference to a specific sharded entity. @@ -201,45 +203,33 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding @InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf } -object ShardedEntity { +object Entity { /** * Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional - * settings can be defined using the `with` methods of the returned [[ShardedEntity]]. + * settings can be defined using the `with` methods of the returned [[Entity]]. * - * @param create Create the behavior for an entity given an entityId - * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. - * - * @tparam M The type of message the entity accepts - */ - def apply[M]( - create: String ⇒ Behavior[M], - typeKey: EntityTypeKey[M], - stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = - apply((_, entityId) ⇒ create(entityId), typeKey, stopMessage) - - /** - * Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional - * settings can be defined using the `with` methods of the returned [[ShardedEntity]]. + * Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors + * is very common and therefore [[PersistentEntity]] is provided as a convenience for creating such + * [[PersistentBehavior]]. * - * @param create Create the behavior for an entity given `ShardCommand` ref and an entityId * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId) * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. * @tparam M The type of message the entity accepts */ def apply[M]( - create: (ActorRef[ClusterSharding.ShardCommand], String) ⇒ Behavior[M], - typeKey: EntityTypeKey[M], - stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = - new ShardedEntity(create, typeKey, stopMessage, Props.empty, None, None, None) + typeKey: EntityTypeKey[M], + createBehavior: EntityContext ⇒ Behavior[M], + stopMessage: M): Entity[M, ShardingEnvelope[M]] = + new Entity(createBehavior, typeKey, stopMessage, Props.empty, None, None, None) } /** * Defines how the entity should be created. Used in [[ClusterSharding#start]]. */ -final class ShardedEntity[M, E] private[akka] ( - val create: (ActorRef[ClusterSharding.ShardCommand], String) ⇒ Behavior[M], +final class Entity[M, E] private[akka] ( + val createBehavior: EntityContext ⇒ Behavior[M], val typeKey: EntityTypeKey[M], val stopMessage: M, val entityProps: Props, @@ -250,13 +240,13 @@ final class ShardedEntity[M, E] private[akka] ( /** * [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings. */ - def withEntityProps(newEntityProps: Props): ShardedEntity[M, E] = + def withEntityProps(newEntityProps: Props): Entity[M, E] = copy(entityProps = newEntityProps) /** * Additional settings, typically loaded from configuration. */ - def withSettings(newSettings: ClusterShardingSettings): ShardedEntity[M, E] = + def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] = copy(settings = Option(newSettings)) /** @@ -267,29 +257,36 @@ final class ShardedEntity[M, E] private[akka] ( * shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default * is configured with `akka.cluster.sharding.number-of-shards`. */ - def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): ShardedEntity[M, Envelope] = - new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, Option(newExtractor), allocationStrategy) + def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): Entity[M, Envelope] = + new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, Option(newExtractor), allocationStrategy) /** * Allocation strategy which decides on which nodes to allocate new shards, * [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified. */ - def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): ShardedEntity[M, E] = + def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): Entity[M, E] = copy(allocationStrategy = Option(newAllocationStrategy)) private def copy( - create: (ActorRef[ClusterSharding.ShardCommand], String) ⇒ Behavior[M] = create, - typeKey: EntityTypeKey[M] = typeKey, - stopMessage: M = stopMessage, - entityProps: Props = entityProps, - settings: Option[ClusterShardingSettings] = settings, - allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy - ): ShardedEntity[M, E] = { - new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy) + createBehavior: EntityContext ⇒ Behavior[M] = createBehavior, + typeKey: EntityTypeKey[M] = typeKey, + stopMessage: M = stopMessage, + entityProps: Props = entityProps, + settings: Option[ClusterShardingSettings] = settings, + allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy + ): Entity[M, E] = { + new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy) } } +/** + * Parameter to [[Entity.apply]] + */ +final class EntityContext( + val entityId: String, + val shard: ActorRef[ClusterSharding.ShardCommand]) + /** Allows starting a specific Sharded Entity by its entity identifier */ object StartEntity { diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/PersistentEntity.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/PersistentEntity.scala new file mode 100644 index 00000000000..1f4e0ef50bd --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/PersistentEntity.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.scaladsl + +import akka.actor.typed.Behavior +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.PersistentBehavior + +object PersistentEntity { + + /** + * Create a `Behavior` for a persistent actor that is used with Cluster Sharding. + * + * Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent + * actors is very common and therefore this `PersistentEntity` is provided as convenience. + * + * It is a [[PersistentBehavior]] and is implemented in the same way. It selects the `persistenceId` + * automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using + * [[EntityTypeKey.persistenceIdFrom]]. + */ + def apply[Command, Event, State]( + entityTypeKey: EntityTypeKey[Command], + entityId: String, + emptyState: State, + commandHandler: (State, Command) ⇒ Effect[Event, State], + eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = + PersistentBehavior(entityTypeKey.persistenceIdFrom(entityId), emptyState, commandHandler, eventHandler) +} diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala index 878983e9573..8fdcdd0bb49 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala @@ -7,7 +7,7 @@ package akka.cluster.sharding.typed import akka.actor.typed.{ ActorRef, Props } import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.scaladsl.ClusterSharding -import akka.cluster.sharding.typed.scaladsl.ShardedEntity +import akka.cluster.sharding.typed.scaladsl.Entity import akka.cluster.typed.{ MultiDcClusterActors, MultiNodeTypedClusterSpec } import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } import akka.actor.testkit.typed.scaladsl.TestProbe @@ -69,9 +69,9 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh "start sharding" in { val sharding = ClusterSharding(typedSystem) val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.start( - ShardedEntity( - _ ⇒ multiDcPinger, + Entity( typeKey, + _ ⇒ multiDcPinger, NoMore)) val probe = TestProbe[Pong] shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref)) @@ -99,9 +99,9 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh "be able to message cross dc via proxy" in { runOn(first, second) { val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).start( - ShardedEntity( - _ ⇒ multiDcPinger, + Entity( typeKey, + _ ⇒ multiDcPinger, NoMore) .withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2"))) val probe = TestProbe[Pong] diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java new file mode 100644 index 00000000000..4694164962d --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java @@ -0,0 +1,163 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package jdocs.akka.cluster.sharding.typed; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.javadsl.ActorContext; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletionStage; + +//#persistent-entity-import +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.cluster.sharding.typed.javadsl.PersistentEntity; +import akka.persistence.typed.javadsl.CommandHandler; +import akka.persistence.typed.javadsl.Effect; +import akka.persistence.typed.javadsl.EventHandler; +//#persistent-entity-import + +//#persistent-entity-usage-import +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.EntityRef; +import akka.cluster.sharding.typed.javadsl.Entity; +import akka.util.Timeout; +//#persistent-entity-usage-import + +public class HelloWorldPersistentEntityExample { + + //#persistent-entity-usage + + public static class HelloWorldService { + private final ActorSystem system; + private final ClusterSharding sharding; + private final Timeout askTimeout = Timeout.create(Duration.ofSeconds(5)); + + // registration at startup + public HelloWorldService(ActorSystem system) { + this.system = system; + sharding = ClusterSharding.get(system); + + sharding.start( + Entity.ofPersistentEntity( + HelloWorld.ENTITY_TYPE_KEY, + ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()), + HelloWorld.Passivate.INSTANCE)); + } + + // usage example + public CompletionStage sayHello(String worldId, String whom) { + EntityRef entityRef = + sharding.entityRefFor(HelloWorld.ENTITY_TYPE_KEY, worldId); + CompletionStage result = + entityRef.ask(replyTo -> new HelloWorld.Greet(whom, replyTo), askTimeout); + return result.thenApply(greeting -> greeting.numberOfPeople); + } + } + //#persistent-entity-usage + + //#persistent-entity + + public static class HelloWorld extends PersistentEntity { + + // Command + interface Command { + } + + public static final class Greet implements Command { + public final String whom; + public final ActorRef replyTo; + + public Greet(String whom, ActorRef replyTo) { + this.whom = whom; + this.replyTo = replyTo; + } + } + + enum Passivate implements Command { + INSTANCE + } + + // Response + public static final class Greeting { + public final String whom; + public final int numberOfPeople; + + public Greeting(String whom, int numberOfPeople) { + this.whom = whom; + this.numberOfPeople = numberOfPeople; + } + } + + // Event + public static final class Greeted { + public final String whom; + + public Greeted(String whom) { + this.whom = whom; + } + } + + // State + static final class KnownPeople { + private Set names = Collections.emptySet(); + + KnownPeople() { + } + + private KnownPeople(Set names) { + this.names = names; + } + + KnownPeople add(String name) { + Set newNames = new HashSet<>(names); + newNames.add(name); + return new KnownPeople(newNames); + } + + int numberOfPeople() { + return names.size(); + } + } + + public static final EntityTypeKey ENTITY_TYPE_KEY = + EntityTypeKey.create(Command.class, "HelloWorld"); + + public HelloWorld(ActorContext ctx, String entityId) { + super(ENTITY_TYPE_KEY, entityId); + } + + @Override + public KnownPeople emptyState() { + return new KnownPeople(); + } + + @Override + public CommandHandler commandHandler() { + return commandHandlerBuilder(KnownPeople.class) + .matchCommand(Greet.class, this::greet) + .matchCommand(Greet.class, this::passivate) + .build(); + } + + private Effect passivate(KnownPeople state, Command cmd) { + return Effect().stop(); + } + + private Effect greet(KnownPeople state, Greet cmd) { + return Effect().persist(new Greeted(cmd.whom)) + .andThen(newState -> cmd.replyTo.tell(new Greeting(cmd.whom, newState.numberOfPeople()))); + } + + @Override + public EventHandler eventHandler() { + return (state, evt) -> state.add(evt.whom); + } + + } + //#persistent-entity +} diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleTest.java new file mode 100644 index 00000000000..6eb48c232cd --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleTest.java @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package jdocs.akka.cluster.sharding.typed; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.EntityRef; +import akka.cluster.sharding.typed.javadsl.Entity; +import akka.cluster.typed.Cluster; +import akka.cluster.typed.Join; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import static jdocs.akka.cluster.sharding.typed.HelloWorldPersistentEntityExample.*; +import static org.junit.Assert.assertEquals; + +public class HelloWorldPersistentEntityExampleTest extends JUnitSuite { + + public static final Config config = ConfigFactory.parseString( + "akka.actor.provider = cluster \n" + + "akka.remote.netty.tcp.port = 0 \n" + + "akka.remote.artery.canonical.port = 0 \n" + + "akka.remote.artery.canonical.hostname = 127.0.0.1 \n" + + "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"); + + @ClassRule + public static final TestKitJunitResource testKit = new TestKitJunitResource(config); + + private ClusterSharding _sharding = null; + + private ClusterSharding sharding() { + if (_sharding == null) { + // initialize first time only + Cluster cluster = Cluster.get(testKit.system()); + cluster.manager().tell(new Join(cluster.selfMember().address())); + + ClusterSharding sharding = ClusterSharding.get(testKit.system()); + sharding.start( + Entity.ofPersistentEntity( + HelloWorld.ENTITY_TYPE_KEY, + ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()), + HelloWorld.Passivate.INSTANCE)); + _sharding = sharding; + } + return _sharding; + } + + @Test + public void sayHello() { + EntityRef world = sharding().entityRefFor(HelloWorld.ENTITY_TYPE_KEY, "1"); + TestProbe probe = testKit.createTestProbe(HelloWorld.Greeting.class); + world.tell(new HelloWorld.Greet("Alice", probe.getRef())); + HelloWorld.Greeting greeting1 = probe.expectMessageClass(HelloWorld.Greeting.class); + assertEquals("Alice", greeting1.whom); + assertEquals(1, greeting1.numberOfPeople); + + world.tell(new HelloWorld.Greet("Bob", probe.getRef())); + HelloWorld.Greeting greeting2 = probe.expectMessageClass(HelloWorld.Greeting.class); + assertEquals("Bob", greeting2.whom); + assertEquals(2, greeting2.numberOfPeople); + } + +} diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java index 8f01a3b8826..320e6505063 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java @@ -16,7 +16,7 @@ import akka.cluster.sharding.typed.javadsl.ClusterSharding; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; import akka.cluster.sharding.typed.javadsl.EntityRef; -import akka.cluster.sharding.typed.javadsl.ShardedEntity; +import akka.cluster.sharding.typed.javadsl.Entity; //#import @@ -103,9 +103,9 @@ public static void startPassivateExample() { EntityTypeKey typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); sharding.start( - ShardedEntity.create( - (shard, entityId) -> counter2(shard, entityId), + Entity.of( typeKey, + ctx -> counter2(ctx.getShard(), ctx.getEntityId()), new GoodByeCounter())); //#counter-passivate-start } @@ -124,9 +124,9 @@ public static void example() { EntityTypeKey typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); ActorRef> shardRegion = sharding.start( - ShardedEntity.create( - entityId -> counter(entityId,0), + Entity.of( typeKey, + ctx -> counter(ctx.getEntityId(),0), new GoodByeCounter())); //#start @@ -148,9 +148,9 @@ public static void persistenceExample() { EntityTypeKey blogTypeKey = EntityTypeKey.create(BlogCommand.class, "BlogPost"); sharding.start( - ShardedEntity.create( - BlogBehavior::behavior, + Entity.of( blogTypeKey, + ctx -> BlogBehavior.behavior(ctx.getEntityId()), new PassivatePost())); //#persistence } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index b861ed19eea..3a08a9b6085 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -8,40 +8,27 @@ import scala.concurrent.Future import akka.Done import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.ActorRef import akka.actor.typed.Behavior -import akka.actor.typed.Props -import akka.cluster.sharding.typed.ClusterShardingSettings import akka.cluster.typed.Cluster import akka.cluster.typed.Join -import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior } -import akka.actor.testkit.typed.scaladsl.TestProbe import akka.persistence.typed.ExpectingReply -import akka.persistence.typed.PersistenceId +import akka.persistence.typed.scaladsl.Effect import com.typesafe.config.ConfigFactory -import org.scalatest.{ WordSpec, WordSpecLike } +import org.scalatest.WordSpecLike object ClusterShardingPersistenceSpec { val config = ConfigFactory.parseString( """ akka.actor.provider = cluster - akka.remote.artery.enabled = true akka.remote.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 - akka.cluster.jmx.multi-mbeans-in-same-jvm = on - - akka.coordinated-shutdown.terminate-actor-system = off - - akka.actor { - serialize-messages = off - allow-java-serialization = off - } - akka.persistence.journal.plugin = "akka.persistence.journal.inmem" - """.stripMargin) + """) sealed trait Command final case class Add(s: String) extends Command @@ -51,9 +38,10 @@ object ClusterShardingPersistenceSpec { val typeKey = EntityTypeKey[Command]("test") - def persistentActor(entityId: String): Behavior[Command] = - PersistentBehavior[Command, String, String]( - persistenceId = typeKey.persistenceIdFrom(entityId), + def persistentEntity(entityId: String): Behavior[Command] = + PersistentEntity[Command, String, String]( + entityTypeKey = typeKey, + entityId = entityId, emptyState = "", commandHandler = (state, cmd) ⇒ cmd match { case Add(s) ⇒ @@ -77,9 +65,9 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh "Typed cluster sharding with persistent actor" must { - ClusterSharding(system).start(ShardedEntity( - entityId ⇒ persistentActor(entityId), + ClusterSharding(system).start(Entity( typeKey, + ctx ⇒ persistentEntity(ctx.entityId), StopPlz )) diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index bf409f2b981..d5ac8067774 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -183,19 +183,19 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. Behaviors.same } - private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(ShardedEntity( - (shard, _) ⇒ behavior(shard), + private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity( typeKey, + ctx ⇒ behavior(ctx.shard), StopPlz())) - private val shardingRef2 = sharding2.start(ShardedEntity( - (shard, _) ⇒ behavior(shard), + private val shardingRef2 = sharding2.start(Entity( typeKey, + ctx ⇒ behavior(ctx.shard), StopPlz())) - private val shardingRef3: ActorRef[IdTestProtocol] = sharding.start(ShardedEntity( - (shard, _) ⇒ behaviorWithId(shard), + private val shardingRef3: ActorRef[IdTestProtocol] = sharding.start(Entity( typeKey2, + ctx ⇒ behaviorWithId(ctx.shard), IdStopPlz()) .withMessageExtractor(ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { case IdReplyPlz(id, _) ⇒ id @@ -204,9 +204,9 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. }) ) - private val shardingRef4 = sharding2.start(ShardedEntity( - (shard, _) ⇒ behaviorWithId(shard), + private val shardingRef4 = sharding2.start(Entity( typeKey2, + ctx ⇒ behaviorWithId(ctx.shard), IdStopPlz()) .withMessageExtractor( ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { @@ -263,9 +263,9 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. val p = TestProbe[String]() val typeKey3 = EntityTypeKey[TestProtocol]("passivate-test") - val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(ShardedEntity( - (shard, _) ⇒ behavior(shard, Some(stopProbe.ref)), + val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity( typeKey3, + ctx ⇒ behavior(ctx.shard, Some(stopProbe.ref)), StopPlz())) shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref)) @@ -281,9 +281,9 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. "fail if starting sharding for already used typeName, but with a different type" in { // sharding has been already started with EntityTypeKey[TestProtocol]("envelope-shard") val ex = intercept[Exception] { - sharding.start(ShardedEntity( - (shard, _) ⇒ behaviorWithId(shard), + sharding.start(Entity( EntityTypeKey[IdTestProtocol]("envelope-shard"), + ctx ⇒ behaviorWithId(ctx.shard), IdStopPlz())) } @@ -347,9 +347,9 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. "EntityRef - AskTimeoutException" in { val ignorantKey = EntityTypeKey[TestProtocol]("ignorant") - sharding.start(ShardedEntity( - _ ⇒ Behaviors.ignore[TestProtocol], + sharding.start(Entity( ignorantKey, + _ ⇒ Behaviors.ignore[TestProtocol], StopPlz())) val ref = sharding.entityRefFor(ignorantKey, "sloppy") diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala new file mode 100644 index 00000000000..3bc5ed66f0d --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package docs.akka.cluster.sharding.typed + +import scala.concurrent.Future +import scala.concurrent.duration._ + +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem + +object HelloWorldPersistentEntityExample { + + //#persistent-entity-usage + import akka.cluster.sharding.typed.scaladsl.ClusterSharding + import akka.cluster.sharding.typed.scaladsl.Entity + import akka.util.Timeout + + class HelloWorldService(system: ActorSystem[_]) { + import system.executionContext + + // registration at startup + private val sharding = ClusterSharding(system) + + sharding.start(Entity( + typeKey = HelloWorld.entityTypeKey, + createBehavior = entityContext ⇒ HelloWorld.persistentEntity(entityContext.entityId), + stopMessage = HelloWorld.Passivate)) + + private implicit val askTimeout: Timeout = Timeout(5.seconds) + + def greet(worldId: String, whom: String): Future[Int] = { + val entityRef = sharding.entityRefFor(HelloWorld.entityTypeKey, worldId) + val greeting = entityRef ? HelloWorld.Greet(whom) + greeting.map(_.numberOfPeople) + } + + } + //#persistent-entity-usage + + //#persistent-entity + import akka.actor.typed.Behavior + import akka.cluster.sharding.typed.scaladsl.EntityTypeKey + import akka.cluster.sharding.typed.scaladsl.PersistentEntity + import akka.persistence.typed.scaladsl.Effect + + object HelloWorld { + + // Command + trait Command + final case class Greet(whom: String)(val replyTo: ActorRef[Greeting]) extends Command + case object Passivate extends Command + // Response + final case class Greeting(whom: String, numberOfPeople: Int) + + // Event + final case class Greeted(whom: String) + + // State + private final case class KnownPeople(names: Set[String]) { + def add(name: String): KnownPeople = copy(names = names + name) + + def numberOfPeople: Int = names.size + } + + private val commandHandler: (KnownPeople, Command) ⇒ Effect[Greeted, KnownPeople] = { + (_, cmd) ⇒ + cmd match { + case cmd: Greet ⇒ greet(cmd) + case Passivate ⇒ passivate() + } + } + + private def greet(cmd: Greet): Effect[Greeted, KnownPeople] = + Effect.persist(Greeted(cmd.whom)) + .thenRun(state ⇒ cmd.replyTo ! Greeting(cmd.whom, state.numberOfPeople)) + + private def passivate(): Effect[Greeted, KnownPeople] = + Effect.stop + + private val eventHandler: (KnownPeople, Greeted) ⇒ KnownPeople = { + (state, evt) ⇒ state.add(evt.whom) + } + + val entityTypeKey: EntityTypeKey[Command] = + EntityTypeKey[Command]("HelloWorld") + + def persistentEntity(entityId: String): Behavior[Command] = PersistentEntity( + entityTypeKey = entityTypeKey, + entityId = entityId, + emptyState = KnownPeople(Set.empty), + commandHandler, + eventHandler + ) + + } + //#persistent-entity + +} diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleSpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleSpec.scala new file mode 100644 index 00000000000..7c23b00c0b8 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleSpec.scala @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package docs.akka.cluster.sharding.typed + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.Entity +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object HelloWorldPersistentEntityExampleSpec { + val config = ConfigFactory.parseString( + """ + akka.actor.provider = cluster + + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.remote.artery.canonical.hostname = 127.0.0.1 + + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + """) +} + +class HelloWorldPersistentEntityExampleSpec extends ScalaTestWithActorTestKit(HelloWorldPersistentEntityExampleSpec.config) with WordSpecLike { + import HelloWorldPersistentEntityExample.HelloWorld + import HelloWorldPersistentEntityExample.HelloWorld._ + + val sharding = ClusterSharding(system) + + override def beforeAll(): Unit = { + super.beforeAll() + Cluster(system).manager ! Join(Cluster(system).selfMember.address) + + sharding.start(Entity( + HelloWorld.entityTypeKey, + ctx ⇒ HelloWorld.persistentEntity(ctx.entityId), + HelloWorld.Passivate + )) + } + + "HelloWorld example" must { + + "sayHello" in { + val probe = createTestProbe[Greeting]() + val ref = ClusterSharding(system).entityRefFor(HelloWorld.entityTypeKey, "1") + ref ! Greet("Alice")(probe.ref) + probe.expectMessage(Greeting("Alice", 1)) + ref ! Greet("Bob")(probe.ref) + probe.expectMessage(Greeting("Bob", 2)) + } + + } +} diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala index a01b60ab3bd..f9b826c04bd 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -6,9 +6,9 @@ package docs.akka.cluster.sharding.typed import scala.concurrent.duration._ -import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } import akka.actor.typed.scaladsl.Behaviors -import akka.cluster.sharding.typed.scaladsl.ShardedEntity +import akka.cluster.sharding.typed.scaladsl.Entity import docs.akka.persistence.typed.BlogPostExample import docs.akka.persistence.typed.BlogPostExample.{ BlogCommand, PassivatePost } @@ -17,7 +17,6 @@ object ShardingCompileOnlySpec { val system = ActorSystem(Behaviors.empty, "Sharding") //#sharding-extension - import akka.cluster.sharding.typed.ClusterShardingSettings import akka.cluster.sharding.typed.ShardingEnvelope import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.EntityTypeKey @@ -50,9 +49,9 @@ object ShardingCompileOnlySpec { //#start val TypeKey = EntityTypeKey[CounterCommand]("Counter") - val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.start(ShardedEntity( - create = entityId ⇒ counter(entityId, 0), + val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.start(Entity( typeKey = TypeKey, + createBehavior = ctx ⇒ counter(ctx.entityId, 0), stopMessage = GoodByeCounter)) //#start @@ -69,9 +68,9 @@ object ShardingCompileOnlySpec { //#persistence val BlogTypeKey = EntityTypeKey[BlogCommand]("BlogPost") - ClusterSharding(system).start(ShardedEntity( - create = entityId ⇒ behavior(entityId), + ClusterSharding(system).start(Entity( typeKey = BlogTypeKey, + createBehavior = ctx ⇒ behavior(ctx.entityId), stopMessage = PassivatePost)) //#persistence @@ -103,9 +102,9 @@ object ShardingCompileOnlySpec { } } - sharding.start(ShardedEntity( - create = (shard, entityId) ⇒ counter2(shard, entityId), + sharding.start(Entity( typeKey = TypeKey, + createBehavior = ctx ⇒ counter2(ctx.shard, ctx.entityId), stopMessage = GoodByeCounter)) //#counter-passivate diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 3d0ed4b8e15..c7b346540ba 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -64,24 +64,36 @@ Java When using sharding entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of an actor after it has moved. -Taking the larger example from the @ref:[persistence documentation](persistence.md#larger-example) and making it into -a sharded entity is the same as for a non persistent behavior. The behavior: +Akka Persistence is based on the single-writer principle, for a particular `persitenceId` only one persistent actor +instance should be active. If multiple instances were to persist events at the same time, the events would be +interleaved and might not be interpreted correctly on replay. Cluster sharding is typically used together with +persistence to ensure that there is only one active entity for each `persistenceId` (`entityId`). + +Here is an example of a persistent actor that is used as a sharded entity: Scala -: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #behavior } +: @@snip [HelloWorldPersistentEntityExample.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala) { #persistent-entity } Java -: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #behavior } +: @@snip [HelloWorldPersistentEntityExample.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java) { #persistent-entity-import #persistent-entity } + +Note that `PersistentEntity` is used in this example. Any `Behavior` can be used as a sharded entity actor, +but the combination of sharding and persistent actors is very common and therefore the `PersistentEntity` +@scala[factory]@java[class] is provided as convenience. It selects the `persistenceId` automatically from +the `EntityTypeKey` and `entityId` @java[constructor] parameters by using `EntityTypeKey.persistenceIdFrom`. -To create the entity: +To initialize and use the entity: Scala -: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #persistence } +: @@snip [HelloWorldPersistentEntityExample.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala) { #persistent-entity-usage } Java -: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #persistence } +: @@snip [HelloWorldPersistentEntityExample.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java) { #persistent-entity-usage-import #persistent-entity-usage } + +Sending messages to persistent entities is the same as if the entity wasn't persistent. The only difference is +when an entity is moved the state will be restored. In the above example @ref:[ask](interaction-patterns.md#outside-ask) +is used but `tell` or any of the other @ref:[Interaction Patterns](interaction-patterns.md) can be used. -Sending messages to entities is the same as the example above. The only difference is when an entity is moved the state will be restored. See @ref:[persistence](persistence.md) for more details. ## Passivation @@ -92,7 +104,7 @@ the entity actors for example by defining receive timeout (`context.setReceiveTi If a message is already enqueued to the entity when it stops itself the enqueued message in the mailbox will be dropped. To support graceful passivation without losing such messages the entity actor can send `ClusterSharding.Passivate` to to the -@scala:[`ActorRef[ShardCommand]`]@java:[`ActorRef`] that was passed in to +@scala[`ActorRef[ShardCommand]`]@java[`ActorRef`] that was passed in to the factory method when creating the entity. The specified `handOffStopMessage` message will be sent back to the entity, which is then supposed to stop itself. Incoming messages will be buffered by the `Shard` between reception of `Passivate` and termination of the diff --git a/akka-docs/src/main/paradox/typed/interaction-patterns.md b/akka-docs/src/main/paradox/typed/interaction-patterns.md index 35179a183d9..b9c4a3d1362 100644 --- a/akka-docs/src/main/paradox/typed/interaction-patterns.md +++ b/akka-docs/src/main/paradox/typed/interaction-patterns.md @@ -169,8 +169,8 @@ The response adapting function is running in the receiving actor and can safely * When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact * Finding a good value for the timeout, especially when `ask` is triggers chained `ask`s in the receiving actor. You want a short timeout to be responsive and answer back to the requester, but at the same time you do not want to have many false positives - -## Request-Response with ask from outside the ActorSystem + +## Request-Response with ask from outside an Actor Some times you need to interact with actors from outside of the actor system, this can be done with fire-and-forget as described above or through another version of `ask` that returns a @scala[`Future[Response]`]@java[`CompletionStage`] that is either completed with a successful response or failed with a `TimeoutException` if there was no response within the specified timeout. diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index 423b9e7281e..419290e9699 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -130,8 +130,19 @@ Scala Java : @@snip [PersistentActorCompileOnyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #behavior } -The `PersistentBehavior` can then be run as with any plain typed actor as described in [typed actors documentation](actors-typed.md). - +## Cluster Sharding and persistence + +In a use case where the number of persistent actors needed are higher than what would fit in the memory of one node or +where resilience is important so that if a node crashes the persistent actors are quickly started on a new node and can +resume operations @ref:[Cluster Sharding](cluster-sharding.md) is an excellent fit to spread persistent actors over a +cluster and address them by id. + +The `PersistentBehavior` can then be run as with any plain typed actor as described in [actors documentation](actors-typed.md), +but since Akka Persistence is based on the single-writer principle the persistent actors are typically used together +with Cluster Sharding. For a particular `persistenceId` only one persistent actor instance should be active at one time. +If multiple instances were to persist events at the same time, the events would be interleaved and might not be +interpreted correctly on replay. Cluster Sharding ensures that there is only one active entity for each id. The +@ref:[Cluster Sharding example](cluster-sharding.md#persistence-example) illustrates this common combination. ## Accessing the ActorContext diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala index 459a373df5f..82706b6c354 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala @@ -14,6 +14,11 @@ import akka.persistence.typed.ExpectingReply object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing] +/** + * Factory methods for creating [[Effect]] directives. + * + * Not for user extension + */ @DoNotInherit sealed class EffectFactories[Command, Event, State] { /** * Persist a single event diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala index 021e8d76a5c..fec687a9a3b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala @@ -7,7 +7,6 @@ package akka.persistence.typed.scaladsl import akka.Done import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior.DeferredBehavior -import akka.annotation.InternalApi import akka.persistence._ import akka.persistence.typed.EventAdapter import akka.persistence.typed.internal._ @@ -85,9 +84,14 @@ object PersistentBehavior { } /** - * Not intended for user extension. + * Further customization of the `PersistentBehavior` can be done with the methods defined here. + * + * Not for user extension */ @DoNotInherit trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command] { + + def persistenceId: PersistenceId + /** * The `callback` function is called to notify the actor that the recovery process * is finished.