Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Allow 0 prefetch and dynamically use batch size to request link credits #17546

Merged
merged 33 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
fd1843a
Link credit size adjustment
YijunXieMS Nov 12, 2020
deadef5
Add API to block and flow link credit to the service.
YijunXieMS Nov 13, 2020
945f1c8
Use addCreditsBlocking instead of addCredits to avoid too many credit…
YijunXieMS Nov 13, 2020
98d07dd
Request fewer credits in the 2nd request if the 1st request returns f…
YijunXieMS Nov 13, 2020
ef52f71
Rename addCreditsBlocking to addCreditsInstantly
YijunXieMS Nov 13, 2020
36c4302
Update amqp-core dependency version
YijunXieMS Nov 13, 2020
1a5c4c4
Use remaining instead of bufferMessages.size() to calculate num of re…
YijunXieMS Nov 13, 2020
d43457e
Add backpressure in request for async receiveMessages()
YijunXieMS Nov 16, 2020
34c70ce
Use limitRate instead of autoConnect for back pressure.
YijunXieMS Nov 16, 2020
e7aa8bc
rename receiveMessagesNoConnection to receiveMessagesNoBackPressure
YijunXieMS Nov 16, 2020
152e193
Add back pressure, adjust link credits using prefetch and request siz…
YijunXieMS Nov 17, 2020
65cdcf6
Fix unit test
YijunXieMS Nov 18, 2020
c4a14f0
Merge branch 'master' into sb_receiver_tuning
YijunXieMS Nov 18, 2020
41d08ab
Small change (add final to variable)
YijunXieMS Nov 18, 2020
124baab
Add unreleased core-amqp
YijunXieMS Nov 18, 2020
5743dcb
SessionReceiver uses reactor prefetch 1 instead of default 256
YijunXieMS Nov 18, 2020
8986b2f
Dispose SynchronousMessageSubscriber before closing async client in s…
YijunXieMS Nov 18, 2020
39bac24
Prefetch 1 in instead fo default 256
YijunXieMS Nov 19, 2020
5572e15
Add some code comments
YijunXieMS Nov 19, 2020
530e258
set low tide 0 in limitRate() to disable replenish
YijunXieMS Nov 19, 2020
aee463c
use limitRate in async client and remove the limit in processor
YijunXieMS Nov 19, 2020
bafc0e4
autoConnect in receiveMessages() so it can be subscribed multiple times.
YijunXieMS Nov 19, 2020
037cbbc
Enable subscribe twice test.
YijunXieMS Nov 19, 2020
3774d66
Merge branch 'master' into sb_receiver_tuning
YijunXieMS Nov 19, 2020
09fca43
Use publish / autoConnect to support multiple subscribers.
YijunXieMS Nov 20, 2020
fe7ee04
put addCreditInstantly in synchronized block.
YijunXieMS Nov 20, 2020
59aee7b
Format change for checkstyle
YijunXieMS Nov 20, 2020
8ea5ba6
Use addCredits instead of addCreditsInstantly.
YijunXieMS Nov 20, 2020
24e3421
Use addCredits instead of addCreditsInstantly
YijunXieMS Nov 20, 2020
de311a8
Remove autoConnect
YijunXieMS Nov 20, 2020
29085d3
Remove test case that subscribe receiveMessages() twice.
YijunXieMS Nov 20, 2020
4c837b5
Use addCredits instead of addCreditsInstantly (update test)
YijunXieMS Nov 20, 2020
3c23542
Checkstyle
YijunXieMS Nov 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ private synchronized void receiveMessages() {
}
ServiceBusReceiverAsyncClient receiverClient = asyncClient.get();
receiverClient.receiveMessagesWithContext()
.parallel(processorOptions.getMaxConcurrentCalls())
.runOn(Schedulers.boundedElastic())
.parallel(processorOptions.getMaxConcurrentCalls(), 1)
.runOn(Schedulers.boundedElastic(), 1)
YijunXieMS marked this conversation as resolved.
Show resolved Hide resolved
.subscribe(new Subscriber<ServiceBusMessageContext>() {
@Override
public void onSubscribe(Subscription subscription) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ public Flux<ServiceBusReceivedMessage> receiveMessages() {
// to auto-refill the prefetch buffer. A request will retrieve one message from this buffer.
// If receiverOptions.prefetchCount is 0 (default value),
// the request will add a link credit so one message is retrieved from the service.
return receiveMessagesNoBackPressure().limitRate(1);
return receiveMessagesNoBackPressure().limitRate(1, 0);
}

Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,13 @@ private int getCreditsToAdd(int linkCredits) {
if (r <= Integer.MAX_VALUE) {
expectedTotalCredit = (int) r;
} else {
//This won't really happen in reality.
//For async client, receiveMessages() calls "return receiveMessagesNoBackPressure().limitRate(1, 0);".
//So it will request one by one from this link processor, even though the user's request has no
//back pressure.
//For sync client, the sync subscriber has back pressure.
//The request count uses the the argument of method receiveMessages(int maxMessages).
//It's at most Integer.MAX_VALUE.
expectedTotalCredit = Integer.MAX_VALUE;
YijunXieMS marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
Expand Down