Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS flows + Embedded ElasticMQ #239

Merged
merged 6 commits into from
Apr 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ services:
image: rabbitmq:3
ports:
- "5672:5672"
sqs:
image: s12v/elasticmq:0.13.2
ports:
- "9324:9324"
mqtt:
image: toke/mosquitto
ports:
Expand Down
30 changes: 22 additions & 8 deletions docs/src/main/paradox/sqs.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ Gradle

## Usage

Sources and Sinks provided by this connector need a prepared `AmazonSQSAsyncClient` to load messages from a queue.
Sources, Flows and Sinks provided by this connector need a prepared `AmazonSQSAsync` to load messages from a queue.

Scala
: @@snip (../../../../sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/DefaultTestContext.scala) { #init-client }

Java
: @@snip (../../../../sqs/src/test/java/akka/stream/alpakka/sqs/javadsl/SqsSourceTest.java) { #init-client }
: @@snip (../../../../sqs/src/test/java/akka/stream/alpakka/sqs/javadsl/BaseSqsTest.java) { #init-client }

We will also need an @scaladoc[ActorSystem](akka.actor.ActorSystem) and an @scaladoc[ActorMaterializer](akka.stream.ActorMaterializer).

Expand Down Expand Up @@ -108,7 +108,7 @@ Java
#### Sink configuration

Scala
: @@snip (../../../../sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSinkStage.scala) { #SqsSinkSettings }
: @@snip (../../../../sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSinkSettings.scala) { #SqsSinkSettings }

Options:

Expand Down Expand Up @@ -141,21 +141,35 @@ Java (requeue)
Same as the normal `SqsSink`:

Scala
: @@snip (../../../../sqs/src/main/scala/akka/stream/alpakka/sqs/SqsAckSinkStage.scala) { #SqsAckSinkSettings }
: @@snip (../../../../sqs/src/main/scala/akka/stream/alpakka/sqs/SqsAckSinkSettings.scala) { #SqsAckSinkSettings }

Options:

- `maxInFlight` - maximum number of messages being processed by `AmazonSQSAsync` at the same time. Default: 10

### Using SQS as a Flow

You can also build flow stages which put or acknowledge messages in SQS, backpressure on queue response and then forward
responses further down the stream. The API is similar to creating Sinks.

Scala (flow)
: @@snip (../../../../sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSpec.scala) { #flow }

Java (flow)
: @@snip (../../../../sqs/src/test/java/akka/stream/alpakka/sqs/javadsl/SqsSinkTest.java) { #flow }

Scala (flow with ack)
: @@snip (../../../../sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSpec.scala) { #flow-ack }

Java (flow with ack)
: @@snip (../../../../sqs/src/test/java/akka/stream/alpakka/sqs/javadsl/SqsAckSinkTest.java) { #flow-ack }

### Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

> The test code uses [ElasticMQ](https://github.com/adamw/elasticmq) as queuing service which serves an AWS SQS
> compatible API. You can start one quickly using docker:
>
> `docker run -p 9324:9324 -d s12v/elasticmq`
> The test code uses embedded [ElasticMQ](https://github.com/adamw/elasticmq) as queuing service which serves an AWS SQS
> compatible API.

Scala
: ```
Expand Down
5 changes: 3 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ object Dependencies {

val Sqs = Seq(
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-sqs" % "1.11.105", // ApacheV2
"org.mockito" % "mockito-core" % "2.7.17" % Test // MIT
"com.amazonaws" % "aws-java-sdk-sqs" % "1.11.109", // ApacheV2
"org.elasticmq" %% "elasticmq-rest-sqs" % "0.13.2", // ApacheV2
"org.mockito" % "mockito-core" % "2.7.17" % Test // MIT
)
)

Expand Down
142 changes: 142 additions & 0 deletions sqs/src/main/scala/akka/stream/alpakka/sqs/SqsAckFlowStage.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sqs

import akka.stream.alpakka.sqs.scaladsl.AckResult
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.sqs.AmazonSQSAsync
import com.amazonaws.services.sqs.model.{
DeleteMessageRequest,
DeleteMessageResult,
SendMessageRequest,
SendMessageResult
}
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

private[sqs] final class SqsAckFlowStage(queueUrl: String, sqsClient: AmazonSQSAsync)
extends GraphStage[FlowShape[MessageActionPair, Future[AckResult]]] {

private val in = Inlet[MessageActionPair]("messages")
private val out = Outlet[Future[AckResult]]("result")
override val shape = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with StageLogging {
private var inFlight = 0
private var inIsClosed = false

var completionState: Option[Try[Unit]] = None

private def handleFailure(exception: Exception): Unit = {
log.error(exception, "Client failure: {}", exception.getMessage)
inFlight -= 1
failStage(exception)
if (inFlight == 0 && inIsClosed)
checkForCompletion()
}

private def handleSend(result: SendMessageResult): Unit = {
log.debug(s"Sent message {}", result.getMessageId)
inFlight -= 1
if (inFlight == 0 && inIsClosed)
checkForCompletion()
}
private def handleDelete(request: DeleteMessageRequest): Unit = {
log.debug(s"Deleted message {}", request.getReceiptHandle)
inFlight -= 1
if (inFlight == 0 && inIsClosed)
checkForCompletion()
}

var failureCallback: AsyncCallback[Exception] = _
var sendCallback: AsyncCallback[SendMessageResult] = _
var deleteCallback: AsyncCallback[DeleteMessageRequest] = _

override def preStart(): Unit = {
super.preStart()
failureCallback = getAsyncCallback[Exception](handleFailure)
sendCallback = getAsyncCallback[SendMessageResult](handleSend)
deleteCallback = getAsyncCallback[DeleteMessageRequest](handleDelete)
}

override protected def logSource: Class[_] = classOf[SqsAckFlowStage]

def checkForCompletion() =
if (isClosed(in) && inFlight == 0) {
completionState match {
case Some(Success(_)) => completeStage()
case Some(Failure(ex)) => failStage(ex)
case None => failStage(new IllegalStateException("Stage completed, but there is no info about status"))
}
}

setHandler(out, new OutHandler {
override def onPull() =
tryPull(in)
})

setHandler(
in,
new InHandler {

override def onUpstreamFinish() = {
inIsClosed = true
completionState = Some(Success(()))
checkForCompletion()
}

override def onUpstreamFailure(ex: Throwable) = {
inIsClosed = true
completionState = Some(Failure(ex))
checkForCompletion()
}

override def onPush() = {
inFlight += 1
val (message, action) = grab(in)
val responsePromise = Promise[AckResult]
action match {
case Ack() =>
sqsClient.deleteMessageAsync(
new DeleteMessageRequest(queueUrl, message.getReceiptHandle),
new AsyncHandler[DeleteMessageRequest, DeleteMessageResult] {

override def onError(exception: Exception): Unit = {
responsePromise.failure(exception)
failureCallback.invoke(exception)
}

override def onSuccess(request: DeleteMessageRequest, result: DeleteMessageResult): Unit = {
responsePromise.success(AckResult(result, message.getBody))
deleteCallback.invoke(request)
}
}
)
case RequeueWithDelay(delaySeconds) =>
sqsClient
.sendMessageAsync(
new SendMessageRequest(queueUrl, message.getBody).withDelaySeconds(delaySeconds),
new AsyncHandler[SendMessageRequest, SendMessageResult] {

override def onError(exception: Exception): Unit = {
responsePromise.failure(exception)
failureCallback.invoke(exception)
}

override def onSuccess(request: SendMessageRequest, result: SendMessageResult): Unit = {
responsePromise.success(AckResult(result, message.getBody))
sendCallback.invoke(result)
}
}
)
}
push(out, responsePromise.future)
}
}
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sqs

object SqsAckSinkSettings {
val Defaults = SqsAckSinkSettings(maxInFlight = 10)
}

//#SqsAckSinkSettings
final case class SqsAckSinkSettings(maxInFlight: Int) {
require(maxInFlight > 0)
}
//#SqsAckSinkSettings

sealed trait MessageAction
final case class Ack() extends MessageAction
final case class RequeueWithDelay(delaySeconds: Int) extends MessageAction
Loading