Skip to content

Commit

Permalink
Create typed ActorMaterializer from ActorContext, #akka#25536
Browse files Browse the repository at this point in the history
* Create typed ActorMaterializer from ActorContext to bind stream's lifecycle with an actor's lifecycle

* Add Java API for creating typed ActorMaterializer from ActorContext
  • Loading branch information
Dollyg authored and patriknw committed Oct 18, 2018
1 parent abb3429 commit e26a90f
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.actor.typed.javadsl

import akka.actor
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.EmptyProps
Expand Down Expand Up @@ -59,6 +60,9 @@ object Adapter {
def toUntyped(sys: ActorSystem[_]): akka.actor.ActorSystem =
sys.toUntyped

def toUntyped(ctx: ActorContext[_]): actor.ActorContext =
ActorContextAdapter.toUntyped(ctx)

def watch[U](ctx: akka.actor.ActorContext, other: ActorRef[U]): Unit =
ctx.watch(other)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ package object adapter {
def actorOf(props: akka.actor.Props, name: String): akka.actor.ActorRef =
ActorContextAdapter.toUntyped(ctx).actorOf(props, name)

def toUntyped: akka.actor.ActorContext = ActorContextAdapter.toUntyped(ctx)

// watch, unwatch and stop not needed here because of the implicit ActorRef conversion
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.stream.typed.javadsl

import akka.actor.typed.ActorSystem
import akka.actor.typed.javadsl.{ ActorContext, Adapter }
import akka.stream.ActorMaterializerSettings

object ActorMaterializerFactory {
Expand Down Expand Up @@ -43,4 +44,34 @@ object ActorMaterializerFactory {
def create[T](settings: ActorMaterializerSettings, namePrefix: String, actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer =
akka.stream.ActorMaterializer.create(settings, actorSystem.toUntyped, namePrefix)

/**
* Creates an `ActorMaterializer` which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams
* will be bound to the lifecycle of the provided [[akka.actor.typed.javadsl.ActorContext]]
*
* Defaults the actor name prefix used to name actors running the processing steps to `"flow"`.
* The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def create[T](ctx: ActorContext[T]): akka.stream.ActorMaterializer =
akka.stream.ActorMaterializer.create(Adapter.toUntyped(ctx))

/**
* Creates an `ActorMaterializer` which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams
* will be bound to the lifecycle of the provided [[akka.actor.typed.javadsl.ActorContext]]
*/
def create[T](settings: ActorMaterializerSettings, ctx: ActorContext[T]): akka.stream.ActorMaterializer =
akka.stream.ActorMaterializer.create(settings, Adapter.toUntyped(ctx))

/**
* Creates an `ActorMaterializer` which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams
* will be bound to the lifecycle of the provided [[akka.actor.typed.javadsl.ActorContext]]
*
* The `namePrefix` is used as the first part of the names of the actors running
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
* `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def create[T](settings: ActorMaterializerSettings, namePrefix: String, ctx: ActorContext[T]): akka.stream.ActorMaterializer =
akka.stream.ActorMaterializer.create(settings, Adapter.toUntyped(ctx), namePrefix)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.stream.typed.scaladsl

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.ActorContext
import akka.stream.ActorMaterializerSettings

object ActorMaterializer {
Expand All @@ -25,4 +26,19 @@ object ActorMaterializer {
def apply[T](materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None)(implicit actorSystem: ActorSystem[T]): ActorMaterializer =
akka.stream.ActorMaterializer(materializerSettings, namePrefix)(actorSystem.toUntyped)

/**
* Creates an `ActorMaterializer` which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams
* will be bound to the lifecycle of the provided [[akka.actor.typed.scaladsl.ActorContext]]
*
* The materializer's [[akka.stream.ActorMaterializerSettings]] will be obtained from the
* configuration of the `context`'s underlying [[akka.actor.typed.ActorSystem]].
*
* The `namePrefix` is used as the first part of the names of the actors running
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
* `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def boundToActor[T](ctx: ActorContext[T], materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None): ActorMaterializer =
akka.stream.ActorMaterializer(materializerSettings, namePrefix)(ctx.toUntyped)

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.stream.typed.scaladsl

//#imports
import akka.stream.typed.scaladsl.ActorMaterializer
import akka.stream.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@

package akka.stream.typed.scaladsl

import scala.concurrent.Future
import akka.Done
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.stream.AbruptStageTerminationException
import akka.stream.scaladsl.{ Sink, Source }
import org.scalatest.WordSpecLike

import scala.concurrent.Future

object CustomGuardianAndMaterializerSpec {

sealed trait GuardianProtocol
Expand All @@ -38,6 +40,20 @@ class CustomGuardianAndMaterializerSpec extends ScalaTestWithActorTestKit with W
it.futureValue should ===("hello")
}

}
"should kill streams with bound actor context" in {
var doneF: Future[Done] = null
val behavior =
Behaviors.setup[String] { ctx
implicit val mat: ActorMaterializer = ActorMaterializer.boundToActor(ctx)
doneF = Source.repeat("hello").runWith(Sink.ignore)

Behaviors.receiveMessage[String](_ Behaviors.stopped)
}

val actorRef = spawn(behavior)

actorRef ! "kill"
eventually(doneF.failed.futureValue shouldBe an[AbruptStageTerminationException])
}
}
}

0 comments on commit e26a90f

Please sign in to comment.