Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Reject batch receive when connection fails #6338

Closed
wants to merge 1 commit into from

Conversation

ramya-rao-a
Copy link
Contributor

This PR updates the onDetached method to reject the promise returned by the ongoing receive operation to fix bug #6065

When there is a connection level error, no error events are raised on the AMQP link or session level.
Therefore, an ongoing batch receive request is unaware of such errors.

Previously, there was code put in place to hold such errors in the detachedError variable which was then checked to decide whether the promise should be rejected or not. This worked as long as the connection error happened before the finalAction() on the batch receiver was called.

In #6065, what is happening is that the connection is lost and credits need to be drained. We never get the drain completed event and therefore the promise never gets resolved.

@ramya0820
Copy link
Member

ramya0820 commented Nov 26, 2019

A little background:
One of the things that came up while looking at #5757 and #6270 and which we talked about offline - was the corner case where the addCredit(1) as part of drain was resulting in loss of 1 message consistently.
The work around for addressing that issue was by updating finalAction() to wait for one last time to receive the drain event. And at end of it, resolve the promise with messages collected so far.
This should fix the codepath in finalAction() https://github.com/Azure/azure-sdk-for-js/blob/master/sdk/servicebus/service-bus/src/core/batchingReceiver.ts#L147 that doesn't end with a promise resolution.

Proposal:
At line above we could initiate a final timeout for draining and then remove listeners and resolve the promise. This should help with making sure the draining is handled and the promise gets resolved as we do not indefinitely wait for drain event.

cc: @chradek

* @param {AmqpError | Error} [receiverError] The receiver error if any.
* @returns {Promise<void>} Promise<void>.
*/
async onDetached(receiverError?: AmqpError | Error): Promise<void> {
// Clears the token renewal timer. Closes the link and its session if they are open.
if (this._newMessageReceivedTimer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any chance that the following could occur:

  1. User calls receive and receives events.
  2. User calls receive again.
  3. onDetached from 1st receive call is fired. Clears timeouts and rejects promise for 2nd receive call (due to using instance references).
  4. 2nd receive call erroneously rejects.

My intuition is that if onDetached is called, it's going to impact all the receive calls that are in flight (do we allow calling receive concurrently? I'm assuming we don't since we have isReceivingMessages) so the behaviour would be fine, but I wanted to make sure it was thought about.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

receiveMessages cannot be called concurrently, so there will be only 1 receive operation on the batching receiver in progress at any time.

I worry that there are a few more corner cases like these and so wanted to open a conversation via the draft PR.

My initial instinct was to use an indicator on the connection of whether it is open or not and use that to reject the promise from inside of the finalAction()

But, unfortunately, rhea's isOpen() on link, session and connection level all return true in this case. More investigation is needed

@ramya-rao-a
Copy link
Contributor Author

@ramya0820 I want to consider adding another timer in the finalAction() as a last resort if we don't find a better way to detect when there is an error at AMQP connection level

@ramya-rao-a
Copy link
Contributor Author

Closing in favor of #6601

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants