-
Notifications
You must be signed in to change notification settings - Fork 641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Flow to support RabbitMQ RPC workflow #160 #161
Conversation
Is this failure related to my PR? Or something else? It seems unrelated |
the failure is from this PR, there is a missing |
9024e8c
to
b6feb63
Compare
Any comments on this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good! Added some stuff to take another look at.
|
||
val input = Vector("one", "two", "three", "four", "five") | ||
//#run-rpc-flow | ||
val (rpcQueueF, probe) = Source(input).map(s => ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Linebreak all long lines like this in the samples into smaller chunks so it is easier to read.
/** | ||
* Scala API: Creates an [[AmqpRpcFlow]] that accepts ByteString elements and emits ByteString elements. | ||
*/ | ||
def simple(settings: AmqpSinkSettings, repliesPerMessage: Int = 1) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate the docs from the AmqpRpcFlowStage
onto both these methods (and the Java counterparts) as this is pretty much the API entry point, few users will actually construct the stage itself.
@@ -17,6 +17,12 @@ object AmqpSink { | |||
Sink.fromGraph(new AmqpSinkStage(settings)) | |||
|
|||
/** | |||
* Java API: Creates an [[AmqpSink]] that accepts [[OutgoingMessage]] elements. | |||
*/ | |||
def createReplyTo(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[OutgoingMessage, NotUsed] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate the docs from AmqpReplyToStage
here to make it clear how it differs from create
.
Also, maybe we should fail the "regular" sink if replyTo
is set on an OutgoingMessage
, so that if someone mixes them up it will at least fail as fast as possible? (Ofc optimal would be to have two different types so doing it wrong wouldn't even compile, but to do that we'd have to break the existing method and I don't think we should do that)
/** | ||
* Java API: Creates an [[AmqpRpcFlow]] with given settings and buffer size. | ||
*/ | ||
def create(settings: AmqpSinkSettings, bufferSize: Int) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Always use explicit return types on the public methods, all across the PR. So these should be javadsl.Flow[...]
Also, the scaladoc/javadoc here isn't very useful, try to explain what the flow it creates does rather than "it takes these parameters and returns a flow of this type" - that is already pretty much clear from the signature.
Something like "Create a RPC style flow (... link to rpc sample in rabbitmq docs...) allowing for bla bla communication over the message bus".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. I've always been awful at writing docs. I'll give it a shot though.
|
||
// TODO: This is pretty expensive just to see if a message specifies how many responses it should be expecting | ||
Option(props.getHeaders) | ||
.map(_.asScala) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
weird indentation here
new InHandler { | ||
// We don't want to finish since we're still waiting | ||
// on incoming messages from rabbit | ||
override def onUpstreamFinish(): Unit = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if upstream cancels before emitting any messages, will we then be stuck?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Any thoughts on how this should be handled? Make sure at least one element has been emitted?
val elem = grab(in) | ||
channel.basicPublish( | ||
"", | ||
elem.props.map(_.getReplyTo).get, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if no replyTo
is set? Should it discard such messages and log a warning or maybe fail the stage even, or maybe fallback to the destination defined in the settings?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm open to opinions. I think the best would be to remove the ability to configure a destination in the settings. I suppose it could be an option for failing the stage or discarding the element?
object AmqpRpcFlowStage { | ||
|
||
/** | ||
* Internal API |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to mark it Internal when it is private
as it will not be accessible anyways. We use it when something is private[alpakka]
for example, since that does not have a Java access control counterpart and will be accessible from Java.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright. I was just copying existing code
docs/src/main/paradox/amqp.md
Outdated
@@ -115,6 +115,17 @@ We merge all sources into one and add the index of the source to all incoming me | |||
|
|||
Such sink and source can be started the same way as in the previous example. | |||
|
|||
### Using rabbitmq as an RPC mechanism | |||
|
|||
If you have remote workers that you want to incorporate into a stream, you can do it using rabbit RPC workflow [RabbitMQ RPC](https://www.rabbitmq.com/tutorials/tutorial-six-python.html) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Link to the Java one instead? ;) https://www.rabbitmq.com/tutorials/tutorial-six-java.html
docs/src/main/paradox/amqp.md
Outdated
|
||
|
||
Scala | ||
: @@snip (../../../../amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala) { #run-rpc-flow } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Include Java samples as well?
Seems like something wrong with the doc changes, you can run them locally using |
I don't think that failure was from me |
I think it was a timing issue on the ci-server, restarted the failed job. |
Any comments on this? |
elem.bytes.toArray | ||
) | ||
|
||
// TODO: This is pretty expensive just to see if a message specifies how many responses it should be expecting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, try to rewrite with something more efficient (less scala idiomatic)
} | ||
} | ||
) | ||
}, promise.future) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we sure it's completed somehow for all cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a better way to know for sure? https://gitter.im/akka/dev?at=5897b6246018ccd652745a04
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
postStop
but then you won't know if the stage failed or completed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be a good idea to put a guard in there (postStop
) try-failing the promise with an exception and "stage stopped unexpectedly" so that it does not happen silently.
*/ | ||
def create(settings: AmqpSinkSettings, | ||
bufferSize: Int, | ||
repliesPerMessage: Int): javadsl.Flow[OutgoingMessage, IncomingMessage, Future[String]] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadsl should use CompletionStage
instead of Future
66c45fb
to
5cb1bf8
Compare
I don't think this failure was because of me |
Yeah, seems like a timing issue in other tests |
Can this be retested and accepted/commented on? |
PLS BUILD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrote a bit more, not a complete review though.
promise.tryFailure(ex) | ||
failStage(ex) | ||
case None => | ||
promise.trySuccess("") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it wasn't completed with a name already, shouldn't this be a failure as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, that's right. I guess it depends. the case None is when a queue gets deleted or something like that. I suppose that is technically a failure, huh.
// haven't processed a message yet, we do want to complete | ||
// so that we don't hang. | ||
override def onUpstreamFinish(): Unit = | ||
if (outstandingMessages == 0) super.onUpstreamFinish() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we check if queue is empty as well? In case we got responses but downstream was backpressuring.
Ping on this please? @johanandren |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more comments added!
|
||
override def postStop(): Unit = { | ||
if (!promise.isCompleted) { | ||
promise.failure(new RuntimeException("stage stopped unexpectedly")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be a single line with promise.tryFailure
which will only fail the promise if it isn't already completed or failed - so no need for the if
block.
with AmqpConnector { stage => | ||
import AmqpReplyToStage._ | ||
|
||
val in = Inlet[OutgoingMessage]("AmqpReplyToSink.in") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong name in string
}, promise.future) | ||
} | ||
|
||
override def toString: String = "AmqpReplyToSink" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong toString
* Create an [[https://www.rabbitmq.com/tutorials/tutorial-six-java.html RPC style flow]] for processing and communicating | ||
* over a rabbitmq message bus. This will create a private queue, and add the reply-to header to messages sent out. | ||
* | ||
* This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... to a CompletionStage<String>, ...
* Create an [[https://www.rabbitmq.com/tutorials/tutorial-six-java.html RPC style flow]] for processing and communicating | ||
* over a rabbitmq message bus. This will create a private queue, and add the reply-to header to messages sent out. | ||
* | ||
* This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CompletionStage
here as well
object AmqpReplyToStage { | ||
|
||
private val defaultAttributes = | ||
Attributes.name("AmqpReplyToSink").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the right name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thie name of the stage is AmqpReplyToStage
, no? AmqpReplyToSink
is an old name or copy-pasta, I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh. I meant for the object to be AmqpReplyToSinkStage. Basically the existing AmqpSinkStage, but it's a ReplyTo sink.
/** | ||
* Java API | ||
*/ | ||
def create(connectionSettings: AmqpConnectionSettings) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Always use explicit return types on public methods
*/ | ||
def apply(settings: AmqpSinkSettings): Sink[OutgoingMessage, NotUsed] = | ||
def apply(settings: AmqpSinkSettings): Sink[OutgoingMessage, Future[Done]] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document what the matval is for all three factories
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just following what's already in the file. Should i explicitly type everything in this file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just document in what circumstances that Future[Done]
is completed/failed in the scaladoc.
|
||
//#create-rpc-flow | ||
final Flow<ByteString,ByteString, CompletionStage<String>> ampqRpcFlow = AmqpRpcFlow.createSimple( | ||
AmqpSinkSettings.create(DefaultAmqpConnection.getInstance()).withRoutingKey(queueName).withDeclarations(queueDeclaration), 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe provide a AmqpSinkSettings.create()
which uses the default? Would make this a little bit more neat.
pushAndAckMessage(message) | ||
} else { | ||
if (queue.size + 1 > bufferSize) { | ||
failStage(new RuntimeException(s"Reached maximum buffer size $bufferSize")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, shouldn't we rather back pressure if there is no room in the buffer for the response?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took this from the AmqpSourceStage that currently exists. I believe the idea is that the else should never happen. This would be a situation where for some reason we have more responses coming in from rabbit than we asked for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, I think you are right. We shouldn't have asked for an element if the buffer didn't have space for it it or out was available.
Needs a rebase now, not sure what got merged that changed |
And a compile after the rebase to update formatting, which is causing the build to fail now. Almost there! |
Changes Amqp sinks to materialize to Future[Done]. As currently it was very difficult to determine when/if a sink failed due to a amqp error.
Any other comments on this? |
All green. Great job! |
No description provided.