Skip to content

Commit

Permalink
[ServiceBus] wait up to max wait time for draining credit when receiv…
Browse files Browse the repository at this point in the history
…ing messages (Azure#28604)

We added a timeout of 200 milliseconds on credit draining before
returning from `receiveMessages()`. There's customer report that
indicates this timeout is aggressive in environment with slow network
and caused link churn because we close link when draining times out to
avoid going into a bad state.

This PR changes the wait time to the max of 200 ms and the remaining
portion of the `maxWaitTimeInMs` option that is passed to
`receiveMessages()`, allowing customer to customize the time out
indirectly.


### Packages impacted by this PR
`@azure/service-bus`

### Issues associated with this PR
Azure#28023 

### What are the possible designs available to address the problem? If
there are more than one possible design, why was the one in this PR
chosen?
- exposing an option to set drain timeout. This is not desired since
waiting for draining is an internal implementation detail
- increase 200 ms to some other higher value. It's not possible to
determin a best value for this as environments are different.
- adaptive behavior that adjust the wait time dynamically based on how
frequently the link was closed due to time out. This can leads to
complicate logic.
  • Loading branch information
jeremymeng authored Feb 20, 2024
1 parent 3b785dc commit 5b750f8
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,15 @@ export class BatchingReceiverLite {
private async tryDrainReceiver(
receiver: MinimalReceiver,
loggingPrefix: string,
remainingWaitTimeInMs: number,
abortSignal?: AbortSignalLike,
): Promise<void> {
if (!receiver.isOpen() || receiver.credit <= 0) {
return;
}
let drainTimedout: boolean = false;
let drainTimer: ReturnType<typeof setTimeout>;
const timeToWaitInMs = Math.max(this._drainTimeoutInMs, remainingWaitTimeInMs);
const drainPromise = new Promise<void>((resolve) => {
function drainListener() {
logger.verbose(`${loggingPrefix} Receiver has been drained.`);
Expand All @@ -356,19 +358,19 @@ export class BatchingReceiverLite {
drainTimedout = true;
removeListeners();
resolve();
}, this._drainTimeoutInMs);
}, timeToWaitInMs);
receiver.once(ReceiverEvents.receiverDrained, drainListener);
abortSignal?.addEventListener("abort", onAbort);
});

receiver.drainCredit();
logger.verbose(
`${loggingPrefix} Draining leftover credits(${receiver.credit}), waiting for event_drained event, or timing out after ${this._drainTimeoutInMs} milliseconds...`,
`${loggingPrefix} Draining leftover credits(${receiver.credit}), waiting for event_drained event, or timing out after ${timeToWaitInMs} milliseconds...`,
);
await drainPromise;
if (drainTimedout) {
logger.warning(
`${loggingPrefix} Time out after ${this._drainTimeoutInMs} milliseconds when draining credits. Closing receiver...`,
`${loggingPrefix} Time out after ${timeToWaitInMs} milliseconds when draining credits. Closing receiver...`,
);
// Close the receiver link since we have not received the receiver drain event
// to prevent out-of-sync state between local and remote
Expand Down Expand Up @@ -463,7 +465,8 @@ export class BatchingReceiverLite {
return;
}

await this.tryDrainReceiver(receiver, loggingPrefix, args.abortSignal);
const remainingWaitTimeInMs = getRemainingWaitTimeInMs();
await this.tryDrainReceiver(receiver, loggingPrefix, remainingWaitTimeInMs, args.abortSignal);
logger.verbose(
`${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.`,
);
Expand Down

0 comments on commit 5b750f8

Please sign in to comment.