-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
- Loading branch information
Showing
8 changed files
with
625 additions
and
70 deletions.
There are no files selected for viewing
186 changes: 186 additions & 0 deletions
186
akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
/* | ||
* Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com> | ||
*/ | ||
|
||
package akka.actor.typed.eventstream; | ||
|
||
// #imports | ||
import akka.actor.AllDeadLetters; | ||
import akka.actor.SuppressedDeadLetter; | ||
import akka.actor.testkit.typed.javadsl.ActorTestKit; | ||
import akka.actor.testkit.typed.javadsl.TestProbe; | ||
import akka.actor.typed.Behavior; | ||
import akka.actor.typed.Props; | ||
import akka.actor.typed.SpawnProtocol; | ||
import akka.actor.typed.SpawnProtocol.Spawn; | ||
import akka.actor.typed.eventstream.EventStream.Publish; | ||
import akka.actor.typed.eventstream.EventStream.Subscribe; | ||
import akka.actor.typed.javadsl.AbstractBehavior; | ||
import akka.actor.typed.javadsl.ActorContext; | ||
import akka.actor.typed.javadsl.AskPattern; | ||
import akka.actor.typed.javadsl.Behaviors; | ||
import akka.actor.typed.javadsl.Receive; | ||
import akka.testkit.javadsl.TestKit; | ||
import java.time.Duration; | ||
import java.util.concurrent.CompletionStage; | ||
import org.junit.Test; | ||
import org.scalatestplus.junit.JUnitSuite; | ||
// #imports-deadletter | ||
import akka.actor.DeadLetter; | ||
import akka.actor.typed.ActorRef; | ||
import akka.actor.typed.ActorSystem; | ||
// #imports-deadletter | ||
|
||
public class LoggingDocTest extends JUnitSuite { | ||
|
||
@Test | ||
public void subscribeToDeadLetters() { | ||
// #deadletters | ||
ActorSystem<DeadLetter> system = ActorSystem.create(Behaviors.empty(), "DeadLetters"); | ||
system.eventStream().tell(new Subscribe<>(DeadLetter.class, system)); | ||
// #deadletters | ||
ActorTestKit.shutdown(system); | ||
} | ||
|
||
public | ||
// #deadletter-actor | ||
static class DeadLetterActor extends AbstractBehavior<String> { | ||
|
||
public static Behavior<String> create() { | ||
return Behaviors.setup(DeadLetterActor::new); | ||
} | ||
|
||
public DeadLetterActor(ActorContext<String> context) { | ||
super(context); | ||
ActorRef<DeadLetter> messageAdapter = context.messageAdapter( | ||
DeadLetter.class, | ||
d -> d.message().toString() | ||
); | ||
context.getSystem().eventStream() | ||
.tell(new Subscribe<>(DeadLetter.class, messageAdapter)); | ||
} | ||
|
||
@Override | ||
public Receive<String> createReceive() { | ||
return newReceiveBuilder().onMessage(String.class, msg -> { | ||
System.out.println(msg); | ||
return Behaviors.same(); | ||
}).build(); | ||
} | ||
} | ||
// #deadletter-actor | ||
|
||
// #superclass-subscription-eventstream | ||
interface AllKindsOfMusic { | ||
|
||
} | ||
|
||
class Jazz implements AllKindsOfMusic { | ||
|
||
public final String artist; | ||
|
||
public Jazz(String artist) { | ||
this.artist = artist; | ||
} | ||
} | ||
|
||
class Electronic implements AllKindsOfMusic { | ||
|
||
public final String artist; | ||
|
||
public Electronic(String artist) { | ||
this.artist = artist; | ||
} | ||
} | ||
|
||
static class Listener extends AbstractBehavior<AllKindsOfMusic> { | ||
|
||
public static Behavior<AllKindsOfMusic> create() { | ||
return Behaviors.setup(Listener::new); | ||
} | ||
|
||
public Listener(ActorContext<AllKindsOfMusic> context) { | ||
super(context); | ||
} | ||
|
||
|
||
@Override | ||
public Receive<AllKindsOfMusic> createReceive() { | ||
return newReceiveBuilder() | ||
.onMessage(Jazz.class, msg -> { | ||
System.out.printf("%s is listening to: %s%n", | ||
getContext().getSelf().path().name(), | ||
msg); | ||
return Behaviors.same(); | ||
}) | ||
.onMessage(Electronic.class, msg -> { | ||
System.out.printf("%s is listening to: %s%n", | ||
getContext().getSelf().path().name(), | ||
msg); | ||
return Behaviors.same(); | ||
}).build(); | ||
} | ||
} | ||
// #superclass-subscription-eventstream | ||
|
||
@Test | ||
public void subscribeBySubclassification() { | ||
// #superclass-subscription-eventstream | ||
ActorSystem<SpawnProtocol.Command> system = ActorSystem.create(SpawnProtocol.create(), | ||
"Subclassification"); | ||
final Duration timeout = Duration.ofSeconds(3); | ||
|
||
CompletionStage<ActorRef<Jazz>> jazzListener = AskPattern.ask( | ||
system, | ||
replyTo -> new Spawn(Listener.create(), "jazzListener", Props.empty(), replyTo), | ||
timeout, | ||
system.scheduler() | ||
); | ||
|
||
CompletionStage<ActorRef<AllKindsOfMusic>> musicListener = AskPattern.ask( | ||
system, | ||
replyTo -> new Spawn(Listener.create(), "musicListener", Props.empty(), replyTo), | ||
timeout, | ||
system.scheduler() | ||
); | ||
|
||
ActorRef<Jazz> jazzListenerActorRef = jazzListener.toCompletableFuture().join(); | ||
ActorRef<AllKindsOfMusic> musicListenerActorRef = musicListener.toCompletableFuture() | ||
.join(); | ||
|
||
system.eventStream().tell(new Subscribe<>(Jazz.class, jazzListenerActorRef)); | ||
system.eventStream().tell(new Subscribe<>(AllKindsOfMusic.class, musicListenerActorRef)); | ||
// only musicListener gets this message, since it listens to *all* kinds of music: | ||
system.eventStream().tell(new Publish<>(new Electronic("Parov Stelar"))); | ||
|
||
// jazzListener and musicListener will be notified about Jazz: | ||
system.eventStream().tell(new Publish<>(new Jazz("Sonny Rollins"))); | ||
|
||
// #superclass-subscription-eventstream | ||
ActorTestKit.shutdown(system); | ||
} | ||
|
||
@Test | ||
public void subscribeToSuppressedDeadLetters() { | ||
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "SuppressedDeadLetter"); | ||
TestProbe<SuppressedDeadLetter> probe = TestProbe.create(system); | ||
ActorRef<SuppressedDeadLetter> actor = probe.ref(); | ||
// #suppressed-deadletters | ||
system.eventStream().tell(new Subscribe<>(SuppressedDeadLetter.class, actor)); | ||
// #suppressed-deadletters | ||
|
||
ActorTestKit.shutdown(system); | ||
} | ||
|
||
@Test | ||
public void subscribeToAllDeadLetters() { | ||
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "AllDeadLetters"); | ||
TestProbe<AllDeadLetters> probe = TestProbe.create(system); | ||
ActorRef<AllDeadLetters> actor = probe.ref(); | ||
// #all-deadletters | ||
system.eventStream().tell(new Subscribe<>(AllDeadLetters.class, actor)); | ||
// #all-deadletters | ||
|
||
ActorTestKit.shutdown(system); | ||
} | ||
} |
122 changes: 122 additions & 0 deletions
122
akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com> | ||
*/ | ||
|
||
package akka.actor.typed.eventstream | ||
|
||
import akka.actor.DeadLetter | ||
import akka.actor.testkit.typed.scaladsl.LogCapturing | ||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit | ||
import akka.actor.testkit.typed.scaladsl.TestProbe | ||
import akka.actor.typed.ActorRef | ||
import akka.actor.typed.ActorSystem | ||
import akka.actor.typed.Props | ||
import akka.actor.typed.SpawnProtocol | ||
import akka.actor.typed.SpawnProtocol.Spawn | ||
import akka.actor.typed.eventstream.EventStream.Publish | ||
import akka.actor.typed.eventstream.EventStream.Subscribe | ||
import akka.actor.typed.scaladsl.Behaviors | ||
import org.scalatest.wordspec.AnyWordSpecLike | ||
|
||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.Future | ||
|
||
object LoggingDocSpec { | ||
|
||
//#deadletters | ||
import akka.actor.typed.Behavior | ||
import akka.actor.typed.eventstream.EventStream.Subscribe | ||
import akka.actor.typed.scaladsl.Behaviors | ||
|
||
object DeadLetterListener { | ||
|
||
def apply(): Behavior[String] = Behaviors.setup { context => | ||
// subscribe DeadLetter at startup. | ||
val adapter = context.messageAdapter[DeadLetter](d => d.message.toString) | ||
context.system.eventStream ! Subscribe(adapter) | ||
|
||
Behaviors.receiveMessage { | ||
case msg: String => | ||
println(msg) | ||
Behaviors.same | ||
} | ||
} | ||
} | ||
//#deadletters | ||
|
||
//#superclass-subscription-eventstream | ||
object ListenerActor { | ||
abstract class AllKindsOfMusic { def artist: String } | ||
case class Jazz(artist: String) extends AllKindsOfMusic | ||
case class Electronic(artist: String) extends AllKindsOfMusic | ||
|
||
def apply(): Behavior[ListenerActor.AllKindsOfMusic] = Behaviors.receive { (context, msg) => | ||
msg match { | ||
case m: Jazz => | ||
println(s"${context.self.path.name} is listening to: ${m.artist}") | ||
Behaviors.same | ||
case m: Electronic => | ||
println(s"${context.self.path.name} is listening to: ${m.artist}") | ||
Behaviors.same | ||
case _ => | ||
Behaviors.same | ||
} | ||
} | ||
} | ||
//#superclass-subscription-eventstream | ||
|
||
} | ||
|
||
class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { | ||
import LoggingDocSpec._ | ||
import akka.actor.typed.scaladsl.AskPattern._ | ||
|
||
"allow registration to dead letters" in { | ||
// #deadletters | ||
ActorSystem(Behaviors.setup[Void] { context => | ||
context.spawn(DeadLetterListener(), "DeadLetterListener", Props.empty) | ||
Behaviors.empty | ||
}, "System") | ||
// #deadletters | ||
} | ||
|
||
"demonstrate superclass subscriptions on typed eventStream" in { | ||
import LoggingDocSpec.ListenerActor._ | ||
//#superclass-subscription-eventstream | ||
|
||
implicit val system: ActorSystem[SpawnProtocol.Command] = ActorSystem(SpawnProtocol(), "SpawnProtocol") | ||
implicit val ec: ExecutionContext = system.executionContext | ||
|
||
val jazzListener: Future[ActorRef[Jazz]] = | ||
system.ask(Spawn(behavior = ListenerActor(), name = "jazz", props = Props.empty, _)) | ||
val musicListener: Future[ActorRef[AllKindsOfMusic]] = | ||
system.ask(Spawn(behavior = ListenerActor(), name = "music", props = Props.empty, _)) | ||
|
||
for (jazzListenerRef <- jazzListener; musicListenerRef <- musicListener) { | ||
system.eventStream ! Subscribe(jazzListenerRef) | ||
system.eventStream ! Subscribe(musicListenerRef) | ||
} | ||
|
||
// only musicListener gets this message, since it listens to *all* kinds of music: | ||
system.eventStream ! Publish(Electronic("Parov Stelar")) | ||
|
||
// jazzListener and musicListener will be notified about Jazz: | ||
system.eventStream ! Publish(Jazz("Sonny Rollins")) | ||
//#superclass-subscription-eventstream | ||
} | ||
|
||
"allow registration to suppressed dead letters" in { | ||
val listener: ActorRef[Any] = TestProbe().ref | ||
|
||
//#suppressed-deadletters | ||
import akka.actor.SuppressedDeadLetter | ||
system.eventStream ! Subscribe[SuppressedDeadLetter](listener) | ||
//#suppressed-deadletters | ||
|
||
//#all-deadletters | ||
import akka.actor.AllDeadLetters | ||
system.eventStream ! Subscribe[AllDeadLetters](listener) | ||
//#all-deadletters | ||
} | ||
|
||
} |
Oops, something went wrong.