Skip to content

Commit

Permalink
Add Flow to support RabbitMQ RPC workflow #160
Browse files Browse the repository at this point in the history
Changes Amqp sinks to materialize to Future[Done]. As currently it was
very difficult to determine when/if a sink failed due to a amqp error.
  • Loading branch information
Falmarri committed Feb 6, 2017
1 parent 2f03442 commit 66c45fb
Show file tree
Hide file tree
Showing 11 changed files with 667 additions and 43 deletions.
188 changes: 188 additions & 0 deletions amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpRpcFlowStage.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.amqp

import akka.stream._
import akka.stream.stage._
import akka.util.ByteString
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.{DefaultConsumer, Envelope, ShutdownSignalException}

import scala.collection.mutable
import scala.concurrent.{Future, Promise}

object AmqpRpcFlowStage {

private val defaultAttributes =
Attributes.name("AmqpRpcFlow").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
}

/**
* This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication
*
* @param responsesPerMessage The number of responses that should be expected for each message placed on the queue. This
* can be overridden per message by including `expectedReplies` in the the header of the [[OutgoingMessage]]
*/
final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSize: Int, responsesPerMessage: Int = 1)
extends GraphStageWithMaterializedValue[FlowShape[OutgoingMessage, IncomingMessage], Future[String]]
with AmqpConnector { stage =>

import AmqpRpcFlowStage._

val in = Inlet[OutgoingMessage]("AmqpRpcFlow.in")
val out = Outlet[IncomingMessage]("AmqpRpcFlow.out")

override def shape: FlowShape[OutgoingMessage, IncomingMessage] = FlowShape.of(in, out)

override protected def initialAttributes: Attributes = defaultAttributes

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[String]) = {
val promise = Promise[String]()
(new GraphStageLogic(shape) with AmqpConnectorLogic {

override val settings = stage.settings
private val exchange = settings.exchange.getOrElse("")
private val routingKey = settings.routingKey.getOrElse("")
private val queue = mutable.Queue[IncomingMessage]()
private var queueName: String = _
private var outstandingMessages = 0

override def connectionFactoryFrom(settings: AmqpConnectionSettings) = stage.connectionFactoryFrom(settings)

override def whenConnected(): Unit = {
import scala.collection.JavaConverters._
val shutdownCallback = getAsyncCallback[Option[ShutdownSignalException]] {
case Some(ex) =>
promise.tryFailure(ex)
failStage(ex)
case None =>
promise.trySuccess("")
completeStage()
}

pull(in)

// we have only one consumer per connection so global is ok
channel.basicQos(bufferSize, true)
val consumerCallback = getAsyncCallback(handleDelivery)

val amqpSourceConsumer = new DefaultConsumer(channel) {
override def handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: BasicProperties,
body: Array[Byte]
): Unit =
consumerCallback.invoke(IncomingMessage(ByteString(body), envelope, properties))

override def handleCancel(consumerTag: String): Unit =
// non consumer initiated cancel, for example happens when the queue has been deleted.
shutdownCallback.invoke(None)

override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit =
// "Called when either the channel or the underlying connection has been shut down."
shutdownCallback.invoke(Option(sig))
}

// Create an exclusive queue with a randomly generated name for use as the replyTo portion of RPC
queueName = channel
.queueDeclare(
"",
false,
true,
true,
Map.empty[String, AnyRef].asJava
)
.getQueue

channel.basicConsume(
queueName,
amqpSourceConsumer
)
promise.success(queueName)
}

def handleDelivery(message: IncomingMessage): Unit =
if (isAvailable(out)) {
pushAndAckMessage(message)
} else {
if (queue.size + 1 > bufferSize) {
failStage(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
} else {
queue.enqueue(message)
}
}

def pushAndAckMessage(message: IncomingMessage): Unit = {
push(out, message)
// ack it as soon as we have passed it downstream
// TODO ack less often and do batch acks with multiple = true would probably be more performant
channel.basicAck(
message.envelope.getDeliveryTag,
false // just this single message
)
outstandingMessages -= 1

if (outstandingMessages == 0 && isClosed(in)) {
completeStage()
}
}

setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (queue.nonEmpty) {
pushAndAckMessage(queue.dequeue())
}
}
)

setHandler(
in,
new InHandler {
// We don't want to finish since we're still waiting
// on incoming messages from rabbit. However, if we
// haven't processed a message yet, we do want to complete
// so that we don't hang.
override def onUpstreamFinish(): Unit =
if (outstandingMessages == 0) super.onUpstreamFinish()

override def onPush(): Unit = {
val elem = grab(in)
val props = elem.props.getOrElse(new BasicProperties()).builder.replyTo(queueName).build()
channel.basicPublish(
exchange,
routingKey,
elem.mandatory,
elem.immediate,
props,
elem.bytes.toArray
)

val expectedResponses: Int = {
val headers = props.getHeaders
if (headers == null) {
responsesPerMessage
} else {
val r = headers.get("expectedReplies")
if (r != null) {
r.asInstanceOf[Int]
} else {
responsesPerMessage
}
}
}

outstandingMessages += expectedResponses
pull(in)
}
}
)
}, promise.future)
}

override def toString: String = "AmqpRpcFlow"

}
159 changes: 132 additions & 27 deletions amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,32 @@
*/
package akka.stream.alpakka.amqp

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
import akka.Done
import akka.stream.stage.{GraphStage, GraphStageLogic, GraphStageWithMaterializedValue, InHandler}
import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape}
import akka.util.ByteString
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client._

import scala.concurrent.{Future, Promise}

final case class OutgoingMessage(bytes: ByteString,
immediate: Boolean,
mandatory: Boolean,
props: Option[BasicProperties])

object AmqpSinkStage {

/**
* Internal API
*/
private val defaultAttributes =
Attributes.name("AmsqpSink").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
Attributes.name("AmqpSink").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
}

/**
* Connects to an AMQP server upon materialization and sends incoming messages to the server.
* Each materialized sink will create one connection to the broker.
*/
final class AmqpSinkStage(settings: AmqpSinkSettings)
extends GraphStage[SinkShape[OutgoingMessage]]
extends GraphStageWithMaterializedValue[SinkShape[OutgoingMessage], Future[Done]]
with AmqpConnector { stage =>
import AmqpSinkStage._

Expand All @@ -38,8 +38,9 @@ final class AmqpSinkStage(settings: AmqpSinkSettings)

override protected def initialAttributes: Attributes = defaultAttributes

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with AmqpConnectorLogic {
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val promise = Promise[Done]()
(new GraphStageLogic(shape) with AmqpConnectorLogic {
override val settings = stage.settings
private val exchange = settings.exchange.getOrElse("")
private val routingKey = settings.routingKey.getOrElse("")
Expand All @@ -48,32 +49,136 @@ final class AmqpSinkStage(settings: AmqpSinkSettings)

override def whenConnected(): Unit = {
val shutdownCallback = getAsyncCallback[ShutdownSignalException] { ex =>
promise.failure(ex)
failStage(ex)
}
channel.addShutdownListener(new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException): Unit =
shutdownCallback.invoke(cause)
})
channel.addShutdownListener(
new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException): Unit =
shutdownCallback.invoke(cause)
}
)
pull(in)
}

setHandler(in,
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
channel.basicPublish(
exchange,
routingKey,
elem.mandatory,
elem.immediate,
elem.props.orNull,
elem.bytes.toArray
)
pull(in)
}
})

}
override def onUpstreamFailure(ex: Throwable): Unit = {
promise.failure(ex)
super.onUpstreamFailure(ex)
}

override def onUpstreamFinish(): Unit = {
promise.success(Done)
super.onUpstreamFinish()
}

override def onPush(): Unit = {
val elem = grab(in)
channel.basicPublish(
exchange,
routingKey,
elem.mandatory,
elem.immediate,
elem.props.orNull,
elem.bytes.toArray
)
pull(in)
}
}
)
}, promise.future)
}

override def toString: String = "AmqpSink"
}

object AmqpReplyToStage {

private val defaultAttributes =
Attributes.name("AmqpReplyToSink").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))

}

/**
* Connects to an AMQP server upon materialization and sends incoming messages to the server.
* Each materialized sink will create one connection to the broker. This stage sends messages to
* the queue named in the replyTo options of the message instead of from settings declared at construction.
*/
final class AmqpReplyToStage(settings: AmqpReplyToSinkSettings)
extends GraphStageWithMaterializedValue[SinkShape[OutgoingMessage], Future[Done]]
with AmqpConnector { stage =>
import AmqpReplyToStage._

val in = Inlet[OutgoingMessage]("AmqpReplyToSink.in")

override def shape: SinkShape[OutgoingMessage] = SinkShape.of(in)

override protected def initialAttributes: Attributes = defaultAttributes

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val promise = Promise[Done]()
(new GraphStageLogic(shape) with AmqpConnectorLogic {
override val settings = stage.settings

override def connectionFactoryFrom(settings: AmqpConnectionSettings) = stage.connectionFactoryFrom(settings)

override def whenConnected(): Unit = {
val shutdownCallback = getAsyncCallback[ShutdownSignalException] { ex =>
promise.failure(ex)
failStage(ex)
}
channel.addShutdownListener(
new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException): Unit =
shutdownCallback.invoke(cause)
}
)
pull(in)
}

setHandler(
in,
new InHandler {

override def onUpstreamFailure(ex: Throwable): Unit = {
promise.failure(ex)
super.onUpstreamFailure(ex)
}

override def onUpstreamFinish(): Unit = {
promise.success(Done)
super.onUpstreamFinish()
}

override def onPush(): Unit = {
val elem = grab(in)

val replyTo = elem.props.map(_.getReplyTo)

if (replyTo.isDefined) {
channel.basicPublish(
"",
replyTo.get,
elem.mandatory,
elem.immediate,
elem.props.orNull,
elem.bytes.toArray
)
} else if (settings.failIfReplyToMissing) {
val ex = new RuntimeException("Reply-to header was not set")
promise.failure(ex)
failStage(ex)
}

tryPull(in)
}
}
)
}, promise.future)
}

override def toString: String = "AmqpReplyToSink"
}
Loading

0 comments on commit 66c45fb

Please sign in to comment.