diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index d465bf673a49..f4bf498840fd 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -329,12 +329,10 @@ export class BatchingReceiverLite { let totalWaitTimer: NodeJS.Timer | undefined; // eslint-disable-next-line prefer-const - let cleanupBeforeResolveOrReject: ( - shouldRemoveDrain: "removeDrainHandler" | "leaveDrainHandler" - ) => void; + let cleanupBeforeResolveOrReject: () => void; const onError: OnAmqpEvent = (context: EventContext) => { - cleanupBeforeResolveOrReject("removeDrainHandler"); + cleanupBeforeResolveOrReject(); const eventType = context.session?.error != null ? "session_error" : "receiver_error"; let error = context.session?.error || context.receiver?.error; @@ -355,7 +353,7 @@ export class BatchingReceiverLite { }; this._closeHandler = (error?: AmqpError | Error): void => { - cleanupBeforeResolveOrReject("removeDrainHandler"); + cleanupBeforeResolveOrReject(); if ( // no error, just closing. Go ahead and return what we have. @@ -379,8 +377,6 @@ export class BatchingReceiverLite { // - maxWaitTime is passed or // - newMessageWaitTimeoutInSeconds is passed since the last message was received const finalAction = (): void => { - cleanupBeforeResolveOrReject("leaveDrainHandler"); - // Drain any pending credits. if (receiver.isOpen() && receiver.credit > 0) { logger.verbose(`${loggingPrefix} Draining leftover credits(${receiver.credit}).`); @@ -389,7 +385,7 @@ export class BatchingReceiverLite { receiver.drain = true; receiver.addCredit(1); } else { - receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); + cleanupBeforeResolveOrReject(); logger.verbose( `${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.` @@ -464,24 +460,20 @@ export class BatchingReceiverLite { // put into the task queue the same way. // So this call, while odd, just ensures that we resolve _after_ any already-queued onMessage handlers that may // be waiting in the task queue. - setTimeout(() => resolve(brokeredMessages)); + setTimeout(() => { + cleanupBeforeResolveOrReject(); + resolve(brokeredMessages); + }); }; - cleanupBeforeResolveOrReject = ( - shouldRemoveDrain: - | "removeDrainHandler" // remove drain handler (not waiting or initiating a drain) - | "leaveDrainHandler" // listener for drain is removed when it is determined we dont need to drain or when drain is completed - ): void => { + cleanupBeforeResolveOrReject = (): void => { if (receiver != null) { receiver.removeListener(ReceiverEvents.receiverError, onError); receiver.removeListener(ReceiverEvents.message, onReceiveMessage); receiver.session.removeListener(SessionEvents.sessionError, onError); receiver.removeListener(ReceiverEvents.receiverClose, onClose); receiver.session.removeListener(SessionEvents.sessionClose, onClose); - - if (shouldRemoveDrain === "removeDrainHandler") { - receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); - } + receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); } if (totalWaitTimer) { @@ -495,7 +487,7 @@ export class BatchingReceiverLite { }; abortSignalCleanupFunction = checkAndRegisterWithAbortSignal((err) => { - cleanupBeforeResolveOrReject("removeDrainHandler"); + cleanupBeforeResolveOrReject(); reject(err); }, args.abortSignal);