Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add SNS connector with publish sink #204 #205

Merged
merged 13 commits into from
Apr 13, 2017

Conversation

fgrutsch
Copy link
Contributor

  • Includes a new connector called sns
  • Includes a sink to publish messages to an AWS SNS topic

@johanandren
Copy link
Member

REFS #204

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments added. Looks promising!

build.sbt Outdated
@@ -77,6 +77,14 @@ lazy val simpleCodecs = project
name := "akka-stream-alpakka-simple-codecs"
)

lazy val sns = project
.in(file("sns"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the folder is the same name as the project/module, you don't have to specify .in(file(...)) it should be magically correct just by calling project (see module above for example)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, I will try it out.

@@ -0,0 +1,77 @@
# AWS SNS Connector

The AWS SNS connector provides an Akka Stream sink for AWS SNS.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good with just a tad more about what SNS is, "provides an Akka Stream sink for push notifications through AWS SNS", so that a reader who doesn't know doesn't have to go to the official documentation directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I will add some more information.


import scala.concurrent.{Future, Promise}

final class SnsPublishSinkStage(topicArn: String, snsClient: AmazonSNSAsync)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the AmazonSNSAsync thread safe? A stage can be materialized any number of times and all of them will share the same snsClient. If it cannot be shared a factory lambda is better (() => AmazonSNSAsync), that way every materialization can have its own client (which it will be responsible to manage/close etc in sync with the stage lifecycle).

Another option might be to take a AmazonSNSAsyncClientBuilder and .build() that each materialization (if the builder is immutable that is).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An additional thought: It could be better to make this a flow, emitting the values downstream as they have been ack:ed from AWS, the factory for the simple use case as sink could then be achieved through composition (SNSPublishFlow.to(Sink.ignore)).

Copy link
Contributor Author

@fgrutsch fgrutsch Mar 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the AmazonSNSAsync is ThreadSafe regarding to the AWS docs. That's why I think it is fine to share a single AmazonSNSAsync among multiple materialization stages.

Also regarding your last comment about not making the AmazonSNSAsync implicit, I try to be consistent with other connectors, like the SQS one. There we also pass the AmazonSQSClient implicitly. I think it will be easier for people who use the connectors if the initialization of the GraphStages work the same way.

Good point to change the Sink to a Flow. I thought about keeping the Sink and provide an additional Flow GraphStage which will return the messageId of the published message, so clients can do further processing with the returned messageId. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, Hadn't noticed that how many connectors that do that. Thats a valid argument to keep it implicit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for verifying that the client is threadsafe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe even just let it emit PublishResult downstream and then both String => String, and String => MessageId can be achieved through combinations with other stages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the SnsPublishFlow to emit a PublishResult.

Copy link
Member

@johanandren johanandren Mar 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means we could delete the sink stage and express it by using the flow in the factory method instead:

val flow: Flow[String, PublishResult, NotUsed] 
val sink: Sink[String, Future[Done]] = flow.toMat(Sink.ignore)(Keep.right)

}

def handleFailure(ex: Throwable): Unit = {
log.error(ex, "AmazonSNSAsync failure: {}", ex.getMessage)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to log the failure since we fail the stage and the materialized value with it. You could wrap it with a en exception to decorate with extra information from the stage though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright I will remove it then.

publishRequest,
new PublishResult().withMessageId("message-id")
)
new CompletableFuture()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This future is never completed though, shouldn't that cause the matVal to never complete, is there a bug around waiting for ack on publishing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will further investigate it.

@fgrutsch
Copy link
Contributor Author

@johanandren I updated the PR regarding your comments. Furthemrore, I also added a SnsPublishFlow what I mentioned in one of my replies of your comments.

val request = new PublishRequest().withTopicArn(topicArn).withMessage(grab(in))

snsClient.publishAsync(request,
new AsyncHandler[PublishRequest, PublishResult] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily anything to act on but, as this gets the values passed in on success/failure, it could be a single instance of AsyncHandler that you reuse rather than a new instance for every message.

}

override def onPull(): Unit = {
if (isClosed(in)) completeStage()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be an ongoing publish that did not yet complete here, even if in was closed, you need to keep track of that and emit that last publish result when it comes, and then complete the stage.


import scala.compat.java8.FutureConverters._

object SnsPublishSink {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should then instead be SnsPublisher and have both .sink and .flow in the same place?

@fgrutsch
Copy link
Contributor Author

@johanandren Updated regarding your comments. Removed the SnsPublishSink and provide a SnsPublisher with .flow and .sink factory methods. Also implemented a way to keep track of in flight messages for proper stage closing.

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @fg-devs !

@patriknw patriknw self-assigned this Mar 20, 2017
Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking good, but some questions around the push/pull logic

log.debug("Published SNS message: {}", result.getMessageId)
inFlight -= 1
if (isAvailable(out)) {
if (!hasBeenPulled(in)) tryPull(in)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why pull here? shouldn't that only be done from onPull?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this limit the Flow to process elements only one by one? Otherwise we would always have to wait for the output port to pull?

Thats why I also made inFlight as an Int and not a Boolean.

private def handleSuccess(result: PublishResult): Unit = {
log.debug("Published SNS message: {}", result.getMessageId)
inFlight -= 1
if (isAvailable(out)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will it not be available? should this be !isClosed
we are just dropping the message if it's not available so I want to be sure that we don't do that accidentally

}

override def onPush(): Unit = {
inFlight += 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can inFlight be > 1? perhaps use boolean if it can only be 0 or 1


override def onPull(): Unit = {
if (isClosed(in) && inFlight == 0) completeStage()
if (!hasBeenPulled(in)) tryPull(in)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else if

import com.amazonaws.services.sns.AmazonSNSAsync
import com.amazonaws.services.sns.model.{PublishRequest, PublishResult}

final class SnsPublishFlowStage(topicArn: String, snsClient: AmazonSNSAsync)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internal API?
private[akka]

object SnsPublisher {

/**
* Java API: creates a [[Flow]] from a [[SnsPublishFlowStage]] for a SNS topic using an [[AmazonSNSAsync]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

users should not care about the implementation SnsPublishFlowStage

@ktoso
Copy link
Member

ktoso commented Apr 7, 2017

Hi there @fg-devs, are you going to have some time to address the small comments made?
I'd hope so, would be awesome to include this in the next release :)

@fgrutsch
Copy link
Contributor Author

fgrutsch commented Apr 8, 2017

@ktoso Hey, yes I will do the changes today.

Fabian Grutsch added 2 commits April 8, 2017 18:19
# Conflicts:
#	build.sbt
#	docs/src/main/paradox/connectors.md
@fgrutsch
Copy link
Contributor Author

fgrutsch commented Apr 8, 2017

@ktoso @patriknw Update the PR regarding your comments. The build for Scala 2.12 failed (there is already an open ticket)

# Conflicts:
#	build.sbt
import com.amazonaws.services.sns.AmazonSNSAsync
import com.amazonaws.services.sns.model.{PublishRequest, PublishResult}

private[akka] final class SnsPublishFlowStage(topicArn: String, snsClient: AmazonSNSAsync)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the client have a lifecycle, is is thread safe, should this be a client factory instead (() => AmazonSNSAsync) and have its lifecycle managed by the stage (`client.close() when the stage stops)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NVM, I already asked that previously but github hid all comments and I had forgot.

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@johanandren johanandren merged commit d2b0363 into akka:master Apr 13, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants