Skip to content

Commit

Permalink
add unit test protect ExtractEntityId can be shared safely
Browse files Browse the repository at this point in the history
Related with apache#1463
  • Loading branch information
Roiocam committed Sep 11, 2024
1 parent d1ec224 commit 335dc31
Showing 1 changed file with 71 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import java.io.File
import com.typesafe.config.ConfigFactory
import org.apache.commons.io.FileUtils
import org.apache.pekko
import org.apache.pekko.Done
import org.apache.pekko.cluster.sharding.ShardRegion.MessageExtractor
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import pekko.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, PoisonPill, Props }
import pekko.cluster.{ Cluster, MemberStatus }
import pekko.cluster.ClusterEvent.CurrentClusterState
import pekko.testkit.{ DeadLettersFilter, PekkoSpec, TestProbe, WithLogCapturing }
import pekko.testkit.TestEvent.Mute

import scala.concurrent.{ ExecutionContext, Future }

object ShardRegionSpec {
val host = "127.0.0.1"
val tempConfig =
Expand Down Expand Up @@ -54,6 +59,7 @@ object ShardRegionSpec {
val shardTypeName = "Caat"

val numberOfShards = 3
val largerShardNum = 20

val extractEntityId: ShardRegion.ExtractEntityId = {
case msg: Int => (msg.toString, msg)
Expand All @@ -66,11 +72,37 @@ object ShardRegionSpec {
case _ => throw new IllegalArgumentException()
}

val messageExtractor: MessageExtractor = new MessageExtractor {
override def entityId(message: Any): String = message match {
case msg: Int => msg.toString
case _ => throw new IllegalArgumentException()
}

override def shardId(message: Any): String = message match {
case msg: Int => (msg % largerShardNum).toString
case _ => throw new IllegalArgumentException()
}

override def entityMessage(message: Any): Any = message
}

class EntityActor extends Actor with ActorLogging {
override def receive: Receive = {
case msg => sender() ! msg
}
}

class IDMatcherActor extends Actor with ActorLogging {
override def receive: Receive = {
case msg =>
val selfEntityId = self.path.name
val msgEntityId = messageExtractor.entityId(msg)
if (selfEntityId != msgEntityId) {
throw new IllegalStateException(s"EntityId mismatch: $selfEntityId != $msgEntityId")
}
sender() ! msg
}
}
}
class ShardRegionSpec extends PekkoSpec(ShardRegionSpec.config) with WithLogCapturing {

Expand Down Expand Up @@ -183,4 +215,43 @@ class ShardRegionSpec extends PekkoSpec(ShardRegionSpec.config) with WithLogCapt
}
}

"ExtractEntityId" must {
"can be safely share to multiple shards" in {
implicit val ec: ExecutionContext = system.dispatcher

Cluster(sysA).join(Cluster(sysA).selfAddress) // coordinator on A
awaitAssert(Cluster(sysA).selfMember.status shouldEqual MemberStatus.Up, 1.second)

within(10.seconds) {
awaitAssert {
Set(sysA).foreach { s =>
Cluster(s).sendCurrentClusterState(testActor)
expectMsgType[CurrentClusterState].members.size shouldEqual 2
}
}
}

val shardTypeName = "Doog"
val region = ClusterSharding(sysA).start(
shardTypeName,
Props[IDMatcherActor](),
ClusterShardingSettings(system),
messageExtractor)

val total = largerShardNum * 100
val source = Source(1 to total)

val flow = source.mapAsync(parallelism = largerShardNum) { i =>
Future {
region.tell(i, p1.ref)
}
}

val result = flow.runWith(Sink.ignore)

result.futureValue shouldEqual Done
p1.receiveN(total, 10.seconds)
}
}

}

0 comments on commit 335dc31

Please sign in to comment.