Skip to content

Commit

Permalink
PersistentEntity to glue together Sharding and PersistentBehavior better
Browse files Browse the repository at this point in the history
* Makes the combination more visable
* You don't have to worry about the persistenceId, only EntityTypeKey and entityId
* The glue is stronger in the javadsl because of two reasons
  * Couldn't realisticly create a PersistentEntity class extending PersistenBehavior (which
    contains all the optional parameters and functions) since that would duplicate too much.
  * The ActorContext would be needed in the ShardedEntityContext parameter and because of the
    additional M type parameters the type inference breaks down when using the factory. Would
    require specifying the type of the ShardedEntityContex[M] parameter. That problem doesn't
    seem to exist in Java.

renamed:
s/ShardedEntityContext/EntityContext/
s/ShardedEntity/Entity/
  • Loading branch information
patriknw committed Oct 18, 2018
1 parent bed17cc commit 2672bd7
Show file tree
Hide file tree
Showing 19 changed files with 670 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity }
import akka.cluster.sharding.typed.scaladsl.EntityContext
import akka.cluster.typed.Cluster
import akka.event.Logging
import akka.event.LoggingAdapter
Expand Down Expand Up @@ -130,37 +131,40 @@ import akka.util.Timeout
private val shardCommandActors: ConcurrentHashMap[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] = new ConcurrentHashMap

// scaladsl impl
override def start[M, E](shardedEntity: scaladsl.ShardedEntity[M, E]): ActorRef[E] = {
val settings = shardedEntity.settings match {
override def start[M, E](entity: scaladsl.Entity[M, E]): ActorRef[E] = {
val settings = entity.settings match {
case None ClusterShardingSettings(system)
case Some(s) s
}

val extractor = (shardedEntity.messageExtractor match {
val extractor = (entity.messageExtractor match {
case None new HashCodeMessageExtractor[M](settings.numberOfShards)
case Some(e) e
}).asInstanceOf[ShardingMessageExtractor[E, M]]

internalStart(shardedEntity.create, shardedEntity.entityProps, shardedEntity.typeKey,
shardedEntity.stopMessage, settings, extractor, shardedEntity.allocationStrategy)
internalStart(entity.createBehavior, entity.entityProps, entity.typeKey,
entity.stopMessage, settings, extractor, entity.allocationStrategy)
}

// javadsl impl
override def start[M, E](shardedEntity: javadsl.ShardedEntity[M, E]): ActorRef[E] = {
override def start[M, E](entity: javadsl.Entity[M, E]): ActorRef[E] = {
import scala.compat.java8.OptionConverters._
start(new scaladsl.ShardedEntity(
create = (shard, entitityId) shardedEntity.createBehavior.apply(shard, entitityId),
typeKey = shardedEntity.typeKey.asScala,
stopMessage = shardedEntity.stopMessage,
entityProps = shardedEntity.entityProps,
settings = shardedEntity.settings.asScala,
messageExtractor = shardedEntity.messageExtractor.asScala,
allocationStrategy = shardedEntity.allocationStrategy.asScala
start(new scaladsl.Entity(
createBehavior = (ctx: EntityContext) Behaviors.setup[M] { actorContext
entity.createBehavior(
new javadsl.EntityContext[M](ctx.entityId, ctx.shard, actorContext.asJava))
},
typeKey = entity.typeKey.asScala,
stopMessage = entity.stopMessage,
entityProps = entity.entityProps,
settings = entity.settings.asScala,
messageExtractor = entity.messageExtractor.asScala,
allocationStrategy = entity.allocationStrategy.asScala
))
}

private def internalStart[M, E](
behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) Behavior[M],
behavior: EntityContext Behavior[M],
entityProps: Props,
typeKey: scaladsl.EntityTypeKey[M],
stopMessage: M,
Expand All @@ -185,7 +189,7 @@ import akka.util.Timeout
if (settings.shouldHostShard(cluster)) {
log.info("Starting Shard Region [{}]...", typeKey.name)

val shardCommandDelegator =
val shardCommandDelegator: ActorRef[scaladsl.ClusterSharding.ShardCommand] =
shardCommandActors.computeIfAbsent(
typeKey.name,
new java.util.function.Function[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] {
Expand All @@ -198,7 +202,7 @@ import akka.util.Timeout
})

val untypedEntityPropsFactory: String akka.actor.Props = { entityId
PropsAdapter(behavior(shardCommandDelegator, entityId), entityProps)
PropsAdapter(behavior(new EntityContext(entityId, shardCommandDelegator)), entityProps)
}

untypedSharding.internalStart(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import akka.actor.typed.Behavior
import akka.actor.typed.RecipientRef
import akka.actor.typed.Props
import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.javadsl.ActorContext
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
Expand Down Expand Up @@ -167,15 +168,15 @@ object ClusterSharding {
abstract class ClusterSharding {

/**
* Initialize sharding for the given `shardedEntity` factory settings.
* Initialize sharding for the given `entity` factory settings.
*
* It will start a shard region or a proxy depending on if the settings require role and if this node has
* such a role.
*
* @tparam M The type of message the entity accepts
* @tparam E A possible envelope around the message the entity accepts
*/
def start[M, E](shardedEntity: ShardedEntity[M, E]): ActorRef[E]
def start[M, E](entity: Entity[M, E]): ActorRef[E]

/**
* Create an `ActorRef`-like reference to a specific sharded entity.
Expand All @@ -195,49 +196,62 @@ abstract class ClusterSharding {
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
}

object ShardedEntity {
object Entity {

/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional
* settings can be defined using the `with` methods of the returned [[ShardedEntity]].
* settings can be defined using the `with` methods of the returned [[Entity]].
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors
* is very common and therefore the [[Entity.ofPersistentEntity]] is provided as convenience.
*
* @param createBehavior Create the behavior for an entity given an entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId)
* @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
*
* @tparam M The type of message the entity accepts
*/
def create[M](
createBehavior: JFunction[String, Behavior[M]],
def of[M](
typeKey: EntityTypeKey[M],
stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = {
create(new BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]] {
override def apply(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[M] =
createBehavior.apply(entityId)
}, typeKey, stopMessage)
createBehavior: JFunction[EntityContext[M], Behavior[M]],
stopMessage: M): Entity[M, ShardingEnvelope[M]] = {
new Entity(createBehavior, typeKey, stopMessage, Props.empty, Optional.empty(), Optional.empty(), Optional.empty())
}

/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional
* settings can be defined using the `with` methods of the returned [[ShardedEntity]].
* Defines how the [[PersistentEntity]] should be created. Used in [[ClusterSharding#start]]. Any [[Behavior]] can
* be used as a sharded entity actor, but the combination of sharding and persistent actors is very common
* and therefore this factory is provided as convenience.
*
* More optional settings can be defined using the `with` methods of the returned [[Entity]].
*
* @param createBehavior Create the behavior for an entity given `ShardCommand` ref and an entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createPersistentEntity Create the `PersistentEntity` for an entity given a [[EntityContext]] (includes entityId)
* @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
* @tparam M The type of message the entity accepts
* @tparam Command The type of message the entity accepts
*/
def create[M](
createBehavior: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]],
typeKey: EntityTypeKey[M],
stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] =
new ShardedEntity(createBehavior, typeKey, stopMessage, Props.empty, Optional.empty(), Optional.empty(), Optional.empty())
def ofPersistentEntity[Command, Event, State >: Null](
typeKey: EntityTypeKey[Command],
createPersistentEntity: JFunction[EntityContext[Command], PersistentEntity[Command, Event, State]],
stopMessage: Command): Entity[Command, ShardingEnvelope[Command]] = {

of(typeKey, new JFunction[EntityContext[Command], Behavior[Command]] {
override def apply(ctx: EntityContext[Command]): Behavior[Command] = {
val persistentEntity = createPersistentEntity(ctx)
if (persistentEntity.entityTypeKey != typeKey)
throw new IllegalArgumentException(s"The [${persistentEntity.entityTypeKey}] of the PersistentEntity " +
s" [${persistentEntity.getClass.getName}] doesn't match expected $typeKey.")
persistentEntity
}
}, stopMessage)
}

}

/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]].
*/
final class ShardedEntity[M, E] private[akka] (
val createBehavior: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]],
final class Entity[M, E] private[akka] (
val createBehavior: JFunction[EntityContext[M], Behavior[M]],
val typeKey: EntityTypeKey[M],
val stopMessage: M,
val entityProps: Props,
Expand All @@ -248,13 +262,13 @@ final class ShardedEntity[M, E] private[akka] (
/**
* [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings.
*/
def withEntityProps(newEntityProps: Props): ShardedEntity[M, E] =
def withEntityProps(newEntityProps: Props): Entity[M, E] =
copy(entityProps = newEntityProps)

/**
* Additional settings, typically loaded from configuration.
*/
def withSettings(newSettings: ClusterShardingSettings): ShardedEntity[M, E] =
def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] =
copy(settings = Optional.ofNullable(newSettings))

/**
Expand All @@ -265,29 +279,44 @@ final class ShardedEntity[M, E] private[akka] (
* shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default
* is configured with `akka.cluster.sharding.number-of-shards`.
*/
def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): ShardedEntity[M, Envelope] =
new ShardedEntity(createBehavior, typeKey, stopMessage, entityProps, settings, Optional.ofNullable(newExtractor), allocationStrategy)
def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): Entity[M, Envelope] =
new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, Optional.ofNullable(newExtractor), allocationStrategy)

/**
* Allocation strategy which decides on which nodes to allocate new shards,
* [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified.
*/
def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): ShardedEntity[M, E] =
def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): Entity[M, E] =
copy(allocationStrategy = Optional.ofNullable(newAllocationStrategy))

private def copy(
create: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]] = createBehavior,
typeKey: EntityTypeKey[M] = typeKey,
stopMessage: M = stopMessage,
entityProps: Props = entityProps,
settings: Optional[ClusterShardingSettings] = settings,
allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy
): ShardedEntity[M, E] = {
new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy)
createBehavior: JFunction[EntityContext[M], Behavior[M]] = createBehavior,
typeKey: EntityTypeKey[M] = typeKey,
stopMessage: M = stopMessage,
entityProps: Props = entityProps,
settings: Optional[ClusterShardingSettings] = settings,
allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy
): Entity[M, E] = {
new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy)
}

}

/**
* Parameter to [[Entity.of]]
*/
final class EntityContext[M](
entityId: String,
shard: ActorRef[ClusterSharding.ShardCommand],
actorContext: ActorContext[M]) {

def getEntityId: String = entityId

def getShard: ActorRef[ClusterSharding.ShardCommand] = shard

def getActorContext: ActorContext[M] = actorContext
}

/** Allows starting a specific Sharded Entity by its entity identifier */
object StartEntity {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.cluster.sharding.typed.javadsl

import java.util.Optional

import scala.compat.java8.OptionConverters._

import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.javadsl.PersistentBehavior

/**
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
* actors is very common and therefore this `PersistentEntity` class is provided as convenience.
*
* It is a [[PersistentBehavior]] and is implemented in the same way. It selects the `persistenceId`
* automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using
* [[EntityTypeKey.persistenceIdFrom]].
*/
abstract class PersistentEntity[Command, Event, State >: Null] private (
val entityTypeKey: EntityTypeKey[Command],
persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy])
extends PersistentBehavior[Command, Event, State](persistenceId, supervisorStrategy) {

def this(entityTypeKey: EntityTypeKey[Command], entityId: String) = {
this(entityTypeKey, persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.empty[BackoffSupervisorStrategy])
}

def this(entityTypeKey: EntityTypeKey[Command], entityId: String, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
this(entityTypeKey, persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.ofNullable(backoffSupervisorStrategy))
}

}
Loading

0 comments on commit 2672bd7

Please sign in to comment.