-
Notifications
You must be signed in to change notification settings - Fork 643
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SQS flows + Embedded ElasticMQ #239
Conversation
@s12v Could you take a look at this PR? |
new AsyncHandler[DeleteMessageRequest, DeleteMessageResult] { | ||
|
||
override def onError(exception: Exception): Unit = { | ||
r.failure(exception) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why it's failure
here and tryFailure
in the other handler?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be failure
in both, fixing.
|
||
override def onError(exception: Exception): Unit = { | ||
r.failure(exception) | ||
onComplete() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it's better to not directly call methods in GraphStageLogic
, and always invoke callback
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's right, I'll fix this to use callbacks.
case RequeueWithDelay(delaySeconds) => | ||
sqsClient | ||
.sendMessageAsync(new SendMessageRequest(queueUrl, message.getBody).withDelaySeconds(delaySeconds), | ||
new AsyncHandler[SendMessageRequest, SendMessageResult] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this object always does exactly the same, would it be better to create it once and reuse?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't, it closes over the promise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some feedback.
|
||
val logic = new GraphStageLogic(shape) with StageLogging { | ||
private var inFlight = new AtomicInteger(0) | ||
@volatile private var inIsClosed = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put all interaction in methods called from async callbacks instead of these thread primitives and make use of the guarantees streams gives you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course, fixing to callbacks.
import scala.concurrent.{Future, Promise} | ||
import scala.util.{Failure, Success, Try} | ||
|
||
class SqsAckFlowStage(queueUrl: String, sqsClient: AmazonSQSAsync) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
and maybe private[sqs]
as well. The factories are the public API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, fixing.
|
||
setHandler(out, | ||
new OutHandler { | ||
override def onPull() = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Strange indentation/linebreaks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's automatically managed by scalafmt
|
||
def onComplete(): Unit = | ||
if (inFlight.decrementAndGet() == 0 && inIsClosed) | ||
checkForCompletionCB.invoke(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exact duplicate of the other onComplete
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's right, removing.
case RequeueWithDelay(delaySeconds) => | ||
sqsClient | ||
.sendMessageAsync(new SendMessageRequest(queueUrl, message.getBody).withDelaySeconds(delaySeconds), | ||
new AsyncHandler[SendMessageRequest, SendMessageResult] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't, it closes over the promise.
checkForCompletionCB.invoke(()) | ||
|
||
val (message, action) = grab(in) | ||
val r = Promise[AmazonWebServiceResult[ResponseMetadata]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
responsePromise
or something like that instead of r
. Single char names are ok in a small/few line scope, not in a larger scope like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, renaming.
|
||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { | ||
|
||
val logic = new GraphStageLogic(shape) with StageLogging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to capture it in the logic
val, just return it directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixing.
*/ | ||
def create(queueUrl: String, | ||
settings: SqsAckSinkSettings, | ||
sqsClient: AmazonSQSAsync): Sink[MessageActionPair, Future[Done]] = | ||
Sink.fromGraph(new SqsAckSinkStage(queueUrl, settings, sqsClient)) | ||
sqsClient: AmazonSQSAsync): Sink[MessageActionPair, CompletionStage[Done]] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
package object javadsl { | ||
def identityFunction[T] = new Function[T, T] { | ||
override def apply(param: T): T = param | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't used anywhere so can be removed? (If it was used a val
would be better, to avoid the allocation of a new function on every use)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I'll remove it.
private InetSocketAddress sqsAddress; | ||
private int sqsPort = 0; | ||
protected String sqsEndpoint; | ||
protected AWSCredentialsProvider credentialsProvider = new AWSCredentialsProvider() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I remember correctly there should be a StaticAWSCredentialsProvider.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I'll use it.
64332c4
to
9c55423
Compare
- Mainly managing local state via callbacks + some minor changes - Additionally, use dedicated result types to represent data returned from flow (AWS response metadata + original msg body)
9c55423
to
ed7ad2d
Compare
I had to rebase this PR against master, there were merge conflicts. All review remarks are addressed, let me know if I overlooked something. Thanks a lot for reviewing @dpfeiffer @johanandren @s12v |
LGTM, thanks @kciesielski ! |
This PR introduces quit a lot of changes to the SQS connector, however it keeps the public Scala API backward compatible. Highlights:
SqsFlow.create()
andSqsAckFlow.create()
for creating SQS publishers as flows.SqsSinkStage
andSqsAckSinkStage
as these stages can be build from flows.CompletableStage
instead of ScalaFuture
.The design of flow stages is based on similar stages in akka-stream-kafka