diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpRpcFlowStage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpRpcFlowStage.scala new file mode 100644 index 0000000000..2ddd4db84e --- /dev/null +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpRpcFlowStage.scala @@ -0,0 +1,188 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.alpakka.amqp + +import akka.stream._ +import akka.stream.stage._ +import akka.util.ByteString +import com.rabbitmq.client.AMQP.BasicProperties +import com.rabbitmq.client.{DefaultConsumer, Envelope, ShutdownSignalException} + +import scala.collection.mutable +import scala.concurrent.{Future, Promise} + +object AmqpRpcFlowStage { + + private val defaultAttributes = + Attributes.name("AmqpRpcFlow").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")) +} + +/** + * This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication + * + * @param responsesPerMessage The number of responses that should be expected for each message placed on the queue. This + * can be overridden per message by including `expectedReplies` in the the header of the [[OutgoingMessage]] + */ +final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSize: Int, responsesPerMessage: Int = 1) + extends GraphStageWithMaterializedValue[FlowShape[OutgoingMessage, IncomingMessage], Future[String]] + with AmqpConnector { stage => + + import AmqpRpcFlowStage._ + + val in = Inlet[OutgoingMessage]("AmqpRpcFlow.in") + val out = Outlet[IncomingMessage]("AmqpRpcFlow.out") + + override def shape: FlowShape[OutgoingMessage, IncomingMessage] = FlowShape.of(in, out) + + override protected def initialAttributes: Attributes = defaultAttributes + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[String]) = { + val promise = Promise[String]() + (new GraphStageLogic(shape) with AmqpConnectorLogic { + + override val settings = stage.settings + private val exchange = settings.exchange.getOrElse("") + private val routingKey = settings.routingKey.getOrElse("") + private val queue = mutable.Queue[IncomingMessage]() + private var queueName: String = _ + private var outstandingMessages = 0 + + override def connectionFactoryFrom(settings: AmqpConnectionSettings) = stage.connectionFactoryFrom(settings) + + override def whenConnected(): Unit = { + import scala.collection.JavaConverters._ + val shutdownCallback = getAsyncCallback[Option[ShutdownSignalException]] { + case Some(ex) => + promise.tryFailure(ex) + failStage(ex) + case None => + promise.trySuccess("") + completeStage() + } + + pull(in) + + // we have only one consumer per connection so global is ok + channel.basicQos(bufferSize, true) + val consumerCallback = getAsyncCallback(handleDelivery) + + val amqpSourceConsumer = new DefaultConsumer(channel) { + override def handleDelivery( + consumerTag: String, + envelope: Envelope, + properties: BasicProperties, + body: Array[Byte] + ): Unit = + consumerCallback.invoke(IncomingMessage(ByteString(body), envelope, properties)) + + override def handleCancel(consumerTag: String): Unit = + // non consumer initiated cancel, for example happens when the queue has been deleted. + shutdownCallback.invoke(None) + + override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = + // "Called when either the channel or the underlying connection has been shut down." + shutdownCallback.invoke(Option(sig)) + } + + // Create an exclusive queue with a randomly generated name for use as the replyTo portion of RPC + queueName = channel + .queueDeclare( + "", + false, + true, + true, + Map.empty[String, AnyRef].asJava + ) + .getQueue + + channel.basicConsume( + queueName, + amqpSourceConsumer + ) + promise.success(queueName) + } + + def handleDelivery(message: IncomingMessage): Unit = + if (isAvailable(out)) { + pushAndAckMessage(message) + } else { + if (queue.size + 1 > bufferSize) { + failStage(new RuntimeException(s"Reached maximum buffer size $bufferSize")) + } else { + queue.enqueue(message) + } + } + + def pushAndAckMessage(message: IncomingMessage): Unit = { + push(out, message) + // ack it as soon as we have passed it downstream + // TODO ack less often and do batch acks with multiple = true would probably be more performant + channel.basicAck( + message.envelope.getDeliveryTag, + false // just this single message + ) + outstandingMessages -= 1 + + if (outstandingMessages == 0 && isClosed(in)) { + completeStage() + } + } + + setHandler( + out, + new OutHandler { + override def onPull(): Unit = + if (queue.nonEmpty) { + pushAndAckMessage(queue.dequeue()) + } + } + ) + + setHandler( + in, + new InHandler { + // We don't want to finish since we're still waiting + // on incoming messages from rabbit. However, if we + // haven't processed a message yet, we do want to complete + // so that we don't hang. + override def onUpstreamFinish(): Unit = + if (outstandingMessages == 0) super.onUpstreamFinish() + + override def onPush(): Unit = { + val elem = grab(in) + val props = elem.props.getOrElse(new BasicProperties()).builder.replyTo(queueName).build() + channel.basicPublish( + exchange, + routingKey, + elem.mandatory, + elem.immediate, + props, + elem.bytes.toArray + ) + + val expectedResponses: Int = { + val headers = props.getHeaders + if (headers == null) { + responsesPerMessage + } else { + val r = headers.get("expectedReplies") + if (r != null) { + r.asInstanceOf[Int] + } else { + responsesPerMessage + } + } + } + + outstandingMessages += expectedResponses + pull(in) + } + } + ) + }, promise.future) + } + + override def toString: String = "AmqpRpcFlow" + +} diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala index ad0a01607f..8203a95af8 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala @@ -3,12 +3,15 @@ */ package akka.stream.alpakka.amqp -import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler} +import akka.Done +import akka.stream.stage.{GraphStage, GraphStageLogic, GraphStageWithMaterializedValue, InHandler} import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape} import akka.util.ByteString import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client._ +import scala.concurrent.{Future, Promise} + final case class OutgoingMessage(bytes: ByteString, immediate: Boolean, mandatory: Boolean, @@ -16,11 +19,8 @@ final case class OutgoingMessage(bytes: ByteString, object AmqpSinkStage { - /** - * Internal API - */ private val defaultAttributes = - Attributes.name("AmsqpSink").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")) + Attributes.name("AmqpSink").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")) } /** @@ -28,7 +28,7 @@ object AmqpSinkStage { * Each materialized sink will create one connection to the broker. */ final class AmqpSinkStage(settings: AmqpSinkSettings) - extends GraphStage[SinkShape[OutgoingMessage]] + extends GraphStageWithMaterializedValue[SinkShape[OutgoingMessage], Future[Done]] with AmqpConnector { stage => import AmqpSinkStage._ @@ -38,8 +38,9 @@ final class AmqpSinkStage(settings: AmqpSinkSettings) override protected def initialAttributes: Attributes = defaultAttributes - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with AmqpConnectorLogic { + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val promise = Promise[Done]() + (new GraphStageLogic(shape) with AmqpConnectorLogic { override val settings = stage.settings private val exchange = settings.exchange.getOrElse("") private val routingKey = settings.routingKey.getOrElse("") @@ -48,32 +49,143 @@ final class AmqpSinkStage(settings: AmqpSinkSettings) override def whenConnected(): Unit = { val shutdownCallback = getAsyncCallback[ShutdownSignalException] { ex => + promise.failure(ex) failStage(ex) } - channel.addShutdownListener(new ShutdownListener { - override def shutdownCompleted(cause: ShutdownSignalException): Unit = - shutdownCallback.invoke(cause) - }) + channel.addShutdownListener( + new ShutdownListener { + override def shutdownCompleted(cause: ShutdownSignalException): Unit = + shutdownCallback.invoke(cause) + } + ) pull(in) } - setHandler(in, + setHandler( + in, new InHandler { - override def onPush(): Unit = { - val elem = grab(in) - channel.basicPublish( - exchange, - routingKey, - elem.mandatory, - elem.immediate, - elem.props.orNull, - elem.bytes.toArray - ) - pull(in) - } - }) - } + override def onUpstreamFailure(ex: Throwable): Unit = { + promise.failure(ex) + super.onUpstreamFailure(ex) + } + + override def onUpstreamFinish(): Unit = { + promise.success(Done) + super.onUpstreamFinish() + } + + override def onPush(): Unit = { + val elem = grab(in) + channel.basicPublish( + exchange, + routingKey, + elem.mandatory, + elem.immediate, + elem.props.orNull, + elem.bytes.toArray + ) + pull(in) + } + } + ) + }, promise.future) + } override def toString: String = "AmqpSink" } + +object AmqpReplyToStage { + + private val defaultAttributes = + Attributes.name("AmqpReplyToSink").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")) + +} + +/** + * Connects to an AMQP server upon materialization and sends incoming messages to the server. + * Each materialized sink will create one connection to the broker. This stage sends messages to + * the queue named in the replyTo options of the message instead of from settings declared at construction. + */ +final class AmqpReplyToStage(settings: AmqpReplyToSinkSettings) + extends GraphStageWithMaterializedValue[SinkShape[OutgoingMessage], Future[Done]] + with AmqpConnector { stage => + import AmqpReplyToStage._ + + val in = Inlet[OutgoingMessage]("AmqpReplyToSink.in") + + override def shape: SinkShape[OutgoingMessage] = SinkShape.of(in) + + override protected def initialAttributes: Attributes = defaultAttributes + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val promise = Promise[Done]() + (new GraphStageLogic(shape) with AmqpConnectorLogic { + override val settings = stage.settings + + override def connectionFactoryFrom(settings: AmqpConnectionSettings) = stage.connectionFactoryFrom(settings) + + override def whenConnected(): Unit = { + val shutdownCallback = getAsyncCallback[ShutdownSignalException] { ex => + promise.failure(ex) + failStage(ex) + } + channel.addShutdownListener( + new ShutdownListener { + override def shutdownCompleted(cause: ShutdownSignalException): Unit = + shutdownCallback.invoke(cause) + } + ) + pull(in) + } + + override def postStop(): Unit = { + if (!promise.isCompleted) { + promise.failure(new RuntimeException("stage stopped unexpectedly")) + } + super.postStop() + } + + setHandler( + in, + new InHandler { + + override def onUpstreamFailure(ex: Throwable): Unit = { + promise.failure(ex) + super.onUpstreamFailure(ex) + } + + override def onUpstreamFinish(): Unit = { + promise.success(Done) + super.onUpstreamFinish() + } + + override def onPush(): Unit = { + val elem = grab(in) + + val replyTo = elem.props.map(_.getReplyTo) + + if (replyTo.isDefined) { + channel.basicPublish( + "", + replyTo.get, + elem.mandatory, + elem.immediate, + elem.props.orNull, + elem.bytes.toArray + ) + } else if (settings.failIfReplyToMissing) { + val ex = new RuntimeException("Reply-to header was not set") + promise.failure(ex) + failStage(ex) + } + + tryPull(in) + } + } + ) + }, promise.future) + } + + override def toString: String = "AmqpReplyToSink" +} diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpRpcFlow.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpRpcFlow.scala new file mode 100644 index 0000000000..02591e1631 --- /dev/null +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpRpcFlow.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.alpakka.amqp.javadsl + +import java.util.concurrent.CompletionStage + +import akka.stream.alpakka.amqp._ +import akka.stream.javadsl.Flow +import akka.util.ByteString + +import scala.compat.java8.FutureConverters._ + +object AmqpRpcFlow { + + /** + * Java API: + * Create an [[https://www.rabbitmq.com/tutorials/tutorial-six-java.html RPC style flow]] for processing and communicating + * over a rabbitmq message bus. This will create a private queue, and add the reply-to header to messages sent out. + * + * This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication. + */ + def create(settings: AmqpSinkSettings, + bufferSize: Int + ): Flow[OutgoingMessage, IncomingMessage, CompletionStage[String]] = + akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow(settings, bufferSize).mapMaterializedValue(f => f.toJava).asJava + + /** + * Java API: + * Create an [[https://www.rabbitmq.com/tutorials/tutorial-six-java.html RPC style flow]] for processing and communicating + * over a rabbitmq message bus. This will create a private queue, and add the reply-to header to messages sent out. + * + * This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication. + * + * @param repliesPerMessage The number of responses that should be expected for each message placed on the queue. This + * can be overridden per message by including `expectedReplies` in the the header of the [[OutgoingMessage]] + */ + def create(settings: AmqpSinkSettings, + bufferSize: Int, + repliesPerMessage: Int + ): Flow[OutgoingMessage, IncomingMessage, CompletionStage[String]] = + akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow(settings, bufferSize, repliesPerMessage) + .mapMaterializedValue(f => f.toJava).asJava + + /** + * Java API: + * Create an [[https://www.rabbitmq.com/tutorials/tutorial-six-java.html RPC style flow]] for processing and communicating + * over a rabbitmq message bus. This will create a private queue, and add the reply-to header to messages sent out. + * + * This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication. + * + * @param repliesPerMessage The number of responses that should be expected for each message placed on the queue. This + * can be overridden per message by including `expectedReplies` in the the header of the [[OutgoingMessage]] + */ + def createSimple(settings: AmqpSinkSettings, + repliesPerMessage: Int + ): Flow[ByteString, ByteString, CompletionStage[String]] = + akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow.simple(settings, repliesPerMessage) + .mapMaterializedValue(f => f.toJava).asJava + +} diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala index ddcb9d5bbb..366a771926 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala @@ -3,9 +3,11 @@ */ package akka.stream.alpakka.amqp.javadsl -import akka.NotUsed -import akka.stream.alpakka.amqp.{AmqpSinkSettings, AmqpSinkStage, OutgoingMessage} -import akka.stream.javadsl.Sink +import java.util.concurrent.CompletionStage + +import scala.compat.java8.FutureConverters._ +import akka.Done +import akka.stream.alpakka.amqp._ import akka.util.ByteString object AmqpSink { @@ -13,13 +15,24 @@ object AmqpSink { /** * Java API: Creates an [[AmqpSink]] that accepts [[OutgoingMessage]] elements. */ - def create(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[OutgoingMessage, NotUsed] = - Sink.fromGraph(new AmqpSinkStage(settings)) + def create(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[OutgoingMessage, CompletionStage[Done]] = + akka.stream.alpakka.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.toJava).asJava + + /** + * Java API: + * + * Connects to an AMQP server upon materialization and sends incoming messages to the server. + * Each materialized sink will create one connection to the broker. This stage sends messages to + * the queue named in the replyTo options of the message instead of from settings declared at construction. + */ + def createReplyTo( + settings: AmqpReplyToSinkSettings): akka.stream.javadsl.Sink[OutgoingMessage, CompletionStage[Done]] = + akka.stream.alpakka.amqp.scaladsl.AmqpSink.replyTo(settings).mapMaterializedValue(f => f.toJava).asJava /** * Java API: Creates an [[AmqpSink]] that accepts ByteString elements. */ - def createSimple(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[ByteString, NotUsed] = - akka.stream.alpakka.amqp.scaladsl.AmqpSink.simple(settings).asJava + def createSimple(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[ByteString, CompletionStage[Done]] = + akka.stream.alpakka.amqp.scaladsl.AmqpSink.simple(settings).mapMaterializedValue(f => f.toJava).asJava } diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/model.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/model.scala index 75e0cc28b3..5102b594ca 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/model.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/model.scala @@ -71,6 +71,29 @@ object TemporaryQueueSourceSettings { TemporaryQueueSourceSettings(connectionSettings, exchange) } +final case class AmqpReplyToSinkSettings( + connectionSettings: AmqpConnectionSettings, + failIfReplyToMissing: Boolean = true +) extends AmqpConnectorSettings { + override final val declarations = Nil +} + +object AmqpReplyToSinkSettings { + + /** + * Java API + */ + def create(connectionSettings: AmqpConnectionSettings) = + AmqpReplyToSinkSettings(connectionSettings) + + /** + * Java API + */ + def create(connectionSettings: AmqpConnectionSettings, failIfReplyToMissing: Boolean) = + AmqpReplyToSinkSettings(connectionSettings, failIfReplyToMissing) + +} + final case class AmqpSinkSettings( connectionSettings: AmqpConnectionSettings, exchange: Option[String] = None, diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala new file mode 100644 index 0000000000..827b79a40b --- /dev/null +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.alpakka.amqp.scaladsl + +import akka.stream.alpakka.amqp.{AmqpRpcFlowStage, AmqpSinkSettings, IncomingMessage, OutgoingMessage} +import akka.stream.scaladsl.{Flow, Keep, Sink} +import akka.util.ByteString + +import scala.concurrent.Future + +object AmqpRpcFlow { + + /** + * Scala API: + * Create an [[https://www.rabbitmq.com/tutorials/tutorial-six-java.html RPC style flow]] for processing and communicating + * over a rabbitmq message bus. This will create a private queue, and add the reply-to header to messages sent out. + * + * This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication. + * + * @param repliesPerMessage The number of responses that should be expected for each message placed on the queue. This + * can be overridden per message by including `expectedReplies` in the the header of the [[OutgoingMessage]] + */ + def simple(settings: AmqpSinkSettings, repliesPerMessage: Int = 1): Flow[ByteString, ByteString, Future[String]] = + Flow[ByteString] + .map(bytes => OutgoingMessage(bytes, false, false, None)) + .viaMat(apply(settings, 1, repliesPerMessage))(Keep.right) + .map(_.bytes) + + /** + * Scala API: + * Create an [[https://www.rabbitmq.com/tutorials/tutorial-six-java.html RPC style flow]] for processing and communicating + * over a rabbitmq message bus. This will create a private queue, and add the reply-to header to messages sent out. + * + * This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication. + * + * @param repliesPerMessage The number of responses that should be expected for each message placed on the queue. This + * can be overridden per message by including `expectedReplies` in the the header of the [[OutgoingMessage]] + */ + def apply(settings: AmqpSinkSettings, + bufferSize: Int, + repliesPerMessage: Int = 1): Flow[OutgoingMessage, IncomingMessage, Future[String]] = + Flow.fromGraph(new AmqpRpcFlowStage(settings, bufferSize, repliesPerMessage)) + +} diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala index 28f544b6e3..921b8ae318 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala @@ -3,23 +3,33 @@ */ package akka.stream.alpakka.amqp.scaladsl -import akka.NotUsed -import akka.stream.alpakka.amqp.{AmqpSinkSettings, AmqpSinkStage, OutgoingMessage} +import akka.{Done, NotUsed} +import akka.stream.alpakka.amqp._ import akka.stream.scaladsl.Sink import akka.util.ByteString +import scala.concurrent.Future + object AmqpSink { /** * Scala API: Creates an [[AmqpSink]] that accepts ByteString elements. */ - def simple(settings: AmqpSinkSettings): Sink[ByteString, NotUsed] = + def simple(settings: AmqpSinkSettings): Sink[ByteString, Future[Done]] = apply(settings).contramap[ByteString](bytes => OutgoingMessage(bytes, false, false, None)) + def replyTo(settings: AmqpReplyToSinkSettings): Sink[OutgoingMessage, Future[Done]] = + Sink.fromGraph(new AmqpReplyToStage(settings)) + /** - * Scala API: Creates an [[AmqpSink]] that accepts [[OutgoingMessage]] elements. + * Scala API: + * + * Connects to an AMQP server upon materialization and sends incoming messages to the server. + * Each materialized sink will create one connection to the broker. This stage sends messages to + * the queue named in the replyTo options of the message instead of from settings declared at construction. + * */ - def apply(settings: AmqpSinkSettings): Sink[OutgoingMessage, NotUsed] = + def apply(settings: AmqpSinkSettings): Sink[OutgoingMessage, Future[Done]] = Sink.fromGraph(new AmqpSinkStage(settings)) } diff --git a/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java b/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java index 80a8d0147c..fd3753c013 100644 --- a/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java +++ b/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java @@ -4,6 +4,8 @@ package akka.stream.alpakka.amqp.javadsl; import akka.stream.alpakka.amqp.*; +import akka.stream.testkit.TestSubscriber; +import akka.stream.testkit.javadsl.TestSink; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -18,6 +20,7 @@ import akka.stream.javadsl.*; import akka.testkit.JavaTestKit; import akka.util.ByteString; +import scala.Some; import scala.concurrent.duration.Duration; import java.util.*; @@ -51,7 +54,7 @@ public void publishAndConsume() throws Exception { //#queue-declaration //#create-sink - final Sink amqpSink = AmqpSink.createSimple( + final Sink> amqpSink = AmqpSink.createSimple( AmqpSinkSettings.create(DefaultAmqpConnection.getInstance()) .withRoutingKey(queueName) .withDeclarations(queueDeclaration) @@ -82,6 +85,58 @@ public void publishAndConsume() throws Exception { assertEquals(input, result.toCompletableFuture().get(3, TimeUnit.SECONDS)); } + + @Test + public void publishAndConsumeRpc() throws Exception { + + final String queueName = "amqp-conn-it-spec-rpc-queue-" + System.currentTimeMillis(); + final QueueDeclaration queueDeclaration = QueueDeclaration.create(queueName); + + //#create-rpc-flow + final Flow> ampqRpcFlow = AmqpRpcFlow.createSimple( + AmqpSinkSettings.create(DefaultAmqpConnection.getInstance()).withRoutingKey(queueName).withDeclarations(queueDeclaration), 1); + //#create-rpc-flow + + final Integer bufferSize = 10; + final Source amqpSource = AmqpSource.create( + NamedQueueSourceSettings.create( + DefaultAmqpConnection.getInstance(), + queueName + ).withDeclarations(queueDeclaration), + bufferSize + ); + + //#run-rpc-flow + final List input = Arrays.asList("one", "two", "three", "four", "five"); + TestSubscriber.Probe probe = + Source.from(input) + .map(ByteString::fromString) + .via(ampqRpcFlow) + .runWith(TestSink.probe(system), materializer); + //#run-rpc-flow + + Sink> amqpSink = AmqpSink.createReplyTo( + AmqpReplyToSinkSettings.create(DefaultAmqpConnection.getInstance()) + ); + + amqpSource.map(b -> + new OutgoingMessage(b.bytes().concat(ByteString.fromString("a")), false, false, Some.apply(b.properties())) + ).runWith(amqpSink, materializer); + + probe.request(5) + .expectNextUnordered( + ByteString.fromString("onea"), + ByteString.fromString("twoa"), + ByteString.fromString("threea"), + ByteString.fromString("foura"), + ByteString.fromString("fivea") + ).expectComplete(); + + final CompletionStage> result = + amqpSource.map(m -> m.bytes().utf8String()).take(input.size()).runWith(Sink.seq(), materializer); + + } + @Test public void publishFanoutAndConsume() throws Exception { //#exchange-declaration @@ -90,7 +145,7 @@ public void publishFanoutAndConsume() throws Exception { //#exchange-declaration //#create-exchange-sink - final Sink amqpSink = AmqpSink.createSimple( + final Sink> amqpSink = AmqpSink.createSimple( AmqpSinkSettings.create(DefaultAmqpConnection.getInstance()) .withExchange(exchangeName) .withDeclarations(exchangeDeclaration) diff --git a/amqp/src/test/scala/akka/stream/alpakka/amqp/AmqpSpec.scala b/amqp/src/test/scala/akka/stream/alpakka/amqp/AmqpSpec.scala index cccb0db612..a593cf86d6 100644 --- a/amqp/src/test/scala/akka/stream/alpakka/amqp/AmqpSpec.scala +++ b/amqp/src/test/scala/akka/stream/alpakka/amqp/AmqpSpec.scala @@ -6,7 +6,7 @@ package akka.stream.alpakka.amqp import akka.actor.ActorSystem import akka.stream.ActorMaterializer import org.scalatest.concurrent.ScalaFutures -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import org.scalatest.{AsyncWordSpec, BeforeAndAfterAll, Matchers, WordSpec} abstract class AmqpSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures { diff --git a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala index 8464a31678..a0458476c0 100644 --- a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala +++ b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala @@ -6,7 +6,8 @@ package akka.stream.alpakka.amqp.scaladsl import akka.Done import akka.stream._ import akka.stream.alpakka.amqp._ -import akka.stream.scaladsl.{GraphDSL, Merge, Sink, Source} +import akka.stream.scaladsl.{GraphDSL, Keep, Merge, Sink, Source} +import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{TestPublisher, TestSubscriber} import akka.util.ByteString @@ -53,6 +54,110 @@ class AmqpConnectorsSpec extends AmqpSpec { result.futureValue shouldEqual input } + "publish via RPC and then consume through a simple queue again in the same JVM" in { + + val queueName = "amqp-conn-it-spec-rpc-queue-" + System.currentTimeMillis() + val queueDeclaration = QueueDeclaration(queueName) + + //#create-rpc-flow + val amqpRpcFlow = AmqpRpcFlow.simple( + AmqpSinkSettings(DefaultAmqpConnection).withRoutingKey(queueName).withDeclarations(queueDeclaration) + ) + //#create-rpc-flow + + val amqpSource = AmqpSource( + NamedQueueSourceSettings(DefaultAmqpConnection, queueName), + bufferSize = 1 + ) + + val input = Vector("one", "two", "three", "four", "five") + //#run-rpc-flow + val (rpcQueueF, probe) = + Source(input).map(s => ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run + //#run-rpc-flow + val rpqCqueue = rpcQueueF.futureValue + + val amqpSink = AmqpSink.replyTo( + AmqpReplyToSinkSettings(DefaultAmqpConnection) + ) + + amqpSource + .map(b => OutgoingMessage(b.bytes.concat(ByteString("a")), false, false, Some(b.properties))) + .runWith(amqpSink) + + probe.request(5).expectNextUnorderedN(input.map(s => ByteString(s.concat("a")))).expectComplete() + } + + "publish via RPC which expects 2 responses per message and then consume through a simple queue again in the same JVM" in { + val queueName = "amqp-conn-it-spec-rpc-queue-" + System.currentTimeMillis() + val queueDeclaration = QueueDeclaration(queueName) + + val amqpRpcFlow = AmqpRpcFlow.simple( + AmqpSinkSettings(DefaultAmqpConnection).withRoutingKey(queueName).withDeclarations(queueDeclaration), + 2 + ) + + val amqpSource = AmqpSource( + NamedQueueSourceSettings(DefaultAmqpConnection, queueName), + bufferSize = 1 + ) + + val input = Vector("one", "two", "three", "four", "five") + val (rpcQueueF, probe) = + Source(input).map(s => ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run + val rpqCqueue = rpcQueueF.futureValue + + val amqpSink = AmqpSink.replyTo( + AmqpReplyToSinkSettings(DefaultAmqpConnection) + ) + + amqpSource.mapConcat { b => + List( + OutgoingMessage(b.bytes.concat(ByteString("a")), false, false, Some(b.properties)), + OutgoingMessage(b.bytes.concat(ByteString("aa")), false, false, Some(b.properties)) + ) + }.runWith(amqpSink) + + probe + .request(10) + .expectNextUnorderedN(input.flatMap(s => List(ByteString(s.concat("a")), ByteString(s.concat("aa"))))) + .expectComplete() + } + + "correctly close a AmqpRpcFlow when stream is closed without passing any elements" in { + + Source + .empty[ByteString] + .via(AmqpRpcFlow.simple(AmqpSinkSettings(DefaultAmqpConnection))) + .runWith(TestSink.probe) + .ensureSubscription() + .expectComplete() + + } + + "handle missing reply-to header correctly" in { + + val outgoingMessage = OutgoingMessage(ByteString.empty, false, false, None) + + Source + .single(outgoingMessage) + .watchTermination()(Keep.right) + .to(AmqpSink.replyTo(AmqpReplyToSinkSettings(DefaultAmqpConnection))) + .run() + .futureValue shouldBe akka.Done + + val caught = intercept[RuntimeException] { + Source + .single(outgoingMessage) + .toMat(AmqpSink.replyTo(AmqpReplyToSinkSettings(DefaultAmqpConnection, true)))(Keep.right) + .run() + .futureValue + } + + caught.getCause.getMessage should equal("Reply-to header was not set") + + } + "publish from one source and consume elements with multiple sinks" in { val queueName = "amqp-conn-it-spec-work-queues-" + System.currentTimeMillis() val queueDeclaration = QueueDeclaration(queueName) @@ -131,6 +236,7 @@ class AmqpConnectorsSpec extends AmqpSpec { subscriber.cancel() publisher.sendComplete() + succeed } "not ack messages unless they get consumed" in { @@ -188,6 +294,7 @@ class AmqpConnectorsSpec extends AmqpSpec { subscriber2.expectNext().bytes.utf8String shouldEqual "five" subscriber2.cancel() + succeed } "pub-sub from one source with multiple sinks" in { diff --git a/docs/src/main/paradox/amqp.md b/docs/src/main/paradox/amqp.md index 8479f2e18f..d1c78f7b98 100644 --- a/docs/src/main/paradox/amqp.md +++ b/docs/src/main/paradox/amqp.md @@ -115,6 +115,23 @@ We merge all sources into one and add the index of the source to all incoming me Such sink and source can be started the same way as in the previous example. +### Using rabbitmq as an RPC mechanism + +If you have remote workers that you want to incorporate into a stream, you can do it using rabbit RPC workflow [RabbitMQ RPC](https://www.rabbitmq.com/tutorials/tutorial-six-java.html) + +Scala +: @@snip (../../../../amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala) { #create-rpc-flow } + +Java +: @@snip (../../../../amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java) { #create-rpc-flow } + + +Scala +: @@snip (../../../../amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala) { #run-rpc-flow } + +Java +: @@snip (../../../../amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java) { #run-rpc-flow } + ### Running the example code The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.