From 339d2db3fe2632fdcb0bf135ce715dfe0a95fc81 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Wed, 10 Jun 2020 14:37:34 -0700 Subject: [PATCH 1/5] - Plumbing abortSignal through so it can be used to cancel the init() call in subscribe() and any part of receiveBatch() - Standardizing our AbortError message for all the new abortSignal work for Sender and Receiver. --- .../service-bus/src/core/batchingReceiver.ts | 73 ++++-- .../service-bus/src/core/messageReceiver.ts | 20 +- .../service-bus/src/core/messageSender.ts | 5 +- .../service-bus/src/core/streamingReceiver.ts | 25 +- .../service-bus/src/receivers/receiver.ts | 30 ++- sdk/servicebus/service-bus/src/sender.ts | 11 +- sdk/servicebus/service-bus/src/util/utils.ts | 16 +- .../test/internal/abortSignal.spec.ts | 223 +++++++----------- .../test/internal/batchingReceiver.spec.ts | 119 ++++++++++ .../test/internal/streamingReceiver.spec.ts | 84 +++++++ .../test/internal/unittestUtils.ts | 79 +++++++ .../service-bus/test/internal/utils.spec.ts | 65 ++--- .../test/utils/abortSignalTestUtils.ts | 59 +++++ 13 files changed, 584 insertions(+), 225 deletions(-) create mode 100644 sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts create mode 100644 sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts create mode 100644 sdk/servicebus/service-bus/test/internal/unittestUtils.ts create mode 100644 sdk/servicebus/service-bus/test/utils/abortSignalTestUtils.ts diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index c258613c61b8..edfca4803294 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -3,7 +3,14 @@ import * as log from "../log"; import { Constants, MessagingError, translate } from "@azure/core-amqp"; -import { AmqpError, EventContext, OnAmqpEvent, ReceiverEvents, SessionEvents } from "rhea-promise"; +import { + AmqpError, + EventContext, + OnAmqpEvent, + ReceiverEvents, + SessionEvents, + Receiver +} from "rhea-promise"; import { ReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage"; import { MessageReceiver, @@ -14,6 +21,8 @@ import { } from "./messageReceiver"; import { ClientEntityContext } from "../clientEntityContext"; import { throwErrorIfConnectionClosed } from "../util/errors"; +import { AbortSignalLike } from "@azure/core-http"; +import { checkAndRegisterWithAbortSignal } from "../util/utils"; /** * Describes the batching receiver where the user can receive a specified number of messages for @@ -84,7 +93,8 @@ export class BatchingReceiver extends MessageReceiver { */ async receive( maxMessageCount: number, - maxWaitTimeInMs?: number + maxWaitTimeInMs?: number, + abortSignal?: AbortSignalLike ): Promise { throwErrorIfConnectionClosed(this._context.namespace); @@ -95,7 +105,7 @@ export class BatchingReceiver extends MessageReceiver { this.isReceivingMessages = true; try { - return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs); + return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs, abortSignal); } catch (error) { log.error( "[%s] Receiver '%s': Rejecting receiveMessages() with error %O: ", @@ -112,7 +122,8 @@ export class BatchingReceiver extends MessageReceiver { private _receiveImpl( maxMessageCount: number, - maxWaitTimeInMs: number + maxWaitTimeInMs: number, + abortSignal?: AbortSignalLike ): Promise { const brokeredMessages: ServiceBusMessageImpl[] = []; @@ -179,6 +190,8 @@ export class BatchingReceiver extends MessageReceiver { reject(translate(error)); }; + let cleanupAbortSignalFn: (() => void) | undefined = undefined; + // Final action to be performed after // - maxMessageCount is reached or // - maxWaitTime is passed or @@ -191,6 +204,11 @@ export class BatchingReceiver extends MessageReceiver { clearTimeout(totalWaitTimer); } + if (cleanupAbortSignalFn) { + cleanupAbortSignalFn(); + cleanupAbortSignalFn = undefined; + } + // Removing listeners, so that the next receiveMessages() call can set them again. // Listener for drain is removed when it is determined we dont need to drain or when drain is completed if (this._receiver) { @@ -312,13 +330,29 @@ export class BatchingReceiver extends MessageReceiver { } }; + const cleanupBeforeReject = ( + receiver: Receiver | undefined, + onReceiveErrorHandlerToRemove: OnAmqpEvent + ): void => { + if (receiver != null) { + receiver.removeListener(ReceiverEvents.receiverError, onReceiveErrorHandlerToRemove); + receiver.removeListener(ReceiverEvents.message, onReceiveMessage); + receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); + receiver.session.removeListener(SessionEvents.sessionError, onSessionError); + } + + if (totalWaitTimer) { + clearTimeout(totalWaitTimer); + } + if (this._newMessageReceivedTimer) { + clearTimeout(this._newMessageReceivedTimer); + } + }; + // Action to be taken when an error is received. - const onReceiveError: OnAmqpEvent = (context: EventContext) => { + const onReceiveError: OnAmqpEvent = (context: Pick) => { const receiver = this._receiver || context.receiver!; - receiver.removeListener(ReceiverEvents.receiverError, onReceiveError); - receiver.removeListener(ReceiverEvents.message, onReceiveMessage); - receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); - receiver.session.removeListener(SessionEvents.sessionError, onSessionError); + cleanupBeforeReject(receiver, onReceiveError); const receiverError = context.receiver && context.receiver.error; let error: Error | MessagingError; @@ -333,15 +367,14 @@ export class BatchingReceiver extends MessageReceiver { } else { error = new MessagingError("An error occurred while receiving messages."); } - if (totalWaitTimer) { - clearTimeout(totalWaitTimer); - } - if (this._newMessageReceivedTimer) { - clearTimeout(this._newMessageReceivedTimer); - } reject(error); }; + cleanupAbortSignalFn = checkAndRegisterWithAbortSignal((err) => { + cleanupBeforeReject(this._receiver, onReceiveError); + reject(err); + }, abortSignal); + // Use new message wait timer only in peekLock mode if (this.receiveMode === ReceiveMode.peekLock) { /** @@ -462,7 +495,7 @@ export class BatchingReceiver extends MessageReceiver { onClose: onReceiveClose, onSessionClose: onSessionClose }); - this._init(rcvrOptions) + this._init(rcvrOptions, abortSignal) .then(() => { if (!this._receiver) { // there's a really small window here where the receiver can be closed @@ -470,8 +503,12 @@ export class BatchingReceiver extends MessageReceiver { return resolve([]); } - this._receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain); - addCreditAndSetTimer(); + // TODO: long-term we probably need to split the code in this promise. This check + // is just a band-aid for now. + if (!abortSignal?.aborted) { + this._receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain); + addCreditAndSetTimer(); + } return; }) .catch(reject); diff --git a/sdk/servicebus/service-bus/src/core/messageReceiver.ts b/sdk/servicebus/service-bus/src/core/messageReceiver.ts index ec0851391b16..1408d3a559c0 100644 --- a/sdk/servicebus/service-bus/src/core/messageReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/messageReceiver.ts @@ -24,9 +24,11 @@ import * as log from "../log"; import { LinkEntity } from "./linkEntity"; import { ClientEntityContext } from "../clientEntityContext"; import { DispositionType, ReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage"; -import { calculateRenewAfterDuration, getUniqueName } from "../util/utils"; +import { calculateRenewAfterDuration, getUniqueName, StandardAbortMessage } from "../util/utils"; import { MessageHandlerOptions } from "../models"; import { DispositionStatusOptions } from "./managementClient"; +import { AbortSignalLike } from "@azure/core-http"; +import { AbortError } from "@azure/abort-controller"; /** * @internal @@ -757,8 +759,17 @@ export class MessageReceiver extends LinkEntity { * * @returns {Promise} Promise. */ - protected async _init(options?: ReceiverOptions): Promise { + protected async _init(options?: ReceiverOptions, abortSignal?: AbortSignalLike): Promise { + const checkAborted = (): void => { + if (abortSignal?.aborted) { + throw new AbortError(StandardAbortMessage); + } + }; + const connectionId = this._context.namespace.connectionId; + + checkAborted(); + try { if (!this.isOpen() && !this.isConnecting) { if (this.wasCloseInitiated) { @@ -781,7 +792,10 @@ export class MessageReceiver extends LinkEntity { } this.isConnecting = true; + await this._negotiateClaim(); + checkAborted(); + if (!options) { options = this._createReceiverOptions(); } @@ -794,6 +808,8 @@ export class MessageReceiver extends LinkEntity { this._receiver = await this._context.namespace.connection.createReceiver(options); this.isConnecting = false; + checkAborted(); + log.error( "[%s] Receiver '%s' with address '%s' has established itself.", connectionId, diff --git a/sdk/servicebus/service-bus/src/core/messageSender.ts b/sdk/servicebus/service-bus/src/core/messageSender.ts index 9ee394656068..a5b36d884ec1 100644 --- a/sdk/servicebus/service-bus/src/core/messageSender.ts +++ b/sdk/servicebus/service-bus/src/core/messageSender.ts @@ -32,7 +32,7 @@ import { } from "../serviceBusMessage"; import { ClientEntityContext } from "../clientEntityContext"; import { LinkEntity } from "./linkEntity"; -import { getUniqueName, waitForTimeoutOrAbortOrResolve } from "../util/utils"; +import { getUniqueName, waitForTimeoutOrAbortOrResolve, StandardAbortMessage } from "../util/utils"; import { throwErrorIfConnectionClosed } from "../util/errors"; import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch"; import { CreateBatchOptions } from "../models"; @@ -266,7 +266,6 @@ export class MessageSender extends LinkEntity { try { await waitForTimeoutOrAbortOrResolve({ actionFn: () => this.open(undefined, options?.abortSignal), - abortMessage: "The send operation has been cancelled by the user.", abortSignal: options?.abortSignal, timeoutMs: timeoutInMs, timeoutMessage: @@ -382,7 +381,7 @@ export class MessageSender extends LinkEntity { ): Promise { const checkAborted = (): void => { if (abortSignal?.aborted) { - throw new AbortError("Sender creation was cancelled by the user."); + throw new AbortError(StandardAbortMessage); } }; diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 020bcbaa5255..66aee8da448c 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -13,7 +13,8 @@ import { ClientEntityContext } from "../clientEntityContext"; import * as log from "../log"; import { throwErrorIfConnectionClosed } from "../util/errors"; -import { RetryConfig, RetryOperationType, retry } from "@azure/core-amqp"; +import { RetryOperationType, RetryConfig, retry } from "@azure/core-amqp"; +import { OperationOptions } from "../modelsToBeSharedWithEventHubs"; /** * @internal @@ -75,20 +76,34 @@ export class StreamingReceiver extends MessageReceiver { */ static async create( context: ClientEntityContext, - options?: ReceiveOptions + options?: ReceiveOptions & + Pick & { + _createStreamingReceiver?: ( + context: ClientEntityContext, + options?: ReceiveOptions + ) => StreamingReceiver; + } ): Promise { throwErrorIfConnectionClosed(context.namespace); if (!options) options = {}; if (options.autoComplete == null) options.autoComplete = true; - const sReceiver = new StreamingReceiver(context, options); + + let sReceiver: StreamingReceiver; + + if (options?._createStreamingReceiver) { + sReceiver = options._createStreamingReceiver(context, options); + } else { + sReceiver = new StreamingReceiver(context, options); + } const config: RetryConfig = { operation: () => { - return sReceiver._init(); + return sReceiver._init(undefined, options?.abortSignal); }, connectionId: context.namespace.connectionId, operationType: RetryOperationType.receiveMessage, - retryOptions: options.retryOptions + retryOptions: options.retryOptions, + abortSignal: options?.abortSignal }; await retry(config); context.streamingReceiver = sReceiver; diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index 9e4bf9f6220a..e263b94f628f 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -4,7 +4,6 @@ import { PeekMessagesOptions, GetMessageIteratorOptions, - MessageHandlerOptions, MessageHandlers, ReceiveBatchOptions, SubscribeOptions @@ -208,7 +207,7 @@ export class ReceiverImpl & { + createStreamingReceiver?: ( + context: ClientEntityContext, + options?: ReceiveOptions + ) => StreamingReceiver; + } + ): Promise { + return StreamingReceiver.create(context, options); + } + /** * Returns a promise that resolves to an array of messages based on given count and timeout over * an AMQP receiver link from a Queue/Subscription. @@ -271,11 +283,12 @@ export class ReceiverImpl { this._throwIfSenderOrConnectionClosed(); - try { - return await this._sender.createBatch(options); - } catch (err) { - if (err.name === "AbortError") { - throw new AbortError("The createBatch operation has been cancelled by the user."); - } - - throw err; - } + return this._sender.createBatch(options); } /** diff --git a/sdk/servicebus/service-bus/src/util/utils.ts b/sdk/servicebus/service-bus/src/util/utils.ts index 9de976a93a0e..3f7391c16db4 100644 --- a/sdk/servicebus/service-bus/src/util/utils.ts +++ b/sdk/servicebus/service-bus/src/util/utils.ts @@ -481,6 +481,12 @@ export type EntityStatus = | "Restoring" | "Unknown"; +/** + * @internal + * @ignore + */ +export const StandardAbortMessage = "The operation was aborted."; + /** * An executor for a function that returns a Promise that obeys both a timeout and an * optional AbortSignal. @@ -498,7 +504,6 @@ export async function waitForTimeoutOrAbortOrResolve(args: { actionFn: () => Promise; timeoutMs: number; timeoutMessage: string; - abortMessage: string; abortSignal?: AbortSignalLike; // these are optional and only here for testing. timeoutFunctions?: { @@ -507,7 +512,7 @@ export async function waitForTimeoutOrAbortOrResolve(args: { }; }): Promise { if (args.abortSignal && args.abortSignal.aborted) { - throw new AbortError(args.abortMessage); + throw new AbortError(StandardAbortMessage); } let timer: any | undefined = undefined; @@ -523,7 +528,7 @@ export async function waitForTimeoutOrAbortOrResolve(args: { // eslint-disable-next-line promise/param-names const abortOrTimeoutPromise = new Promise((_resolve, reject) => { - clearAbortSignal = checkAndRegisterWithAbortSignal(reject, args.abortMessage, args.abortSignal); + clearAbortSignal = checkAndRegisterWithAbortSignal(reject, args.abortSignal); timer = (args.timeoutFunctions?.setTimeoutFn ?? setTimeout)(() => { reject(new OperationTimeoutError(args.timeoutMessage)); @@ -551,7 +556,6 @@ export async function waitForTimeoutOrAbortOrResolve(args: { */ export function checkAndRegisterWithAbortSignal( onAbortFn: (abortError: AbortError) => void, - abortMessage: string, abortSignal?: AbortSignalLike ): () => void { if (abortSignal == null) { @@ -559,12 +563,12 @@ export function checkAndRegisterWithAbortSignal( } if (abortSignal.aborted) { - throw new AbortError(abortMessage); + throw new AbortError(StandardAbortMessage); } const onAbort = (): void => { abortSignal.removeEventListener("abort", onAbort); - onAbortFn(new AbortError(abortMessage)); + onAbortFn(new AbortError(StandardAbortMessage)); }; abortSignal.addEventListener("abort", onAbort); diff --git a/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts index 89ec4a7f5786..6f5f0edc5553 100644 --- a/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts @@ -6,14 +6,17 @@ import chaiAsPromised from "chai-as-promised"; chai.use(chaiAsPromised); const assert = chai.assert; -import { ClientEntityContext } from "../../src/clientEntityContext"; import { MessageSender } from "../../src/core/messageSender"; import { OperationOptions } from "../../src/modelsToBeSharedWithEventHubs"; -import { DefaultDataTransformer, AccessToken } from "@azure/core-amqp"; -import { AbortSignalLike, AbortError } from "@azure/abort-controller"; import { AwaitableSender, delay } from "rhea-promise"; import { ServiceBusMessageBatchImpl } from "../../src/serviceBusMessageBatch"; -import { SenderImpl } from "../../src/sender"; +import { MessageReceiver, ReceiverType } from "../../src/core/messageReceiver"; +import { + createAbortSignalForTest, + createCountdownAbortSignal +} from "../utils/abortSignalTestUtils"; +import { createClientEntityContextForTests } from "./unittestUtils"; +import { StandardAbortMessage } from "../../src/util/utils"; describe("AbortSignal", () => { const testMessageThatDoesntMatter = { @@ -36,24 +39,28 @@ describe("AbortSignal", () => { passedInOptions = options; }; + let abortSignal = createAbortSignalForTest(false); + await sender.send(testMessageThatDoesntMatter, { - abortSignal: createTaggedAbortSignal("passed with send", false) + abortSignal }); - assert.equal((passedInOptions?.abortSignal as any).tag, "passed with send"); + assert.equal(passedInOptions?.abortSignal, abortSignal); + + abortSignal = createAbortSignalForTest(false); const batchMessage = new ServiceBusMessageBatchImpl(clientEntityContext, 1000); await sender.sendBatch(batchMessage, { - abortSignal: createTaggedAbortSignal("passed with sendBatch", false) + abortSignal }); - assert.equal((passedInOptions?.abortSignal as any).tag, "passed with sendBatch"); + assert.equal(passedInOptions?.abortSignal, abortSignal); await sender.sendMessages([testMessageThatDoesntMatter], { - abortSignal: createTaggedAbortSignal("passed with sendMessages", false) + abortSignal }); - assert.equal((passedInOptions?.abortSignal as any).tag, "passed with sendMessages"); + assert.equal(passedInOptions?.abortSignal, abortSignal); }); it("_trySend with an already aborted AbortSignal", async () => { @@ -63,7 +70,7 @@ describe("AbortSignal", () => { throw new Error("INIT SHOULD NEVER HAVE BEEN CALLED"); }; - const abortSignal = createTaggedAbortSignal("_trySend test", true); + const abortSignal = createAbortSignalForTest(true); try { await sender["_trySend"]({} as Buffer, true, { @@ -71,7 +78,7 @@ describe("AbortSignal", () => { }); assert.fail("AbortError should be thrown when the signal is already in an aborted state"); } catch (err) { - assert.equal(err.message, "The send operation has been cancelled by the user."); + assert.equal(err.message, StandardAbortMessage); // we aborted in the sync part of the abort check so these event listeners are never set up assert.isFalse(abortSignal.addWasCalled); @@ -82,26 +89,6 @@ describe("AbortSignal", () => { } }); - it("createBatch()", async () => { - const sender = new SenderImpl(createClientEntityContextForTests()); - - sender["_sender"] = ({ - createBatch: async () => { - throw new AbortError( - "The error matters but this message will be ignored and made specific to the actual operation." - ); - } - } as any) as MessageSender; - - try { - await sender.createBatch(); - assert.fail("AbortError should have been thrown"); - } catch (err) { - assert.equal(err.message, "The createBatch operation has been cancelled by the user."); - assert.equal(err.name, "AbortError"); - } - }); - it("_trySend when the timer expires", async () => { const sender = new MessageSender(clientEntityContext, { timeoutInMs: 1 @@ -130,7 +117,7 @@ describe("AbortSignal", () => { try { await sender["_trySend"]({} as Buffer, true, { - abortSignal: createTaggedAbortSignal("not used for this test", false) + abortSignal: createAbortSignalForTest(false) }); assert.fail("Sender should have thrown in the async portion of the abort handling"); } catch (err) { @@ -150,13 +137,13 @@ describe("AbortSignal", () => { describe("MessageSender.open() aborts after...", () => { it("...beforeLock", async () => { const sender = new MessageSender(createClientEntityContextForTests(), {}); - const abortSignal = createTaggedAbortSignal("countdown", createCountdown(1)); + const abortSignal = createCountdownAbortSignal(1); try { await sender.open(undefined, abortSignal); assert.fail("Should have thrown an AbortError"); } catch (err) { - assert.equal(err.message, "Sender creation was cancelled by the user."); + assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } @@ -165,13 +152,13 @@ describe("AbortSignal", () => { it("...afterLock", async () => { const sender = new MessageSender(createClientEntityContextForTests(), {}); - const abortSignal = createTaggedAbortSignal("countdown", createCountdown(2)); + const abortSignal = createCountdownAbortSignal(2); try { await sender.open(undefined, abortSignal); assert.fail("Should have thrown an AbortError"); } catch (err) { - assert.equal(err.message, "Sender creation was cancelled by the user."); + assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } @@ -180,7 +167,7 @@ describe("AbortSignal", () => { it("...negotiateClaim", async () => { let isAborted = false; - const taggedAbortSignal = createTaggedAbortSignal("a", () => isAborted); + const taggedAbortSignal = createAbortSignalForTest(() => isAborted); const sender = new MessageSender( createClientEntityContextForTests({ @@ -197,7 +184,7 @@ describe("AbortSignal", () => { await sender.createBatch({ abortSignal: taggedAbortSignal }); assert.fail("Should have thrown an AbortError"); } catch (err) { - assert.equal(err.message, "Sender creation was cancelled by the user."); + assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } @@ -206,15 +193,12 @@ describe("AbortSignal", () => { it("...createAwaitableSender", async () => { let isAborted = false; - const taggedAbortSignal = createTaggedAbortSignal("a", () => isAborted); - - const createdSenders: AwaitableSender[] = []; + const taggedAbortSignal = createAbortSignalForTest(() => isAborted); const sender = new MessageSender( createClientEntityContextForTests({ - onCreateAwaitableSenderCalled: (sender) => { + onCreateAwaitableSenderCalled: () => { isAborted = true; - createdSenders.push(sender); } }), {} @@ -226,105 +210,80 @@ describe("AbortSignal", () => { await sender.createBatch({ abortSignal: taggedAbortSignal }); assert.fail("Should have thrown an AbortError"); } catch (err) { - assert.equal(err.message, "Sender creation was cancelled by the user."); + assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } assert.isFalse(sender.isConnecting); }); }); -}); -function createClientEntityContextForTests(options?: { - onCreateAwaitableSenderCalled: (createdSender: AwaitableSender) => void; -}): ClientEntityContext & { initWasCalled: boolean } { - let initWasCalled = false; - - const fakeClientEntityContext = { - entityPath: "queue", - sender: { - credit: 999 - }, - namespace: { - config: { endpoint: "my.service.bus" }, - connectionId: "connection-id", - connection: { - createAwaitableSender: async (): Promise => { - let isClosed = false; - - const testAwaitableSender = ({ - setMaxListeners: () => testAwaitableSender, - isOpen: () => isClosed, - close: async () => { - isClosed = true; - } - } as any) as AwaitableSender; - - options?.onCreateAwaitableSenderCalled(testAwaitableSender); - return testAwaitableSender; - } - }, - dataTransformer: new DefaultDataTransformer(), - tokenCredential: { - getToken() { - return {} as AccessToken; - } - }, - cbsSession: { - cbsLock: "cbs-lock", - async init() { - initWasCalled = true; - } - } - }, - initWasCalled - }; + describe("MessageReceiver.open() aborts after...", () => { + it("...before first async call", async () => { + const messageReceiver = new MessageReceiver( + createClientEntityContextForTests(), + ReceiverType.streaming + ); - return (fakeClientEntityContext as any) as ReturnType; -} + const abortSignal = createCountdownAbortSignal(1); -function createCountdown(numTimesTillAborted: number): () => boolean { - return () => { - --numTimesTillAborted; + try { + await messageReceiver["_init"](undefined, abortSignal); + assert.fail("Should have thrown an AbortError"); + } catch (err) { + assert.equal(err.message, StandardAbortMessage); + assert.equal(err.name, "AbortError"); + } - if (numTimesTillAborted < 0) { - throw new Error( - "We're checking abortSignal more than we thought. Our count is probably incorrect." + assert.isFalse(messageReceiver.isConnecting); + }); + + it("...after negotiateClaim", async () => { + const messageReceiver = new MessageReceiver( + createClientEntityContextForTests(), + ReceiverType.streaming ); - } - return numTimesTillAborted === 0; - }; -} - -function createTaggedAbortSignal( - tag: string, - isAborted: boolean | (() => boolean) = false -): AbortSignalLike & { - tag: string; - removeWasCalled: boolean; - addWasCalled: boolean; -} { - const removeWasCalled = false; - let addWasCalled = false; - - const signal = { - addEventListener(): void { - addWasCalled = true; - }, - removeEventListener(): void { - this.removeWasCalled = true; - }, - tag, - removeWasCalled, - addWasCalled, - get aborted(): boolean { - if (typeof isAborted === "function") { - return isAborted(); + let isAborted = false; + const abortSignal = createAbortSignalForTest(() => isAborted); + + messageReceiver["_negotiateClaim"] = async () => { + isAborted = true; + }; + + try { + await messageReceiver["_init"](undefined, abortSignal); + assert.fail("Should have thrown an AbortError"); + } catch (err) { + assert.equal(err.message, StandardAbortMessage); + assert.equal(err.name, "AbortError"); } - return isAborted; - } - }; - return signal; -} + assert.isFalse(messageReceiver.isConnecting); + }); + + it("...after createReceiver", async () => { + let isAborted = false; + const abortSignal = createAbortSignalForTest(() => isAborted); + + const fakeContext = createClientEntityContextForTests({ + onCreateReceiverCalled: () => { + isAborted = true; + } + }); + const messageReceiver = new MessageReceiver(fakeContext, ReceiverType.streaming); + + messageReceiver["_negotiateClaim"] = async () => {}; + + try { + await messageReceiver["_init"](undefined, abortSignal); + assert.fail("Should have thrown an AbortError"); + } catch (err) { + assert.equal(err.message, StandardAbortMessage); + assert.equal(err.name, "AbortError"); + } + + assert.isFalse(messageReceiver.isConnecting); + }); + }); +}); diff --git a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts new file mode 100644 index 000000000000..f7f8d44201eb --- /dev/null +++ b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts @@ -0,0 +1,119 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import chaiAsPromised from "chai-as-promised"; +chai.use(chaiAsPromised); +const assert = chai.assert; + +import { BatchingReceiver } from "../../src/core/batchingReceiver"; +import { createClientEntityContextForTests } from "./unittestUtils"; +import { ReceiverImpl } from "../../src/receivers/receiver"; +import { createAbortSignalForTest } from "../utils/abortSignalTestUtils"; +import { AbortController, AbortSignalLike } from "@azure/abort-controller"; +import { ServiceBusMessageImpl, ReceiveMode } from "../../src/serviceBusMessage"; +import { Receiver as RheaReceiver, ReceiverEvents, SessionEvents } from "rhea-promise"; +import { StandardAbortMessage } from "../../src/util/utils"; + +describe("BatchingReceiver unit tests", () => { + describe("AbortSignal", () => { + // establish that the abortSignal does get properly sent down. Now the rest of the tests + // will test at the BatchingReceiver level. + it("is plumbed into BatchingReceiver from ReceiverImpl", async () => { + const origAbortSignal = createAbortSignalForTest(); + const receiver = new ReceiverImpl(createClientEntityContextForTests(), "peekLock"); + let wasCalled = false; + + receiver["_createBatchingReceiver"] = () => { + return { + async receive( + _maxMessageCount: number, + _maxWaitTimeInMs?: number, + abortSignal?: AbortSignalLike + ): Promise { + assert.equal(abortSignal, origAbortSignal); + wasCalled = true; + return []; + } + } as BatchingReceiver; + }; + + await receiver.receiveBatch(1000, { + maxWaitTimeInMs: 60 * 1000, + abortSignal: origAbortSignal + }); + + assert.isTrue(wasCalled, "Expected a call to BatchingReceiver.receive()"); + }); + + it("abortSignal is already signalled", async () => { + const abortController = new AbortController(); + abortController.abort(); + + const receiver = new BatchingReceiver(createClientEntityContextForTests(), { + receiveMode: ReceiveMode.peekLock + }); + + try { + await receiver.receive(1, 60 * 1000, abortController.signal); + assert.fail("Should have thrown"); + } catch (err) { + assert.equal(err.message, StandardAbortMessage); + assert.equal(err.name, "AbortError"); + } + }).timeout(1000); + + it("abortSignal while receive is in process", async () => { + const abortController = new AbortController(); + + const receiver = new BatchingReceiver(createClientEntityContextForTests(), { + receiveMode: ReceiveMode.peekLock + }); + + const listenersBeingRemoved: string[] = []; + const callsDoneAfterAbort: string[] = []; + + receiver["_init"] = async () => { + // just enough of a Receiver to validate that cleanup actions + // are being run on abort. + receiver["_receiver"] = ({ + removeListener: (eventType: ReceiverEvents) => { + listenersBeingRemoved.push(eventType.toString()); + }, + on: (eventType: ReceiverEvents) => { + // we definitely shouldn't be registering any new handlers if we've aborted. + callsDoneAfterAbort.push(eventType); + }, + addCredit: () => { + // we definitely shouldn't be adding credits if we know we've aborted. + callsDoneAfterAbort.push("addCredit"); + }, + session: { + removeListener: (eventType: SessionEvents) => { + listenersBeingRemoved.push(eventType.toString()); + } + } + } as any) as RheaReceiver; + + abortController.abort(); + }; + + try { + await receiver.receive(1, 60 * 1000, abortController.signal); + assert.fail("Should have thrown"); + } catch (err) { + assert.equal(err.message, StandardAbortMessage); + assert.equal(err.name, "AbortError"); + } + + // order here isn't important, it just happens to be the order we call in `cleanupBeforeReject` + assert.deepEqual(listenersBeingRemoved, [ + "receiver_error", + "message", + "receiver_drained", + "session_error" + ]); + assert.isEmpty(callsDoneAfterAbort); + }); + }); +}); diff --git a/sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts new file mode 100644 index 000000000000..41d46aaabfbc --- /dev/null +++ b/sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts @@ -0,0 +1,84 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import chaiAsPromised from "chai-as-promised"; +import { ReceiverImpl } from "../../src/receivers/receiver"; +import { createClientEntityContextForTests, getPromiseResolverForTest } from "./unittestUtils"; +import { ClientEntityContext } from "../../src/clientEntityContext"; +import { ReceiveOptions } from "../../src/core/messageReceiver"; +import { OperationOptions } from "../../src"; +import { StreamingReceiver } from "../../src/core/streamingReceiver"; +import { AbortController, AbortSignalLike } from "@azure/abort-controller"; +chai.use(chaiAsPromised); +const assert = chai.assert; + +describe("StreamingReceiver unit tests", () => { + describe("AbortSignal", () => { + it("sanity check - abortSignal is propagated", async () => { + const receiverImpl = new ReceiverImpl(createClientEntityContextForTests(), "peekLock"); + + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + const { resolve, promise } = getPromiseResolverForTest(); + + receiverImpl["_createStreamingReceiver"] = async ( + _context: ClientEntityContext, + options?: ReceiveOptions & + Pick & { + createStreamingReceiver?: ( + context: ClientEntityContext, + options?: ReceiveOptions + ) => StreamingReceiver; + } + ) => { + assert.equal(abortSignal, options?.abortSignal, "abortSignal is properly passed through"); + resolve(); + return {} as StreamingReceiver; + }; + + const errors: string[] = []; + + receiverImpl.subscribe( + { + processMessage: async () => {}, + processError: async (err) => { + errors.push(err.message); + } + }, + { + abortSignal + } + ); + + await promise; + assert.isEmpty(errors); + }).timeout(2000); // just for safety + + it("sanity check - abortSignal is propagated to _init()", async () => { + let wasCalled = false; + const abortController = new AbortController(); + + await StreamingReceiver.create(createClientEntityContextForTests(), { + _createStreamingReceiver: (_context, _options) => { + wasCalled = true; + return ({ + _init: (_ignoredOptions: any, abortSignal?: AbortSignalLike) => { + wasCalled = true; + assert.equal( + abortSignal, + abortController.signal, + "abortSignal passed in when created should propagate to _init()" + ); + return; + } + } as any) as StreamingReceiver; + }, + abortSignal: abortController.signal + }); + + assert.isTrue(wasCalled); + }); + }); +}); diff --git a/sdk/servicebus/service-bus/test/internal/unittestUtils.ts b/sdk/servicebus/service-bus/test/internal/unittestUtils.ts new file mode 100644 index 000000000000..9e9769052f28 --- /dev/null +++ b/sdk/servicebus/service-bus/test/internal/unittestUtils.ts @@ -0,0 +1,79 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { ClientEntityContext } from "../../src/clientEntityContext"; +import { AwaitableSender, Receiver as RheaReceiver } from "rhea-promise"; +import { DefaultDataTransformer, AccessToken } from "@azure/core-amqp"; + +export function createClientEntityContextForTests(options?: { + onCreateAwaitableSenderCalled?: () => void; + onCreateReceiverCalled?: () => void; +}): ClientEntityContext & { initWasCalled: boolean } { + let initWasCalled = false; + + const fakeClientEntityContext = { + entityPath: "queue", + sender: { + credit: 999 + }, + namespace: { + config: { endpoint: "my.service.bus" }, + connectionId: "connection-id", + connection: { + createAwaitableSender: async (): Promise => { + if (options?.onCreateAwaitableSenderCalled) { + options.onCreateAwaitableSenderCalled(); + } + + const testAwaitableSender = ({ + setMaxListeners: () => testAwaitableSender + } as any) as AwaitableSender; + + return testAwaitableSender; + }, + createReceiver: async (): Promise => { + if (options?.onCreateReceiverCalled) { + options.onCreateReceiverCalled(); + } + + return ({} as any) as RheaReceiver; + } + }, + dataTransformer: new DefaultDataTransformer(), + tokenCredential: { + getToken() { + return {} as AccessToken; + } + }, + cbsSession: { + cbsLock: "cbs-lock", + async init() { + initWasCalled = true; + } + } + }, + initWasCalled + }; + + return (fakeClientEntityContext as any) as ReturnType; +} + +export function getPromiseResolverForTest(): { + promise: Promise; + resolve: () => void; + reject: (err: Error) => void; +} { + let resolver: () => void; + let rejecter: (err: Error) => void; + + const promise = new Promise((resolve, reject) => { + resolver = resolve; + rejecter = reject; + }); + + return { + promise, + resolve: resolver!, + reject: rejecter! + }; +} diff --git a/sdk/servicebus/service-bus/test/internal/utils.spec.ts b/sdk/servicebus/service-bus/test/internal/utils.spec.ts index 7402082bd680..d005aa180404 100644 --- a/sdk/servicebus/service-bus/test/internal/utils.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/utils.spec.ts @@ -3,7 +3,8 @@ import { checkAndRegisterWithAbortSignal, - waitForTimeoutOrAbortOrResolve + waitForTimeoutOrAbortOrResolve, + StandardAbortMessage } from "../../src/util/utils"; import { AbortController, AbortError, AbortSignalLike } from "@azure/abort-controller"; import { delay } from "rhea-promise"; @@ -70,7 +71,6 @@ describe("utils", () => { timeoutMessage: "the message for the timeout", timeoutMs: neverFireMs, abortSignal, - abortMessage: "the message for aborting", timeoutFunctions }); @@ -81,7 +81,7 @@ describe("utils", () => { await prm; assert.fail("Should have thrown an AbortError"); } catch (err) { - assert.equal(err.message, "the message for aborting"); + assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } @@ -103,13 +103,12 @@ describe("utils", () => { timeoutMessage: "the message for the timeout", timeoutMs: neverFireMs, abortSignal, - abortMessage: "the message for aborting", timeoutFunctions }); assert.fail("Should have thrown an AbortError"); } catch (err) { - assert.equal(err.message, "the message for aborting"); + assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } @@ -129,7 +128,6 @@ describe("utils", () => { }, timeoutMs: 500, timeoutMessage: "the message for the timeout", - abortMessage: "ignored for this test since we don't have an abort signal", timeoutFunctions }); @@ -151,7 +149,6 @@ describe("utils", () => { timeoutMessage: "the message for the timeout", timeoutMs: 500, abortSignal, - abortMessage: "the message for aborting", timeoutFunctions }); @@ -177,7 +174,6 @@ describe("utils", () => { timeoutMessage: "the message for the timeout", timeoutMs: neverFireMs, abortSignal, - abortMessage: "the message for aborting", timeoutFunctions }); @@ -198,7 +194,6 @@ describe("utils", () => { timeoutMessage: "the message for the timeout", timeoutMs: neverFireMs, abortSignal, - abortMessage: "the message for aborting", timeoutFunctions }); @@ -222,8 +217,7 @@ describe("utils", () => { }, timeoutMessage: "the message for the timeout", timeoutMs: 1, - abortSignal, - abortMessage: "the message for aborting" + abortSignal }); } catch (err) { assert.equal(err.message, "the message for the timeout"); @@ -238,11 +232,10 @@ describe("utils", () => { }, timeoutMessage: "the message for the timeout", timeoutMs: neverFireMs, - abortSignal, - abortMessage: "the message for aborting" + abortSignal }); } catch (err) { - assert.equal(err.message, "the message for aborting"); + assert.equal(err.message, StandardAbortMessage); } }); }); @@ -257,13 +250,9 @@ describe("utils", () => { }); it("abortSignal is undefined", () => { - const cleanupFn = checkAndRegisterWithAbortSignal( - () => { - throw new Error("Will never be called"); - }, - "the message for aborting", - undefined - ); + const cleanupFn = checkAndRegisterWithAbortSignal(() => { + throw new Error("Will never be called"); + }, undefined); // we just return a no-op function in this case. assert.exists(cleanupFn); @@ -274,17 +263,13 @@ describe("utils", () => { abortController.abort(); try { - checkAndRegisterWithAbortSignal( - () => { - throw new Error("Will never be called"); - }, - "the message for aborting", - abortSignal - ); + checkAndRegisterWithAbortSignal(() => { + throw new Error("Will never be called"); + }, abortSignal); assert.fail("Should have thrown an AbortError"); } catch (err) { assert.equal(err.name, "AbortError"); - assert.equal(err.message, "the message for aborting"); + assert.equal(err.message, StandardAbortMessage); } assert.isTrue(abortSignal.ourListenersWereRemoved()); @@ -292,14 +277,10 @@ describe("utils", () => { it("abortSignal abort calls handlers", async () => { let callbackWasCalled = false; - const cleanupFn = checkAndRegisterWithAbortSignal( - (abortError: AbortError) => { - callbackWasCalled = true; - assert.equal(abortError.message, "the message for aborting"); - }, - "the message for aborting", - abortSignal - ); + const cleanupFn = checkAndRegisterWithAbortSignal((abortError: AbortError) => { + callbackWasCalled = true; + assert.equal(abortError.message, StandardAbortMessage); + }, abortSignal); assert.exists(cleanupFn); assert.isFalse(abortSignal.ourListenersWereRemoved()); @@ -317,13 +298,9 @@ describe("utils", () => { it("calling cleanup removes handlers from abortSignal", async () => { let callbackWasCalled = false; - const cleanupFn = checkAndRegisterWithAbortSignal( - () => { - callbackWasCalled = true; - }, - "the message for aborting", - abortSignal - ); + const cleanupFn = checkAndRegisterWithAbortSignal(() => { + callbackWasCalled = true; + }, abortSignal); assert.exists(cleanupFn); assert.isFalse(abortSignal.ourListenersWereRemoved()); diff --git a/sdk/servicebus/service-bus/test/utils/abortSignalTestUtils.ts b/sdk/servicebus/service-bus/test/utils/abortSignalTestUtils.ts new file mode 100644 index 000000000000..72ba214f70ad --- /dev/null +++ b/sdk/servicebus/service-bus/test/utils/abortSignalTestUtils.ts @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { AbortSignalLike } from "@azure/abort-controller"; + +/** + * Creates an AbortSignal that signals that it's aborted after .aborted is checked a certain + * number of times. + */ +export function createCountdownAbortSignal( + numTimesTillAborted: number +): ReturnType { + const countdownFn = () => { + --numTimesTillAborted; + + if (numTimesTillAborted < 0) { + throw new Error( + "We're checking abortSignal more than we thought. Our count is probably incorrect." + ); + } + + return numTimesTillAborted === 0; + }; + + return createAbortSignalForTest(countdownFn); +} + +/** + * Creates an AbortSignal that is already signalled or can be controlled by a + * custom function passed via isAborted. + */ +export function createAbortSignalForTest( + isAborted: boolean | (() => boolean) = false +): AbortSignalLike & { + removeWasCalled: boolean; + addWasCalled: boolean; +} { + const removeWasCalled = false; + let addWasCalled = false; + + const signal = { + addEventListener(): void { + addWasCalled = true; + }, + removeEventListener(): void { + this.removeWasCalled = true; + }, + removeWasCalled, + addWasCalled, + get aborted(): boolean { + if (typeof isAborted === "function") { + return isAborted(); + } + return isAborted; + } + }; + + return signal; +} From 1b1ceb291f528d920ff63f1a2d150dcdba254259 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Thu, 11 Jun 2020 11:49:13 -0700 Subject: [PATCH 2/5] Adding in a comment for the workaround I added for eslint. --- sdk/servicebus/service-bus/src/core/batchingReceiver.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index edfca4803294..9111d90e9b01 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -332,6 +332,8 @@ export class BatchingReceiver extends MessageReceiver { const cleanupBeforeReject = ( receiver: Receiver | undefined, + // fixing an eslint warning about using a variable before it's defined (note that we're called _within_ onReceiveError below) + // this is not circular since we don't call onReceiveError, we just need it so we can remove it from the listeners of the receiver. onReceiveErrorHandlerToRemove: OnAmqpEvent ): void => { if (receiver != null) { From e096ec046693cae7d32f4af24ef71c8afac8c37a Mon Sep 17 00:00:00 2001 From: Richard Park Date: Thu, 11 Jun 2020 11:51:39 -0700 Subject: [PATCH 3/5] Rename unclear variable name. --- .../service-bus/src/core/batchingReceiver.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index 9111d90e9b01..a81e2d7efa7d 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -190,7 +190,7 @@ export class BatchingReceiver extends MessageReceiver { reject(translate(error)); }; - let cleanupAbortSignalFn: (() => void) | undefined = undefined; + let removeAbortSignalListenersFn: (() => void) | undefined = undefined; // Final action to be performed after // - maxMessageCount is reached or @@ -204,9 +204,9 @@ export class BatchingReceiver extends MessageReceiver { clearTimeout(totalWaitTimer); } - if (cleanupAbortSignalFn) { - cleanupAbortSignalFn(); - cleanupAbortSignalFn = undefined; + if (removeAbortSignalListenersFn) { + removeAbortSignalListenersFn(); + removeAbortSignalListenersFn = undefined; } // Removing listeners, so that the next receiveMessages() call can set them again. @@ -372,7 +372,7 @@ export class BatchingReceiver extends MessageReceiver { reject(error); }; - cleanupAbortSignalFn = checkAndRegisterWithAbortSignal((err) => { + removeAbortSignalListenersFn = checkAndRegisterWithAbortSignal((err) => { cleanupBeforeReject(this._receiver, onReceiveError); reject(err); }, abortSignal); From 27ba534f01770de692d42058675f63d733c63d13 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Thu, 11 Jun 2020 14:42:12 -0700 Subject: [PATCH 4/5] Factoring out the cleanup code. There was one minor difference between the "resolve" and "reject" code paths so there's also an option to remove the drain handler. --- .../service-bus/src/core/batchingReceiver.ts | 107 +++++++----------- .../test/internal/batchingReceiver.spec.ts | 4 +- 2 files changed, 42 insertions(+), 69 deletions(-) diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index a81e2d7efa7d..ec051cb7d394 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -131,12 +131,14 @@ export class BatchingReceiver extends MessageReceiver { return new Promise((resolve, reject) => { let totalWaitTimer: NodeJS.Timer | undefined; + // eslint-disable-next-line prefer-const + let cleanupBeforeResolveOrReject: ( + receiver: Receiver | undefined, + shouldRemoveDrain: "removeDrainHandler" | "leaveDrainHandler" + ) => void; + const onSessionError: OnAmqpEvent = (context: EventContext) => { - const receiver = this._receiver || context.receiver!; - receiver.removeListener(ReceiverEvents.receiverError, onReceiveError); - receiver.removeListener(ReceiverEvents.message, onReceiveMessage); - receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); - receiver.session.removeListener(SessionEvents.sessionError, onSessionError); + cleanupBeforeResolveOrReject(this._receiver || context.receiver!, "removeDrainHandler"); const sessionError = context.session && context.session.error; let error: Error | MessagingError; @@ -151,30 +153,11 @@ export class BatchingReceiver extends MessageReceiver { } else { error = new MessagingError("An error occurred while receiving messages."); } - if (totalWaitTimer) { - clearTimeout(totalWaitTimer); - } - if (this._newMessageReceivedTimer) { - clearTimeout(this._newMessageReceivedTimer); - } reject(error); }; this._connectionErrorHandler = (error: AmqpError | Error): void => { - if (totalWaitTimer) { - clearTimeout(totalWaitTimer); - } - if (this._newMessageReceivedTimer) { - clearTimeout(this._newMessageReceivedTimer); - } - - // Removing listeners, so that the next receiveMessages() call can set them again. - if (this._receiver) { - this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); - this._receiver.removeListener(ReceiverEvents.receiverError, onReceiveError); - this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage); - this._receiver.session.removeListener(SessionEvents.sessionError, onSessionError); - } + cleanupBeforeResolveOrReject(this._receiver, "removeDrainHandler"); // Return the collected messages if in ReceiveAndDelete mode because otherwise they are lost forever if (this.receiveMode === ReceiveMode.receiveAndDelete && brokeredMessages.length) { @@ -197,25 +180,7 @@ export class BatchingReceiver extends MessageReceiver { // - maxWaitTime is passed or // - newMessageWaitTimeoutInSeconds is passed since the last message was received const finalAction = (): void => { - if (this._newMessageReceivedTimer) { - clearTimeout(this._newMessageReceivedTimer); - } - if (totalWaitTimer) { - clearTimeout(totalWaitTimer); - } - - if (removeAbortSignalListenersFn) { - removeAbortSignalListenersFn(); - removeAbortSignalListenersFn = undefined; - } - - // Removing listeners, so that the next receiveMessages() call can set them again. - // Listener for drain is removed when it is determined we dont need to drain or when drain is completed - if (this._receiver) { - this._receiver.removeListener(ReceiverEvents.receiverError, onReceiveError); - this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage); - this._receiver.session.removeListener(SessionEvents.sessionError, onSessionError); - } + cleanupBeforeResolveOrReject(this._receiver, "leaveDrainHandler"); // Drain any pending credits. if (this._receiver && this._receiver.isOpen() && this._receiver.credit > 0) { @@ -330,31 +295,10 @@ export class BatchingReceiver extends MessageReceiver { } }; - const cleanupBeforeReject = ( - receiver: Receiver | undefined, - // fixing an eslint warning about using a variable before it's defined (note that we're called _within_ onReceiveError below) - // this is not circular since we don't call onReceiveError, we just need it so we can remove it from the listeners of the receiver. - onReceiveErrorHandlerToRemove: OnAmqpEvent - ): void => { - if (receiver != null) { - receiver.removeListener(ReceiverEvents.receiverError, onReceiveErrorHandlerToRemove); - receiver.removeListener(ReceiverEvents.message, onReceiveMessage); - receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); - receiver.session.removeListener(SessionEvents.sessionError, onSessionError); - } - - if (totalWaitTimer) { - clearTimeout(totalWaitTimer); - } - if (this._newMessageReceivedTimer) { - clearTimeout(this._newMessageReceivedTimer); - } - }; - // Action to be taken when an error is received. const onReceiveError: OnAmqpEvent = (context: Pick) => { const receiver = this._receiver || context.receiver!; - cleanupBeforeReject(receiver, onReceiveError); + cleanupBeforeResolveOrReject(receiver, "removeDrainHandler"); const receiverError = context.receiver && context.receiver.error; let error: Error | MessagingError; @@ -372,8 +316,37 @@ export class BatchingReceiver extends MessageReceiver { reject(error); }; + cleanupBeforeResolveOrReject = ( + receiver: Receiver | undefined, + shouldRemoveDrain: + | "removeDrainHandler" // remove drain handler (not waiting or initiating a drain) + | "leaveDrainHandler" // listener for drain is removed when it is determined we dont need to drain or when drain is completed + ): void => { + if (receiver != null) { + receiver.removeListener(ReceiverEvents.receiverError, onReceiveError); + receiver.removeListener(ReceiverEvents.message, onReceiveMessage); + receiver.session.removeListener(SessionEvents.sessionError, onSessionError); + + if (shouldRemoveDrain === "removeDrainHandler") { + receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain); + } + } + + if (totalWaitTimer) { + clearTimeout(totalWaitTimer); + } + if (this._newMessageReceivedTimer) { + clearTimeout(this._newMessageReceivedTimer); + } + + if (removeAbortSignalListenersFn) { + removeAbortSignalListenersFn(); + removeAbortSignalListenersFn = undefined; + } + }; + removeAbortSignalListenersFn = checkAndRegisterWithAbortSignal((err) => { - cleanupBeforeReject(this._receiver, onReceiveError); + cleanupBeforeResolveOrReject(this._receiver, "removeDrainHandler"); reject(err); }, abortSignal); diff --git a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts index f7f8d44201eb..80e51aa57785 100644 --- a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts @@ -110,8 +110,8 @@ describe("BatchingReceiver unit tests", () => { assert.deepEqual(listenersBeingRemoved, [ "receiver_error", "message", - "receiver_drained", - "session_error" + "session_error", + "receiver_drained" ]); assert.isEmpty(callsDoneAfterAbort); }); From 567d808fbb8959b75ef93c13b181357619645ba6 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Thu, 11 Jun 2020 15:14:03 -0700 Subject: [PATCH 5/5] Update changelog --- sdk/servicebus/service-bus/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 7e6ac9f51a40..4e0f4fbafe17 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -2,6 +2,9 @@ ## 7.0.0-preview.4 (Unreleased) +- Adds abortSignal support throughout Sender and non-session Receivers. + [PR 9233](https://github.com/Azure/azure-sdk-for-js/pull/9233) + [PR 9284](https://github.com/Azure/azure-sdk-for-js/pull/9284) ## 7.0.0-preview.3 (2020-06-08)