Skip to content

Commit

Permalink
Updates prefetch algorithm and test updates. (#12975)
Browse files Browse the repository at this point in the history
* Remove parentConnection from link processor.

* Add comment for lock tokens on received messages.

* Using prefetch buffer.

* Add logic for emptyCreditListener when empty.

* Pass prefetch into parameter when publishing.

* Consider ReceiveMode when using prefetch so we continue to fetch.
  • Loading branch information
conniey authored Jul 10, 2020
1 parent 734b8cd commit eadf80d
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ class ServiceBusAsyncConsumer implements AutoCloseable {
private final Flux<ServiceBusReceivedMessage> processor;

ServiceBusAsyncConsumer(String linkName, ServiceBusReceiveLinkProcessor linkProcessor,
MessageSerializer messageSerializer) {
MessageSerializer messageSerializer, int prefetch) {
this.linkName = linkName;
this.linkProcessor = linkProcessor;
this.messageSerializer = messageSerializer;
this.processor = linkProcessor
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class))
.publish()
.publish(prefetch)
.autoConnect(1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.StringUtil;
Expand Down Expand Up @@ -1255,14 +1254,12 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() {
})
.repeat();

final LinkErrorContext context = new LinkErrorContext(fullyQualifiedNamespace, entityPath, linkName,
null);
final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(connectionProcessor.getRetryOptions());
final ServiceBusReceiveLinkProcessor linkMessageProcessor = receiveLink.subscribeWith(
new ServiceBusReceiveLinkProcessor(receiverOptions.getPrefetchCount(), retryPolicy, connectionProcessor,
context));
new ServiceBusReceiveLinkProcessor(receiverOptions.getPrefetchCount(), retryPolicy,
receiverOptions.getReceiveMode()));
final ServiceBusAsyncConsumer newConsumer = new ServiceBusAsyncConsumer(linkName, linkMessageProcessor,
messageSerializer);
messageSerializer, receiverOptions.getPrefetchCount());

// There could have been multiple threads trying to create this async consumer when the result was null.
// If another one had set the value while we were creating this resource, dispose of newConsumer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ class UnnamedSessionReceiver implements AutoCloseable {
final ServiceBusReceivedMessage deserialized = messageSerializer.deserialize(message,
ServiceBusReceivedMessage.class);

//TODO (conniey): For session receivers, do they have a message lock token?
if (!CoreUtils.isNullOrEmpty(deserialized.getLockToken())) {
lockContainer.addOrUpdate(deserialized.getLockToken(), deserialized.getLockedUntil());
} else {
logger.info("sessionId[{}] message[{}]. There is no lock token.",
deserialized.getSessionId(), deserialized.getMessageId());
}

return new ServiceBusReceivedMessageContext(deserialized);
Expand All @@ -101,9 +105,12 @@ class UnnamedSessionReceiver implements AutoCloseable {
return;
}

final String token = CoreUtils.isNullOrEmpty(context.getMessage().getLockToken())
? context.getMessage().getLockToken()
final ServiceBusReceivedMessage message = context.getMessage();
final String token = !CoreUtils.isNullOrEmpty(message.getLockToken())
? message.getLockToken()
: "";

logger.verbose("Received sessionId[{}] messageId[{}]", context.getSessionId(), message.getMessageId());
messageReceivedSink.next(token);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.ReceiveMode;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -42,20 +42,25 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor<ServiceBusRece
implements Subscription {
private final ClientLogger logger = new ClientLogger(ServiceBusReceiveLinkProcessor.class);
private final Object lock = new Object();
private final Object queueLock = new Object();
private final AtomicBoolean isTerminated = new AtomicBoolean();
private final AtomicInteger retryAttempts = new AtomicInteger();
private final Deque<Message> messageQueue = new ConcurrentLinkedDeque<>();
private final AtomicBoolean hasFirstLink = new AtomicBoolean();
private final AtomicBoolean linkCreditsAdded = new AtomicBoolean();
private final AtomicReference<String> linkName = new AtomicReference<>();

// Queue containing all the prefetched messages.
private final Deque<Message> messageQueue = new ConcurrentLinkedDeque<>();
// size() on Deque is O(n) operation, so we use an integer to keep track. All reads and writes to this are gated by
// the `queueLock`.
private final AtomicInteger pendingMessages = new AtomicInteger();
private final int minimumNumberOfMessages;
private final int prefetch;

private final AtomicReference<CoreSubscriber<? super Message>> downstream = new AtomicReference<>();
private final AtomicInteger wip = new AtomicInteger();

private final int prefetch;
private final AmqpRetryPolicy retryPolicy;
private final Disposable parentConnection;
private final AmqpErrorContext errorContext;
private final ReceiveMode receiveMode;

private volatile Throwable lastError;
private volatile boolean isCancelled;
Expand All @@ -78,39 +83,29 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor<ServiceBusRece
*
* @param prefetch The number if messages to initially fetch.
* @param retryPolicy Retry policy to apply when fetching a new AMQP channel.
* @param parentConnection Represents the parent connection.
*
* @throws NullPointerException if {@code retryPolicy} is null.
* @throws IllegalArgumentException if {@code prefetch} is less than 0.
*/
public ServiceBusReceiveLinkProcessor(int prefetch, AmqpRetryPolicy retryPolicy,
Disposable parentConnection, AmqpErrorContext errorContext) {
public ServiceBusReceiveLinkProcessor(int prefetch, AmqpRetryPolicy retryPolicy, ReceiveMode receiveMode) {
this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
this.parentConnection = Objects.requireNonNull(parentConnection, "'parentConnection' cannot be null.");
this.errorContext = errorContext;
this.receiveMode = Objects.requireNonNull(receiveMode, "'receiveMode' cannot be null.");

if (prefetch <= 0) {
if (prefetch < 0) {
throw logger.logExceptionAsError(
new IllegalArgumentException("'prefetch' cannot be less than or equal to 0."));
new IllegalArgumentException("'prefetch' cannot be less than 0."));
}

this.prefetch = prefetch;

// When the queue has this number of messages left, it's time to add more credits to refill the prefetch queue.
this.minimumNumberOfMessages = Math.floorDiv(prefetch, 3);
}

public String getLinkName() {
return linkName.get();
}

/**
* Gets the error context associated with this link.
*
* @return the error context associated with this link.
*/
public AmqpErrorContext getErrorContext() {
return errorContext;
}


public Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryState) {
if (isDisposed()) {
return monoError(logger, new IllegalStateException(String.format(
Expand All @@ -123,7 +118,15 @@ public Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryStat
"lockToken[%s]. state[%s]. Cannot update disposition with no link.", lockToken, deliveryState)));
}

return link.updateDisposition(lockToken, deliveryState);
return link.updateDisposition(lockToken, deliveryState)
.then(Mono.fromRunnable(() -> {
// Check if we should add more credits.
synchronized (queueLock) {
pendingMessages.decrementAndGet();
}

checkAndAddCredits(link);
}));
}

/**
Expand Down Expand Up @@ -196,17 +199,20 @@ public void onNext(ServiceBusReceiveLink next) {
oldSubscription = currentLinkSubscriptions;

currentLink = next;
next.setEmptyCreditListener(() -> {
final int creditsToAdd = getCreditsToAdd(0);
linkCreditsAdded.set(creditsToAdd > 0);

if (!hasFirstLink.getAndSet(true)) {
linkCreditsAdded.set(true);
next.addCredits(prefetch);
}

next.setEmptyCreditListener(() -> getCreditsToAdd());
return creditsToAdd;
});

currentLinkSubscriptions = Disposables.composite(
next.receive().publishOn(Schedulers.boundedElastic()).subscribe(message -> {
messageQueue.add(message);
synchronized (queueLock) {
messageQueue.add(message);
pendingMessages.incrementAndGet();
}

drain();
}),
next.getEndpointStates().subscribe(
Expand All @@ -221,9 +227,11 @@ public void onNext(ServiceBusReceiveLink next) {
onError(error);
},
() -> {
if (parentConnection.isDisposed() || isTerminated()
|| upstream == Operators.cancelledSubscription()) {
logger.info("Terminal state reached. Disposing of link processor.");
if (isTerminated()) {
logger.info("Processor is terminated. Disposing of link processor.");
dispose();
} else if (upstream == Operators.cancelledSubscription()) {
logger.info("Upstream has completed. Disposing of link processor.");
dispose();
} else {
logger.info("Receive link endpoint states are closed. Requesting another.");
Expand All @@ -239,6 +247,8 @@ public void onNext(ServiceBusReceiveLink next) {
}));
}

checkAndAddCredits(next);

if (oldChannel != null) {
oldChannel.dispose();
}
Expand Down Expand Up @@ -313,7 +323,7 @@ public void onError(Throwable throwable) {
final String linkName = link != null ? link.getLinkName() : "n/a";
final String entityPath = link != null ? link.getEntityPath() : "n/a";

if (retryInterval != null && !parentConnection.isDisposed() && upstream != Operators.cancelledSubscription()) {
if (retryInterval != null && upstream != Operators.cancelledSubscription()) {
logger.warning("linkName[{}] entityPath[{}]. Transient error occurred. Attempt: {}. Retrying after {} ms.",
linkName, entityPath, attempt, retryInterval.toMillis(), throwable);

Expand All @@ -322,10 +332,6 @@ public void onError(Throwable throwable) {
return;
}

if (parentConnection.isDisposed()) {
logger.info("Parent connection is disposed. Not reopening on error.");
}

logger.warning("linkName[{}] entityPath[{}]. Non-retryable error occurred in AMQP receive link.",
linkName, entityPath, throwable);
lastError = throwable;
Expand Down Expand Up @@ -371,12 +377,11 @@ public void request(long request) {
Operators.addCap(REQUESTED, this, request);

final AmqpReceiveLink link = currentLink;
if (link != null && !linkCreditsAdded.getAndSet(true)) {
int credits = getCreditsToAdd();
logger.info("Link credits not yet added. Adding: {}", credits);
link.addCredits(credits);
if (link == null) {
return;
}

checkAndAddCredits(link);
drain();
}

Expand Down Expand Up @@ -469,19 +474,30 @@ private void drainQueue() {
break;
}

Message message = messageQueue.poll();
final Message message = messageQueue.poll();
if (message == null) {
break;
}

if (isCancelled) {
Operators.onDiscard(message, subscriber.currentContext());
Operators.onDiscardQueueWithClear(messageQueue, subscriber.currentContext(), null);

synchronized (queueLock) {
Operators.onDiscardQueueWithClear(messageQueue, subscriber.currentContext(), null);
pendingMessages.set(0);
}

return;
}

try {
subscriber.onNext(message);

// These don't have to be settled because they're automatically settled by the link, so we
// decrement the count.
if (receiveMode != ReceiveMode.PEEK_LOCK) {
pendingMessages.decrementAndGet();
}
} catch (Exception e) {
logger.error("Exception occurred while handling downstream onNext operation.", e);
throw logger.logExceptionAsError(Exceptions.propagate(
Expand Down Expand Up @@ -515,22 +531,63 @@ private boolean checkAndSetTerminated() {
currentLink.dispose();
}

messageQueue.clear();
synchronized (queueLock) {
messageQueue.clear();
pendingMessages.set(0);
}

return true;
}

private int getCreditsToAdd() {
private void checkAndAddCredits(AmqpReceiveLink link) {
if (link == null) {
return;
}

// Credits have already been added to the link. We won't try again.
if (linkCreditsAdded.getAndSet(true)) {
return;
}

final int credits = getCreditsToAdd(link.getCredits());
linkCreditsAdded.set(credits > 0);

logger.info("Link credits to add. Credits: '{}'", credits);

if (credits > 0) {
link.addCredits(credits);
}
}

private int getCreditsToAdd(int linkCredits) {
final CoreSubscriber<? super Message> subscriber = downstream.get();
final long r = requested;
final boolean hasBackpressure = r != Long.MAX_VALUE;

if (subscriber == null || r == 0) {
logger.info("Not adding credits. No downstream subscribers or items requested.");
linkCreditsAdded.set(false);
return 0;
}

linkCreditsAdded.set(true);
final int creditsToAdd;
if (messageQueue.isEmpty() && !hasBackpressure) {
creditsToAdd = prefetch;
} else {
synchronized (queueLock) {
final int queuedMessages = pendingMessages.get();
final int pending = queuedMessages + linkCredits;

if (hasBackpressure) {
creditsToAdd = Math.max(Long.valueOf(r).intValue() - pending, 0);
} else {
// If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full.
creditsToAdd = minimumNumberOfMessages >= queuedMessages
? Math.max(prefetch - pending, 1)
: 0;
}
}
}

// If there is no back pressure, always add 1. Otherwise, add whatever is requested.
return r == Long.MAX_VALUE ? 1 : Long.valueOf(r).intValue();
return creditsToAdd;
}
}
Loading

0 comments on commit eadf80d

Please sign in to comment.