Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add SNS connector with publish sink #204 #205

Merged
merged 13 commits into from
Apr 13, 2017
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
lazy val alpakka = project
.in(file("."))
.enablePlugins(PublishUnidoc)
.aggregate(amqp, cassandra, dynamodb, files, ftp, hbase, jms, mqtt, s3, simpleCodecs, sqs, sse, awslambda)
.aggregate(amqp, cassandra, dynamodb, files, ftp, hbase, jms, mqtt, s3, simpleCodecs, sns, sqs, sse, awslambda)

lazy val amqp = project
.enablePlugins(AutomateHeaderPlugin)
Expand Down Expand Up @@ -77,6 +77,13 @@ lazy val simpleCodecs = project
name := "akka-stream-alpakka-simple-codecs"
)

lazy val sns = project
.enablePlugins(AutomateHeaderPlugin)
.settings(
name := "akka-stream-alpakka-sns",
Dependencies.Sns
)

lazy val sqs = project
.in(file("sqs"))
.enablePlugins(AutomateHeaderPlugin)
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/paradox/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [AMQP Connector](amqp.md)
* [AWS DynamoDB Connector](dynamodb.md)
* [AWS SNS Connector](sns.md)
* [AWS SQS Connector](sqs.md)
* [AWS Lambda Connector](awslambda.md)
* [Cassandra Connector](cassandra.md)
Expand Down
93 changes: 93 additions & 0 deletions docs/src/main/paradox/sns.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# AWS SNS Connector

The AWS SNS connector provides an Akka Stream Flow and Sink for push notifications through AWS SNS.

For more information about AWS SNS please visit the [official documentation](https://aws.amazon.com/documentation/sns/).

## Artifacts

sbt
: @@@vars
```scala
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sns" % "$version$"
```
@@@

Maven
: @@@vars
```xml
<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-stream-alpakka-sns_$scala.binaryVersion$</artifactId>
<version>$version$</version>
</dependency>
```
@@@

Gradle
: @@@vars
```gradle
dependencies {
compile group: "com.lightbend.akka", name: "akka-stream-alpakka-sns_$scala.binaryVersion$", version: "$version$"
}
```
@@@

## Usage

Sources provided by this connector need a prepared `AmazonSNSAsyncClient` to publish messages to a topic.

Scala
: @@snip (../../../../sns/src/test/scala/akka/stream/alpakka/sns/scaladsl/Examples.scala) { #init-client }

Java
: @@snip (../../../../sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java) { #init-client }

We will also need an @scaladoc[ActorSystem](akka.actor.ActorSystem) and an @scaladoc[ActorMaterializer](akka.stream.ActorMaterializer).

Scala
: @@snip (../../../../sns/src/test/scala/akka/stream/alpakka/sns/scaladsl/Examples.scala) { #init-system }

Java
: @@snip (../../../../sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java) { #init-system }

This is all preparation that we are going to need.

### Publish messages to a SNS topic

Now we can publish a String message to any SNS topic where we have access to by providing the topic ARN to the
@scaladoc[SnsPublishFlow](akka.stream.alpakka.sns.scaladsl.SnsPublishFlow$) or
@scaladoc[SnsPublishSink](akka.stream.alpakka.sns.scaladsl.SnsPublishSink$) factory.

### SnsPublishFlow

Scala
: @@snip (../../../../sns/src/test/scala/akka/stream/alpakka/sns/scaladsl/Examples.scala) { #use-flow }

Java
: @@snip (../../../../sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java) { #use-flow }

As you can see, this would publish the messages from the source to the specified AWS SNS topic.
After a message has been successfully published, a
[PublishResult](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/sns/model/PublishResult.html)
will be pushed downstream.

### SnsPublishSink

Scala
: @@snip (../../../../sns/src/test/scala/akka/stream/alpakka/sns/scaladsl/Examples.scala) { #use-sink }

Java
: @@snip (../../../../sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java) { #use-sink }

As you can see, this would publish the messages from the source to the specified AWS SNS topic.

### Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
: ```
sbt
> sns/test
```
7 changes: 7 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ object Dependencies {
)
)

val Sns = Seq(
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-sns" % "1.11.95", // ApacheV2
"org.mockito" % "mockito-core" % "2.7.11" % Test // MIT
)
)

val Sqs = Seq(
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-sqs" % "1.11.76", // ApacheV2
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sns

import akka.stream._
import akka.stream.stage._
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.sns.AmazonSNSAsync
import com.amazonaws.services.sns.model.{PublishRequest, PublishResult}

final class SnsPublishFlowStage(topicArn: String, snsClient: AmazonSNSAsync)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internal API?
private[akka]

extends GraphStage[FlowShape[String, PublishResult]] {

private val in = Inlet[String]("SnsPublishFlow.in")
private val out = Outlet[PublishResult]("SnsPublishFlow.out")

override def shape: FlowShape[String, PublishResult] = FlowShape.of(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {

private val failureCallback = getAsyncCallback[Throwable](handleFailure)
private val successCallback = getAsyncCallback[PublishResult](handleSuccess)

private def handleFailure(ex: Throwable): Unit =
failStage(ex)

private def handleSuccess(result: PublishResult): Unit = {
log.debug("Published SNS message: {}", result.getMessageId)

if (isAvailable(out)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will it not be available? should this be !isClosed
we are just dropping the message if it's not available so I want to be sure that we don't do that accidentally

if (!hasBeenPulled(in)) tryPull(in)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why pull here? shouldn't that only be done from onPull?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this limit the Flow to process elements only one by one? Otherwise we would always have to wait for the output port to pull?

Thats why I also made inFlight as an Int and not a Boolean.

push(out, result)
}
}

override def onPush(): Unit = {
val request = new PublishRequest().withTopicArn(topicArn).withMessage(grab(in))

snsClient.publishAsync(request,
new AsyncHandler[PublishRequest, PublishResult] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily anything to act on but, as this gets the values passed in on success/failure, it could be a single instance of AsyncHandler that you reuse rather than a new instance for every message.

override def onError(exception: Exception): Unit =
failureCallback.invoke(exception)

override def onSuccess(request: PublishRequest, result: PublishResult): Unit =
successCallback.invoke(result)
})
}

override def onPull(): Unit = {
if (isClosed(in)) completeStage()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be an ongoing publish that did not yet complete here, even if in was closed, you need to keep track of that and emit that last publish result when it comes, and then complete the stage.

if (!hasBeenPulled(in)) tryPull(in)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else if

}

setHandlers(in, out, this)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sns

import akka.Done
import akka.stream.stage._
import akka.stream.{Attributes, Inlet, SinkShape}
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.sns.AmazonSNSAsync
import com.amazonaws.services.sns.model.{PublishRequest, PublishResult}

import scala.concurrent.{Future, Promise}

final class SnsPublishSinkStage(topicArn: String, snsClient: AmazonSNSAsync)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the AmazonSNSAsync thread safe? A stage can be materialized any number of times and all of them will share the same snsClient. If it cannot be shared a factory lambda is better (() => AmazonSNSAsync), that way every materialization can have its own client (which it will be responsible to manage/close etc in sync with the stage lifecycle).

Another option might be to take a AmazonSNSAsyncClientBuilder and .build() that each materialization (if the builder is immutable that is).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An additional thought: It could be better to make this a flow, emitting the values downstream as they have been ack:ed from AWS, the factory for the simple use case as sink could then be achieved through composition (SNSPublishFlow.to(Sink.ignore)).

Copy link
Contributor Author

@fgrutsch fgrutsch Mar 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the AmazonSNSAsync is ThreadSafe regarding to the AWS docs. That's why I think it is fine to share a single AmazonSNSAsync among multiple materialization stages.

Also regarding your last comment about not making the AmazonSNSAsync implicit, I try to be consistent with other connectors, like the SQS one. There we also pass the AmazonSQSClient implicitly. I think it will be easier for people who use the connectors if the initialization of the GraphStages work the same way.

Good point to change the Sink to a Flow. I thought about keeping the Sink and provide an additional Flow GraphStage which will return the messageId of the published message, so clients can do further processing with the returned messageId. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, Hadn't noticed that how many connectors that do that. Thats a valid argument to keep it implicit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for verifying that the client is threadsafe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe even just let it emit PublishResult downstream and then both String => String, and String => MessageId can be achieved through combinations with other stages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the SnsPublishFlow to emit a PublishResult.

Copy link
Member

@johanandren johanandren Mar 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means we could delete the sink stage and express it by using the flow in the factory method instead:

val flow: Flow[String, PublishResult, NotUsed] 
val sink: Sink[String, Future[Done]] = flow.toMat(Sink.ignore)(Keep.right)

extends GraphStageWithMaterializedValue[SinkShape[String], Future[Done]] {

private val in = Inlet[String]("SnsPublishSink.in")

override def shape: SinkShape[String] = SinkShape.of(in)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val completed = Promise[Done]()

val logic = new GraphStageLogic(shape) with InHandler with StageLogging {

private def handleFailure(ex: Throwable): Unit = {
failStage(ex)
completed.tryFailure(ex)
}

private def handleSuccess(result: PublishResult): Unit = {
log.debug("Published SNS message: {}", result.getMessageId)
completed.trySuccess(Done)

if (!hasBeenPulled(in)) tryPull(in)
}

private val failureCallback = getAsyncCallback[Throwable](handleFailure)
private val successCallback = getAsyncCallback[PublishResult](handleSuccess)

override def onPush(): Unit = {
val request = new PublishRequest().withTopicArn(topicArn).withMessage(grab(in))

snsClient.publishAsync(request,
new AsyncHandler[PublishRequest, PublishResult] {
override def onError(exception: Exception): Unit =
failureCallback.invoke(exception)

override def onSuccess(request: PublishRequest, result: PublishResult): Unit =
successCallback.invoke(result)
})
}

override def onUpstreamFinish(): Unit = {
completeStage()
completed.trySuccess(Done)
}

override def onUpstreamFailure(ex: Throwable): Unit = {
failStage(ex)
completed.tryFailure(ex)
}

override def preStart(): Unit = {
setKeepGoing(true)
pull(in)
}

setHandler(in, this)
}

(logic, completed.future)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sns.javadsl

import akka.NotUsed
import akka.stream.alpakka.sns.SnsPublishFlowStage
import akka.stream.javadsl.Flow
import com.amazonaws.services.sns.AmazonSNSAsync
import com.amazonaws.services.sns.model.PublishResult

object SnsPublishFlow {

/**
* Java API: creates a [[SnsPublishFlowStage]] for a SNS topic using an [[AmazonSNSAsync]]
*/
def create(topicArn: String, snsClient: AmazonSNSAsync): Flow[String, PublishResult, NotUsed] =
Flow.fromGraph(new SnsPublishFlowStage(topicArn, snsClient))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sns.javadsl

import java.util.concurrent.CompletionStage

import akka.Done
import akka.stream.alpakka.sns.SnsPublishSinkStage
import akka.stream.javadsl.{Sink => JSink}
import akka.stream.scaladsl.Sink
import com.amazonaws.services.sns.AmazonSNSAsync

import scala.compat.java8.FutureConverters._

object SnsPublishSink {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should then instead be SnsPublisher and have both .sink and .flow in the same place?


/**
* Java API: creates a [[SnsPublishSinkStage]] for a SNS topic using an [[AmazonSNSAsync]]
*/
def create(topicArn: String, snsClient: AmazonSNSAsync): JSink[String, CompletionStage[Done]] = {
val sink = Sink.fromGraph(new SnsPublishSinkStage(topicArn, snsClient))
sink.mapMaterializedValue(_.toJava).asJava
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sns.scaladsl

import akka.NotUsed
import akka.stream.alpakka.sns.SnsPublishFlowStage
import akka.stream.scaladsl.Flow
import com.amazonaws.services.sns.AmazonSNSAsync
import com.amazonaws.services.sns.model.PublishResult

object SnsPublishFlow {

/**
* Scala API: creates a [[SnsPublishFlowStage]] for a SNS topic using an [[AmazonSNSAsync]]
*/
def apply(topicArn: String)(implicit snsClient: AmazonSNSAsync): Flow[String, PublishResult, NotUsed] =
Flow.fromGraph(new SnsPublishFlowStage(topicArn, snsClient))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sns.scaladsl

import akka.Done
import akka.stream.alpakka.sns.SnsPublishSinkStage
import akka.stream.scaladsl.Sink
import com.amazonaws.services.sns.AmazonSNSAsync

import scala.concurrent.Future

object SnsPublishSink {

/**
* Scala API: creates a [[SnsPublishSinkStage]] for a SNS topic using an [[AmazonSNSAsync]]
*/
def apply(topicArn: String)(implicit snsClient: AmazonSNSAsync): Sink[String, Future[Done]] =
Sink.fromGraph(new SnsPublishSinkStage(topicArn, snsClient))

}
Loading