Skip to content

Commit

Permalink
add SNS connector with publish sink akka#204
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabian Grutsch committed Feb 25, 2017
1 parent a7872e0 commit 4280680
Show file tree
Hide file tree
Showing 11 changed files with 415 additions and 1 deletion.
10 changes: 9 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
lazy val alpakka = project
.in(file("."))
.enablePlugins(PublishUnidoc)
.aggregate(amqp, cassandra, dynamodb, files, ftp, hbase, jms, mqtt, s3, simpleCodecs, sqs, sse, awslambda)
.aggregate(amqp, cassandra, dynamodb, files, ftp, hbase, jms, mqtt, s3, simpleCodecs, sns, sqs, sse, awslambda)

lazy val amqp = project
.enablePlugins(AutomateHeaderPlugin)
Expand Down Expand Up @@ -77,6 +77,14 @@ lazy val simpleCodecs = project
name := "akka-stream-alpakka-simple-codecs"
)

lazy val sns = project
.in(file("sns"))
.enablePlugins(AutomateHeaderPlugin)
.settings(
name := "akka-stream-alpakka-sns",
Dependencies.Sns
)

lazy val sqs = project
.in(file("sqs"))
.enablePlugins(AutomateHeaderPlugin)
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/paradox/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [AMQP Connector](amqp.md)
* [AWS DynamoDB Connector](dynamodb.md)
* [AWS SNS Connector](sns.md)
* [AWS SQS Connector](sqs.md)
* [AWS Lambda Connector](awslambda.md)
* [Cassandra Connector](cassandra.md)
Expand Down
77 changes: 77 additions & 0 deletions docs/src/main/paradox/sns.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# AWS SNS Connector

The AWS SNS connector provides an Akka Stream sink for AWS SNS.

For more information about AWS SNS please visit the [official documentation](https://aws.amazon.com/documentation/sns/).

## Artifacts

sbt
: @@@vars
```scala
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sns" % "$version$"
```
@@@

Maven
: @@@vars
```xml
<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-stream-alpakka-sns_$scala.binaryVersion$</artifactId>
<version>$version$</version>
</dependency>
```
@@@

Gradle
: @@@vars
```gradle
dependencies {
compile group: "com.lightbend.akka", name: "akka-stream-alpakka-sns_$scala.binaryVersion$", version: "$version$"
}
```
@@@

## Usage

Sources provided by this connector need a prepared `AmazonSNSAsyncClient` to publish messages to a topic.

Scala
: @@snip (../../../../sns/src/test/scala/akka/stream/alpakka/sns/scaladsl/Examples.scala) { #init-client }

Java
: @@snip (../../../../sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java) { #init-client }

We will also need an @scaladoc[ActorSystem](akka.actor.ActorSystem) and an @scaladoc[ActorMaterializer](akka.stream.ActorMaterializer).

Scala
: @@snip (../../../../sns/src/test/scala/akka/stream/alpakka/sns/scaladsl/Examples.scala) { #init-system }

Java
: @@snip (../../../../sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java) { #init-system }

This is all preparation that we are going to need.

### Publish messages to a SNS topic

Now we can publish a String message to any SNS topic where we have access to by providing the topic ARN to the
@scaladoc[SnsPublishSink](akka.stream.alpakka.sns.scaladsl.SnsPublishSink) factory.

Scala
: @@snip (../../../../sns/src/test/scala/akka/stream/alpakka/sns/scaladsl/Examples.scala) { #use-sink }

Java
: @@snip (../../../../sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java) { #use-sink }

As you can see, this would publish the messages from the source to the specified AWS SNS topic.

### Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
: ```
sbt
> sns/test
```
7 changes: 7 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ object Dependencies {
)
)

val Sns = Seq(
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-sns" % "1.11.95", // ApacheV2
"org.mockito" % "mockito-core" % "2.7.11" % Test // MIT
)
)

val Sqs = Seq(
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-sqs" % "1.11.76", // ApacheV2
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package akka.stream.alpakka.sns

import akka.Done
import akka.stream.stage._
import akka.stream.{Attributes, Inlet, SinkShape}
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.sns.AmazonSNSAsync
import com.amazonaws.services.sns.model.{PublishRequest, PublishResult}

import scala.concurrent.{Future, Promise}

final class SnsPublishSinkStage(topicArn: String, snsClient: AmazonSNSAsync)
extends GraphStageWithMaterializedValue[SinkShape[String], Future[Done]] {

private val in = Inlet[String]("SnsPublishSink.in")

override def shape: SinkShape[String] = SinkShape.of(in)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val completed = Promise[Done]()

val logic = new GraphStageLogic(shape) with StageLogging {

private val failureCallback = getAsyncCallback[Throwable](handleFailure)
private val successCallback = getAsyncCallback[PublishResult](handleSuccess)

setHandler(in,
new InHandler {
override def onPush(): Unit = {
val request = new PublishRequest().withTopicArn(topicArn).withMessage(grab(in))

snsClient.publishAsync(request,
new AsyncHandler[PublishRequest, PublishResult] {
override def onError(exception: Exception): Unit =
failureCallback.invoke(exception)

override def onSuccess(request: PublishRequest, result: PublishResult): Unit =
successCallback.invoke(result)
})
}

override def onUpstreamFinish(): Unit = {
completeStage()
completed.trySuccess(Done)
}

override def onUpstreamFailure(ex: Throwable): Unit = {
log.error(ex, "Upstream failure: {}", ex.getMessage)
failStage(ex)
completed.tryFailure(ex)
}
})

override def preStart(): Unit = {
setKeepGoing(true)
pull(in)
}

def handleFailure(ex: Throwable): Unit = {
log.error(ex, "AmazonSNSAsync failure: {}", ex.getMessage)
failStage(ex)
completed.tryFailure(ex)
}

def handleSuccess(result: PublishResult): Unit = {
log.debug("Published SNS message: {}", result.getMessageId)
completed.trySuccess(Done)

if (!isClosed(in) && !hasBeenPulled(in)) {
pull(in)
}
}
}

(logic, completed.future)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package akka.stream.alpakka.sns.javadsl

import java.util.concurrent.CompletionStage

import akka.Done
import akka.stream.alpakka.sns.SnsPublishSinkStage
import akka.stream.javadsl.{Sink => JSink}
import akka.stream.scaladsl.Sink
import com.amazonaws.services.sns.AmazonSNSAsync

import scala.compat.java8.FutureConverters._

object SnsPublishSink {

/**
* Java API: creates a [[SnsPublishSinkStage]] for a SNS topic using an [[AmazonSNSAsync]]
*/
def create(topicArn: String, snsClient: AmazonSNSAsync): JSink[String, CompletionStage[Done]] = {
val sink = Sink.fromGraph(new SnsPublishSinkStage(topicArn, snsClient))
sink.mapMaterializedValue(_.toJava).asJava
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package akka.stream.alpakka.sns.scaladsl

import akka.Done
import akka.stream.alpakka.sns.SnsPublishSinkStage
import akka.stream.scaladsl.Sink
import com.amazonaws.services.sns.AmazonSNSAsync

import scala.concurrent.Future

object SnsPublishSink {

/**
* Scala API: creates a [[SnsPublishSinkStage]] for a SNS topic using an [[AmazonSNSAsync]]
*/
def apply(topicArn: String)(implicit snsClient: AmazonSNSAsync): Sink[String, Future[Done]] =
Sink.fromGraph(new SnsPublishSinkStage(topicArn, snsClient))

}
30 changes: 30 additions & 0 deletions sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package akka.stream.alpakka.sns.javadsl;

import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Source;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sns.AmazonSNSAsync;
import com.amazonaws.services.sns.AmazonSNSAsyncClientBuilder;

import java.util.concurrent.CompletionStage;

public class Examples {

//#init-client
BasicAWSCredentials credentials = new BasicAWSCredentials("x", "x");
AmazonSNSAsync snsClient = AmazonSNSAsyncClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials)).build();
//#init-client

//#init-system
ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);
//#init-system

//#use-sink
CompletionStage<Done> done = Source.single("message").runWith(SnsPublishSink.create("topic-arn", snsClient), materializer);
//#use-sink

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package akka.stream.alpakka.sns

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterAll, Suite}

import scala.concurrent.Await
import scala.concurrent.duration._

trait DefaultTestContext extends BeforeAndAfterAll with MockitoSugar { this: Suite =>

implicit val system: ActorSystem = ActorSystem()
implicit val mat: Materializer = ActorMaterializer()

override protected def afterAll(): Unit =
Await.ready(system.terminate(), 5.seconds)

}
Loading

0 comments on commit 4280680

Please sign in to comment.