Check existing credits on the line before adding new credits #26637

Closed
ramya-rao-a opened this issue Jan 22, 2022 · 4 comments

@ramya-rao-a
Contributor

ramya-rao-a commented Jan 22, 2022

Say there is a call to ServiceBusReceiverClient.receiveMessages() asking for 100 messages

  • client adds 100 credits on the line, but happens to get only 10 messages within the timeout and therefore returns the 10 to the user.
  • before the next call to receiveMessages(), 30 messages are received by the client and they live in the buffer.

Today, the next call to receiveMessages() for another 100 messages results in 70 credits being added to the line, ignoring the fact that the line already has 60 credits from the first call. There are now 130 credits on the line, so the extra messages are sent to the buffer, where they may expire before the next receive call.

This issue was logged after a discussion with @shankarsama to investigate whether we can check the number of remaining credits on the line and adjust the number of new credits accordingly.

Ideal behavior:

  • Issue #26632 ("Do not serve expired messages to the user") will ensure that the 30 messages in the buffer are released immediately, as there is no active receive call.
  • The next call to receiveMessages() asking for 100 messages should result in adding only 40 credits as there are 60 already on the line. To check the remaining credits on the line, we can try to use getRemoteCredit()
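
The arithmetic in this scenario can be sketched as follows. This is only an illustration: the class and method names are hypothetical, and the assumption that today's client computes new credits as "requested minus buffered" is inferred from the numbers above, not taken from the library source.

```java
// Hypothetical illustration of the credit arithmetic described in this issue.
final class CreditScenario {

    // Assumed current behavior: only buffered messages are subtracted,
    // credits still outstanding on the line are ignored.
    static int creditsAddedToday(int requested, int buffered) {
        return Math.max(requested - buffered, 0);
    }

    // Proposed behavior: also subtract the credits already on the line.
    static int creditsAddedProposed(int requested, int buffered, int creditsOnLine) {
        return Math.max(requested - buffered - creditsOnLine, 0);
    }

    public static void main(String[] args) {
        // First call adds 100 credits; 10 messages are returned to the user
        // and 30 more arrive in the buffer, so 60 credits remain on the line.
        int creditsOnLine = 100 - 10 - 30;

        int today = creditsAddedToday(100, 30);
        System.out.println("today: add " + today + ", total on line " + (creditsOnLine + today));

        // With #26632 the 30 buffered messages are released, so the buffer is empty.
        int proposed = creditsAddedProposed(100, 0, creditsOnLine);
        System.out.println("proposed: add " + proposed + ", total on line " + (creditsOnLine + proposed));
    }
}
```

With the numbers from this issue, the first calculation adds 70 credits for a total of 130 on the line, while the proposed one adds 40 for a total of 100.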

cc @conniey, @anuchandy, @JoshLove-msft, @shankarsama, @yvgopal

@ghost ghost added the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Jan 22, 2022
@ramya-rao-a ramya-rao-a added Client This issue points to a problem in the data-plane of the library. Service Bus labels Jan 22, 2022
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Jan 22, 2022
@ramya-rao-a ramya-rao-a added the feature-request This issue requires a new behavior in the product in order be resolved. label Jan 22, 2022
@ramya-rao-a ramya-rao-a added this to the [2022] March milestone Jan 22, 2022
@ramya-rao-a ramya-rao-a assigned ZejiaJiang and unassigned anuchandy Feb 22, 2022
@ZejiaJiang
Member

After investigation, I found that the Service Bus synchronous client checks the credits on the link by using checkAndAddCredits.

Here is the relevant code snippet:

private void checkAndAddCredits(AmqpReceiveLink link) {
    if (link == null) {
        return;
    }
    synchronized (lock) {
        final int linkCredits = link.getCredits();
        final int credits = getCreditsToAdd(linkCredits);
        if (credits > 0) {
            link.addCredits(credits).subscribe();
        }
    }
}

private int getCreditsToAdd(int linkCredits) {
    final CoreSubscriber<? super Message> subscriber = downstream.get();
    final long r = REQUESTED.get(this);
    final boolean hasBackpressure = r != Long.MAX_VALUE;
    if (subscriber == null || r == 0) {
        logger.info("Not adding credits. No downstream subscribers or items requested.");
        return 0;
    }
    final int creditsToAdd;
    final int expectedTotalCredit;
    if (prefetch == 0) {
        if (r <= Integer.MAX_VALUE) {
            expectedTotalCredit = (int) r;
        } else {
            // This won't really happen in reality.
            // For async client, receiveMessages() calls "return receiveMessagesNoBackPressure().limitRate(1, 0);".
            // So it will request one by one from this link processor, even though the user's request has no
            // back pressure.
            // For sync client, the sync subscriber has back pressure.
            // The request count uses the argument of method receiveMessages(int maxMessages).
            // It's at most Integer.MAX_VALUE.
            expectedTotalCredit = Integer.MAX_VALUE;
        }
    } else {
        expectedTotalCredit = prefetch;
    }
    synchronized (queueLock) {
        final int queuedMessages = pendingMessages.get();
        final int pending = queuedMessages + linkCredits;
        if (hasBackpressure) {
            creditsToAdd = Math.max(expectedTotalCredit - pending, 0);
        } else {
            // If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full.
            creditsToAdd = minimumNumberOfMessages >= queuedMessages
                ? Math.max(expectedTotalCredit - pending, 0)
                : 0;
        }
        logger.atInfo()
            .addKeyValue("prefetch", getPrefetch())
            .addKeyValue(NUMBER_OF_REQUESTED_MESSAGES_KEY, r)
            .addKeyValue("linkCredits", linkCredits)
            .addKeyValue("expectedTotalCredit", expectedTotalCredit)
            .addKeyValue("queuedMessages", queuedMessages)
            .addKeyValue("creditsToAdd", creditsToAdd)
            .addKeyValue("messageQueueSize", messageQueue.size())
            .log("Adding credits.");
    }
    return creditsToAdd;
}
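
To make the decision logic easier to follow in isolation, here is a minimal standalone sketch of the same calculation as a pure function. It is not the library's code: the class name CreditCalculator and the parameter list are hypothetical, the fields read in the snippet above (prefetch, requested count, queued messages, minimumNumberOfMessages) are passed in as arguments, and the subscriber null check, locking, and logging are left out.

```java
// Hypothetical, simplified restatement of the getCreditsToAdd logic above.
final class CreditCalculator {

    static int creditsToAdd(long requested, int prefetch, int linkCredits,
                            int queuedMessages, int minimumNumberOfMessages) {
        if (requested == 0) {
            return 0; // nothing requested downstream, so add no credits
        }
        // Long.MAX_VALUE is the Reactive Streams convention for "unbounded" demand.
        final boolean hasBackpressure = requested != Long.MAX_VALUE;

        // With prefetch disabled, the target is the requested count (capped to int).
        final int expectedTotalCredit = prefetch == 0
            ? (int) Math.min(requested, Integer.MAX_VALUE)
            : prefetch;

        // Messages already buffered plus credits still on the link.
        final int pending = queuedMessages + linkCredits;
        final int shortfall = Math.max(expectedTotalCredit - pending, 0);

        if (hasBackpressure) {
            return shortfall;
        }
        // Unbounded demand: top up only once the queue has drained to the threshold.
        return minimumNumberOfMessages >= queuedMessages ? shortfall : 0;
    }

    public static void main(String[] args) {
        // Scenario from this issue: 100 requested, 60 credits on the link, empty buffer.
        System.out.println(creditsToAdd(100, 0, 60, 0, 0));
        // Same request, but the 30 buffered messages have not been released yet.
        System.out.println(creditsToAdd(100, 0, 60, 30, 0));
    }
}
```

Under these assumptions, the scenario from the issue description (100 requested, 60 credits on the link, empty buffer) yields 40 new credits, and 10 if the 30 buffered messages are still queued.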

@sohanlal311

@ZejiaJiang Could you please let me know which version of the library it has been fixed in?

@ZejiaJiang
Member

Hi Sohan, the code shows that the latest fix is in v7.5.1. We recommend using the latest version, which is v7.7.0.

@sohanlal311

Thanks @ZejiaJiang. We will upgrade to the latest version.

@github-actions github-actions bot locked and limited conversation to collaborators Apr 11, 2023