From c6ee64ff8fe72f2e377d14a32165737ff6cb25fa Mon Sep 17 00:00:00 2001 From: Richard Park Date: Tue, 15 Dec 2020 16:52:57 -0800 Subject: [PATCH] When we go down the 'drain' code path for batching receivers we need to keep the event listeners in place until we actually resolve the promise. This became even more obvious once we moved to scheduling the promise resolution with setTimeout() as it widened the gap (this bug was less easy to see before!). --- .../service-bus/src/core/batchingReceiver.ts | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index d465bf673a49..f4bf498840fd 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -329,12 +329,10 @@ export class BatchingReceiverLite { let totalWaitTimer: NodeJS.Timer | undefined; // eslint-disable-next-line prefer-const - let cleanupBeforeResolveOrReject: ( - shouldRemoveDrain: "removeDrainHandler" | "leaveDrainHandler" - ) => void; + let cleanupBeforeResolveOrReject: () => void; const onError: OnAmqpEvent = (context: EventContext) => { - cleanupBeforeResolveOrReject("removeDrainHandler"); + cleanupBeforeResolveOrReject(); const eventType = context.session?.error != null ? "session_error" : "receiver_error"; let error = context.session?.error || context.receiver?.error; @@ -355,7 +353,7 @@ export class BatchingReceiverLite { }; this._closeHandler = (error?: AmqpError | Error): void => { - cleanupBeforeResolveOrReject("removeDrainHandler"); + cleanupBeforeResolveOrReject(); if ( // no error, just closing. Go ahead and return what we have. @@ -379,8 +377,6 @@ export class BatchingReceiverLite { // - maxWaitTime is passed or // - newMessageWaitTimeoutInSeconds is passed since the last message was received const finalAction = (): void => { - cleanupBeforeResolveOrReject("leaveDrainHandler"); - // Drain any pending credits. if (receiver.isOpen() && receiver.credit > 0) { logger.verbose(`${loggingPrefix} Draining leftover credits(${receiver.credit}).`); @@ -389,7 +385,7 @@ export class BatchingReceiverLite { receiver.drain = true; receiver.addCredit(1); } else { - receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); + cleanupBeforeResolveOrReject(); logger.verbose( `${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.` @@ -464,24 +460,20 @@ export class BatchingReceiverLite { // put into the task queue the same way. // So this call, while odd, just ensures that we resolve _after_ any already-queued onMessage handlers that may // be waiting in the task queue. - setTimeout(() => resolve(brokeredMessages)); + setTimeout(() => { + cleanupBeforeResolveOrReject(); + resolve(brokeredMessages); + }); }; - cleanupBeforeResolveOrReject = ( - shouldRemoveDrain: - | "removeDrainHandler" // remove drain handler (not waiting or initiating a drain) - | "leaveDrainHandler" // listener for drain is removed when it is determined we dont need to drain or when drain is completed - ): void => { + cleanupBeforeResolveOrReject = (): void => { if (receiver != null) { receiver.removeListener(ReceiverEvents.receiverError, onError); receiver.removeListener(ReceiverEvents.message, onReceiveMessage); receiver.session.removeListener(SessionEvents.sessionError, onError); receiver.removeListener(ReceiverEvents.receiverClose, onClose); receiver.session.removeListener(SessionEvents.sessionClose, onClose); - - if (shouldRemoveDrain === "removeDrainHandler") { - receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); - } + receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); } if (totalWaitTimer) { @@ -495,7 +487,7 @@ export class BatchingReceiverLite { }; abortSignalCleanupFunction = checkAndRegisterWithAbortSignal((err) => { - cleanupBeforeResolveOrReject("removeDrainHandler"); + cleanupBeforeResolveOrReject(); reject(err); }, args.abortSignal);