Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add SNS connector with publish sink #204 #205

Merged
merged 13 commits into from
Apr 13, 2017
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
lazy val alpakka = project
.in(file("."))
.enablePlugins(PublishUnidoc)
.aggregate(amqp, awslambda, cassandra, csv, dynamodb, files, ftp, googleCloudPubSub, hbase, jms, mqtt, s3, simpleCodecs, sqs, sse)
.aggregate(amqp, awslambda, cassandra, csv, dynamodb, files, ftp, googleCloudPubSub, hbase, jms, mqtt, s3, simpleCodecs, sns, sqs, sse)

lazy val amqp = project
.enablePlugins(AutomateHeaderPlugin)
Expand Down Expand Up @@ -99,6 +99,13 @@ lazy val simpleCodecs = project
name := "akka-stream-alpakka-simple-codecs"
)

lazy val sns = project
.enablePlugins(AutomateHeaderPlugin)
.settings(
name := "akka-stream-alpakka-sns",
Dependencies.Sns
)

lazy val sqs = project
.in(file("sqs"))
.enablePlugins(AutomateHeaderPlugin)
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/paradox/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [AWS DynamoDB Connector](dynamodb.md)
* [AWS Lambda Connector](awslambda.md)
* [AWS S3 Connector](s3.md)
* [AWS SNS Connector](sns.md)
* [AWS SQS Connector](sqs.md)
* [Cassandra Connector](cassandra.md)
* [File Connectors](file.md)
Expand Down
92 changes: 92 additions & 0 deletions docs/src/main/paradox/sns.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# AWS SNS Connector

The AWS SNS connector provides an Akka Stream Flow and Sink for push notifications through AWS SNS.

For more information about AWS SNS please visit the [official documentation](https://aws.amazon.com/documentation/sns/).

## Artifacts

sbt
: @@@vars
```scala
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sns" % "$version$"
```
@@@

Maven
: @@@vars
```xml
<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-stream-alpakka-sns_$scala.binaryVersion$</artifactId>
<version>$version$</version>
</dependency>
```
@@@

Gradle
: @@@vars
```gradle
dependencies {
compile group: "com.lightbend.akka", name: "akka-stream-alpakka-sns_$scala.binaryVersion$", version: "$version$"
}
```
@@@

## Usage

Sources provided by this connector need a prepared `AmazonSNSAsyncClient` to publish messages to a topic.

Scala
: @@snip (../../../../sns/src/test/scala/akka/stream/alpakka/sns/scaladsl/Examples.scala) { #init-client }

Java
: @@snip (../../../../sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java) { #init-client }

We will also need an @scaladoc[ActorSystem](akka.actor.ActorSystem) and an @scaladoc[ActorMaterializer](akka.stream.ActorMaterializer).

Scala
: @@snip (../../../../sns/src/test/scala/akka/stream/alpakka/sns/scaladsl/Examples.scala) { #init-system }

Java
: @@snip (../../../../sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java) { #init-system }

This is all preparation that we are going to need.

### Publish messages to a SNS topic

Now we can publish a String message to any SNS topic where we have access to by providing the topic ARN to the
@scaladoc[SnsPublisher](akka.stream.alpakka.sns.scaladsl.SnsPublisher$) Flow or Sink factory method.

### Using a Flow

Scala
: @@snip (../../../../sns/src/test/scala/akka/stream/alpakka/sns/scaladsl/Examples.scala) { #use-flow }

Java
: @@snip (../../../../sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java) { #use-flow }

As you can see, this would publish the messages from the source to the specified AWS SNS topic.
After a message has been successfully published, a
[PublishResult](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/sns/model/PublishResult.html)
will be pushed downstream.

### Using a Sink

Scala
: @@snip (../../../../sns/src/test/scala/akka/stream/alpakka/sns/scaladsl/Examples.scala) { #use-sink }

Java
: @@snip (../../../../sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java) { #use-sink }

As you can see, this would publish the messages from the source to the specified AWS SNS topic.

### Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
: ```
sbt
> sns/test
```
7 changes: 7 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ object Dependencies {
)
)

val Sns = Seq(
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-sns" % "1.11.95", // ApacheV2
"org.mockito" % "mockito-core" % "2.7.11" % Test // MIT
)
)

val Sqs = Seq(
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-sqs" % "1.11.105", // ApacheV2
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sns

import akka.stream._
import akka.stream.stage._
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.sns.AmazonSNSAsync
import com.amazonaws.services.sns.model.{PublishRequest, PublishResult}

private[akka] final class SnsPublishFlowStage(topicArn: String, snsClient: AmazonSNSAsync)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the client have a lifecycle, is is thread safe, should this be a client factory instead (() => AmazonSNSAsync) and have its lifecycle managed by the stage (`client.close() when the stage stops)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NVM, I already asked that previously but github hid all comments and I had forgot.

extends GraphStage[FlowShape[String, PublishResult]] {

private val in = Inlet[String]("SnsPublishFlow.in")
private val out = Outlet[PublishResult]("SnsPublishFlow.out")

override def shape: FlowShape[String, PublishResult] = FlowShape.of(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {

private var isMessageInFlight = false
private val failureCallback = getAsyncCallback[Throwable](handleFailure)
private val successCallback = getAsyncCallback[PublishResult](handleSuccess)

private def handleFailure(ex: Throwable): Unit =
failStage(ex)

private def handleSuccess(result: PublishResult): Unit = {
log.debug("Published SNS message: {}", result.getMessageId)
isMessageInFlight = false
if (!isClosed(out)) push(out, result)
}

private val asyncHandler = new AsyncHandler[PublishRequest, PublishResult] {
override def onError(exception: Exception): Unit =
failureCallback.invoke(exception)
override def onSuccess(request: PublishRequest, result: PublishResult): Unit =
successCallback.invoke(result)
}

override def onPush(): Unit = {
isMessageInFlight = true
val request = new PublishRequest().withTopicArn(topicArn).withMessage(grab(in))
snsClient.publishAsync(request, asyncHandler)
}

override def onPull(): Unit = {
if (isClosed(in) && !isMessageInFlight) completeStage()
if (!hasBeenPulled(in)) tryPull(in)
}

override def onUpstreamFinish(): Unit =
if (!isMessageInFlight) completeStage()

setHandlers(in, out, this)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sns.javadsl

import java.util.concurrent.CompletionStage

import akka.stream.alpakka.sns.SnsPublishFlowStage
import akka.stream.javadsl.{Flow, Keep, Sink}
import akka.{Done, NotUsed}
import com.amazonaws.services.sns.AmazonSNSAsync
import com.amazonaws.services.sns.model.PublishResult

object SnsPublisher {

/**
* Java API: creates a [[Flow]] to publish messages to a SNS topic using an [[AmazonSNSAsync]]
*/
def createFlow(topicArn: String, snsClient: AmazonSNSAsync): Flow[String, PublishResult, NotUsed] =
Flow.fromGraph(new SnsPublishFlowStage(topicArn, snsClient))

/**
* Java API: creates a [[Sink]] to publish messages to a SNS topic using an [[AmazonSNSAsync]]
*/
def createSink(topicArn: String, snsClient: AmazonSNSAsync): Sink[String, CompletionStage[Done]] =
createFlow(topicArn, snsClient).toMat(Sink.ignore(), Keep.right[NotUsed, CompletionStage[Done]])

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sns.scaladsl

import akka.stream.alpakka.sns.SnsPublishFlowStage
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.{Done, NotUsed}
import com.amazonaws.services.sns.AmazonSNSAsync
import com.amazonaws.services.sns.model.PublishResult

import scala.concurrent.Future

object SnsPublisher {

/**
* Scala API: creates a [[Sink]] to publish messages to a SNS topic using an [[AmazonSNSAsync]]
*/
def flow(topicArn: String)(implicit snsClient: AmazonSNSAsync): Flow[String, PublishResult, NotUsed] =
Flow.fromGraph(new SnsPublishFlowStage(topicArn, snsClient))

/**
* Scala API: creates a [[Sink]] to publish messages to a SNS topic using an [[AmazonSNSAsync]]
*/
def sink(topicArn: String)(implicit snsClient: AmazonSNSAsync): Sink[String, Future[Done]] =
flow(topicArn).toMat(Sink.ignore)(Keep.right)

}
38 changes: 38 additions & 0 deletions sns/src/test/java/akka/stream/alpakka/sns/javadsl/Examples.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sns.javadsl;

import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sns.AmazonSNSAsync;
import com.amazonaws.services.sns.AmazonSNSAsyncClientBuilder;

import java.util.concurrent.CompletionStage;

public class Examples {

//#init-client
BasicAWSCredentials credentials = new BasicAWSCredentials("x", "x");
AmazonSNSAsync snsClient = AmazonSNSAsyncClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials)).build();
//#init-client

//#init-system
ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);
//#init-system

//#use-sink
CompletionStage<Done> sink = Source.single("message").runWith(SnsPublisher.createSink("topic-arn", snsClient), materializer);
//#use-sink

//#use-flow
CompletionStage<Done> flow = Source.single("message").via(SnsPublisher.createFlow("topic-arn", snsClient)).runWith(Sink.ignore(), materializer);
//#use-flow

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sns

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import com.amazonaws.services.sns.AmazonSNSAsyncClient
import org.mockito.Mockito.reset
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}

import scala.concurrent.Await
import scala.concurrent.duration._

trait DefaultTestContext extends BeforeAndAfterAll with BeforeAndAfterEach with MockitoSugar { this: Suite =>

implicit protected val system: ActorSystem = ActorSystem()
implicit protected val mat: Materializer = ActorMaterializer()
implicit protected val snsClient: AmazonSNSAsyncClient = mock[AmazonSNSAsyncClient]

override protected def beforeEach(): Unit =
reset(snsClient)

override protected def afterAll(): Unit =
Await.ready(system.terminate(), 5.seconds)

}
Loading