diff --git a/sdk/core/core-amqp/CHANGELOG.md b/sdk/core/core-amqp/CHANGELOG.md index 75491387f83c..ec92b83b5c6c 100644 --- a/sdk/core/core-amqp/CHANGELOG.md +++ b/sdk/core/core-amqp/CHANGELOG.md @@ -1,10 +1,11 @@ # Release History -## 3.0.0 (Unreleased) +## 3.0.0 (2021-06-09) ### Breaking changes - Updates the `rhea-promise` and `rhea` dependencies to version 2.x. `rhea` contains a breaking change that changes deserialization of timestamps from numbers to Date objects. +- Removes the `AsyncLock` and `defaultLock` exports. `defaultCancellableLock` should be used instead. ## 2.3.0 (2021-04-29) diff --git a/sdk/core/core-amqp/package.json b/sdk/core/core-amqp/package.json index 641284ad7810..3eefcaab17d2 100644 --- a/sdk/core/core-amqp/package.json +++ b/sdk/core/core-amqp/package.json @@ -72,8 +72,6 @@ "@azure/abort-controller": "^1.0.0", "@azure/core-auth": "^1.3.0", "@azure/logger": "^1.0.0", - "@types/async-lock": "^1.1.0", - "async-lock": "^1.1.3", "buffer": "^5.2.1", "events": "^3.0.0", "jssha": "^3.1.0", diff --git a/sdk/core/core-amqp/review/core-amqp.api.md b/sdk/core/core-amqp/review/core-amqp.api.md index dae8fee8ef2a..ec02e1bf8fec 100644 --- a/sdk/core/core-amqp/review/core-amqp.api.md +++ b/sdk/core/core-amqp/review/core-amqp.api.md @@ -7,7 +7,6 @@ import { AbortSignalLike } from '@azure/abort-controller'; import { AccessToken } from '@azure/core-auth'; import { AmqpError } from 'rhea-promise'; -import AsyncLock from 'async-lock'; import { Connection } from 'rhea-promise'; import { Message } from 'rhea-promise'; import { MessageHeader } from 'rhea-promise'; @@ -91,8 +90,6 @@ export const AmqpMessageProperties: { fromRheaMessageProperties(props: MessageProperties): AmqpMessageProperties; }; -export { AsyncLock } - // @public export interface CancellableAsyncLock { acquire(key: string, task: (...args: any[]) => Promise, properties: AcquireLockProperties): Promise; @@ -368,9 +365,6 @@ export function createSasTokenProvider(data: { // @public export const defaultCancellableLock: CancellableAsyncLock; -// @public -export const defaultLock: AsyncLock; - // @public export function delay(delayInMs: number, abortSignal?: AbortSignalLike, abortErrorMsg?: string, value?: T): Promise; diff --git a/sdk/core/core-amqp/src/index.ts b/sdk/core/core-amqp/src/index.ts index 4d192670c58c..a2f524c37717 100644 --- a/sdk/core/core-amqp/src/index.ts +++ b/sdk/core/core-amqp/src/index.ts @@ -34,9 +34,7 @@ export { delay, parseConnectionString, defaultCancellableLock, - defaultLock, ParsedOutput, - AsyncLock, WebSocketOptions } from "./util/utils"; export { AmqpAnnotatedMessage } from "./amqpAnnotatedMessage"; diff --git a/sdk/core/core-amqp/src/util/utils.ts b/sdk/core/core-amqp/src/util/utils.ts index 3c949182cf44..f1192ec7ce82 100644 --- a/sdk/core/core-amqp/src/util/utils.ts +++ b/sdk/core/core-amqp/src/util/utils.ts @@ -1,14 +1,12 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import AsyncLock from "async-lock"; import { AbortError, AbortSignalLike } from "@azure/abort-controller"; import { WebSocketImpl } from "rhea-promise"; import { isDefined } from "./typeGuards"; import { StandardAbortMessage } from "../errors"; import { CancellableAsyncLock, CancellableAsyncLockImpl } from "./lock"; -export { AsyncLock }; /** * @internal * @@ -113,22 +111,6 @@ export function parseConnectionString(connectionString: string): ParsedOutput return output as any; } -/** - * @internal - * - * Gets a new instance of the async lock with desired settings. - * @param options - The async lock options. - * @returns AsyncLock - */ -export function getNewAsyncLock(options?: AsyncLockOptions): AsyncLock { - return new AsyncLock(options); -} - -/** - * The async lock instance with default settings. - */ -export const defaultLock: AsyncLock = new AsyncLock({ maxPending: 10000 }); - /** * The cancellable async lock instance. */ diff --git a/sdk/core/core-amqp/test/cbs.spec.ts b/sdk/core/core-amqp/test/cbs.spec.ts index d735b76d18b5..c9a9c264533b 100644 --- a/sdk/core/core-amqp/test/cbs.spec.ts +++ b/sdk/core/core-amqp/test/cbs.spec.ts @@ -3,7 +3,7 @@ import { assert } from "chai"; import { AbortController } from "@azure/abort-controller"; -import { CbsClient, defaultLock, TokenType } from "../src"; +import { CbsClient, defaultCancellableLock, TokenType } from "../src"; import { createConnectionStub } from "./utils/createConnectionStub"; import { Connection } from "rhea-promise"; import { stub } from "sinon"; @@ -38,14 +38,18 @@ describe("CbsClient", function() { // Make the existing `init` invocation wait until the abortSignal // is aborted before acquiring it's lock. - await defaultLock.acquire(lock, () => { - return new Promise((resolve) => { - setTimeout(() => { - controller.abort(); - resolve(); - }, 0); - }); - }); + await defaultCancellableLock.acquire( + lock, + () => { + return new Promise((resolve) => { + setTimeout(() => { + controller.abort(); + resolve(); + }, 0); + }); + }, + { abortSignal: undefined, timeoutInMs: undefined } + ); try { await cbsClient.init({ abortSignal: signal }); diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 69ac9933bc28..caa95b003ee9 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -2,6 +2,9 @@ ## 7.2.0-beta.2 (Unreleased) +- Improves cancellation support when sending messages or initializing a connection to the service. + Resolves [#15311](https://github.com/Azure/azure-sdk-for-js/issues/15311) and [#13504](https://github.com/Azure/azure-sdk-for-js/issues/13504). + ### Bug fixes - ServiceBusSender could throw an error (`TypeError: Cannot read property 'maxMessageSize' of undefined`) if a link was being restarted while calling sendMessages(). diff --git a/sdk/servicebus/service-bus/src/core/linkEntity.ts b/sdk/servicebus/service-bus/src/core/linkEntity.ts index df82ff0fdcdc..a78d32b22b1a 100644 --- a/sdk/servicebus/service-bus/src/core/linkEntity.ts +++ b/sdk/servicebus/service-bus/src/core/linkEntity.ts @@ -4,7 +4,7 @@ import { Constants, TokenType, - defaultLock, + defaultCancellableLock, RequestResponseLink, StandardAbortMessage, isSasTokenProvider @@ -217,12 +217,19 @@ export abstract class LinkEntity { - this._logger.verbose( - `${this._logPrefix} Lock ${this._openLock} acquired for initializing link` - ); - return this._initLinkImpl(options, abortSignal); - }); + return defaultCancellableLock.acquire( + this._openLock, + () => { + this._logger.verbose( + `${this._logPrefix} Lock ${this._openLock} acquired for initializing link` + ); + return this._initLinkImpl(options, abortSignal); + }, + { + abortSignal: abortSignal, + timeoutInMs: Constants.defaultOperationTimeoutInMs + } + ); } private async _initLinkImpl( @@ -258,7 +265,11 @@ export abstract class LinkEntity { - this._logger.verbose(`${this._logPrefix} Lock ${this._openLock} acquired for closing link`); - return this.closeLinkImpl(); - }); + return defaultCancellableLock.acquire( + this._openLock, + () => { + this._logger.verbose(`${this._logPrefix} Lock ${this._openLock} acquired for closing link`); + return this.closeLinkImpl(); + }, + { abortSignal: undefined, timeoutInMs: undefined } + ); } private async closeLinkImpl(): Promise { @@ -375,7 +390,15 @@ export abstract class LinkEntity { + private async _negotiateClaim({ + abortSignal, + setTokenRenewal, + timeoutInMs + }: { + setTokenRenewal: boolean; + abortSignal: AbortSignalLike | undefined; + timeoutInMs: number; + }): Promise { this._logger.verbose(`${this._logPrefix} negotiateclaim() has been called`); // Wait for the connectionContext to be ready to open the link. @@ -394,10 +417,22 @@ export abstract class LinkEntity { - this.checkIfConnectionReady(); - return this._context.cbsSession.init(); - }); + + const startTime = Date.now(); + if (!this._context.cbsSession.isOpen()) { + await defaultCancellableLock.acquire( + this._context.cbsSession.cbsLock, + () => { + this.checkIfConnectionReady(); + return this._context.cbsSession.init({ abortSignal, timeoutInMs }); + }, + { + abortSignal, + timeoutInMs: timeoutInMs - (Date.now() - startTime) + } + ); + } + let tokenObject: AccessToken; let tokenType: TokenType; if (isSasTokenProvider(this._context.tokenCredential)) { @@ -433,10 +468,25 @@ export abstract class LinkEntity { - this.checkIfConnectionReady(); - return this._context.cbsSession.negotiateClaim(this.audience, tokenObject.token, tokenType); - }); + await defaultCancellableLock.acquire( + this._context.negotiateClaimLock, + () => { + this.checkIfConnectionReady(); + return this._context.cbsSession.negotiateClaim( + this.audience, + tokenObject.token, + tokenType, + { + abortSignal, + timeoutInMs: timeoutInMs - (Date.now() - startTime) + } + ); + }, + { + abortSignal, + timeoutInMs: timeoutInMs - (Date.now() - startTime) + } + ); this._logger.verbose( "%s Negotiated claim for %s '%s' with with address: %s", this.logPrefix, @@ -485,7 +535,11 @@ export abstract class LinkEntity { try { - await this._negotiateClaim(true); + await this._negotiateClaim({ + setTokenRenewal: true, + abortSignal: undefined, + timeoutInMs: Constants.defaultOperationTimeoutInMs + }); } catch (err) { this._logger.logError( err, diff --git a/sdk/servicebus/service-bus/src/core/messageSender.ts b/sdk/servicebus/service-bus/src/core/messageSender.ts index 568e3b20ab66..980ffee1b281 100644 --- a/sdk/servicebus/service-bus/src/core/messageSender.ts +++ b/sdk/servicebus/service-bus/src/core/messageSender.ts @@ -253,7 +253,8 @@ export class MessageSender extends LinkEntity { try { const delivery = await this.link!.send(encodedMessage, { format: sendBatch ? 0x80013700 : 0, - timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000 + timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000, + abortSignal }); logger.verbose( "%s Sender '%s', sent message with delivery id: %d", diff --git a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts index 957571a04351..571ad9671211 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts @@ -20,7 +20,6 @@ import { createConnectionContextForTestsWithSessionId } from "./unittestUtils"; import { StandardAbortMessage } from "@azure/core-amqp"; -import { isLinkLocked } from "../utils/misc"; import { ServiceBusSessionReceiverImpl } from "../../../src/receivers/sessionReceiver"; import { ServiceBusReceiverImpl } from "../../../src/receivers/receiver"; import { MessageSession } from "../../../src/session/messageSession"; @@ -156,6 +155,38 @@ describe("AbortSignal", () => { assert.isTrue((err as any).retryable); } }); + + it("_trySend passes abortSignal to awaitable sender", async () => { + const sender = new MessageSender(connectionContext, "fakeEntityPath", { + timeoutInMs: 1 + }); + closeables.push(sender); + + let wasAbortSignalPassed = false; + sender["_link"] = { + credit: 999, + isOpen: () => true, + session: { + outgoing: { + available: () => true + } + }, + sendable() { + return true; + }, + send(_msg, options) { + if (options?.abortSignal) { + wasAbortSignalPassed = true; + } + return Promise.resolve({}); + } + } as AwaitableSender; + + await sender["_trySend"]({} as Buffer, true, { + abortSignal: createAbortSignalForTest(false) + }); + assert.isTrue(wasAbortSignalPassed, "abortSignal should have been passed to AwaitableSender"); + }); }); describe("MessageSender.open() aborts after...", () => { @@ -172,8 +203,6 @@ describe("AbortSignal", () => { assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } - - assert.isFalse(isLinkLocked(sender)); }); it("...afterLock", async () => { @@ -189,8 +218,6 @@ describe("AbortSignal", () => { assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } - - assert.isFalse(isLinkLocked(sender)); }); it("...negotiateClaim", async () => { @@ -219,8 +246,6 @@ describe("AbortSignal", () => { assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } - - assert.isFalse(isLinkLocked(sender)); }); it("...createAwaitableSender", async () => { @@ -249,8 +274,6 @@ describe("AbortSignal", () => { assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } - - assert.isFalse(isLinkLocked(sender)); }); }); @@ -272,8 +295,6 @@ describe("AbortSignal", () => { assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } - - assert.isFalse(isLinkLocked(messageReceiver)); }); it("...after negotiateClaim", async () => { @@ -298,8 +319,6 @@ describe("AbortSignal", () => { assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } - - assert.isFalse(isLinkLocked(messageReceiver)); }); it("...after createReceiver", async () => { @@ -325,8 +344,6 @@ describe("AbortSignal", () => { assert.equal(err.message, StandardAbortMessage); assert.equal(err.name, "AbortError"); } - - assert.isFalse(isLinkLocked(messageReceiver)); }); }); diff --git a/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts index 39f2b4e1cacb..6db1eeee0531 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts @@ -1,8 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { AbortSignalLike } from "@azure/abort-controller"; -import { defaultLock } from "@azure/core-amqp"; +import { AbortController, AbortSignalLike } from "@azure/abort-controller"; import chai from "chai"; import chaiAsPromised from "chai-as-promised"; import { Receiver, ReceiverOptions } from "rhea-promise"; @@ -10,7 +9,6 @@ import sinon from "sinon"; import { ConnectionContext } from "../../../src/connectionContext"; import { LinkEntity } from "../../../src/core/linkEntity"; import { receiverLogger } from "../../../src/log"; -import { isLinkLocked } from "../utils/misc"; import { createConnectionContextForTests, createRheaReceiverForTests } from "./unittestUtils"; chai.use(chaiAsPromised); const assert = chai.assert; @@ -80,10 +78,6 @@ describe("LinkEntity unit tests", () => { linkEntity["_negotiateClaim"] = async () => { ++timesCalled; - - // this will just resolve immediately because - // we're already connecting. - assert.isTrue(isLinkLocked(linkEntity)); }; await linkEntity.initLink({}); @@ -113,9 +107,7 @@ describe("LinkEntity unit tests", () => { try { const old = linkEntity["checkIfConnectionReady"]; linkEntity["checkIfConnectionReady"] = () => { - if (defaultLock.isBusy(linkEntity["_context"]["cbsSession"]["cbsLock"])) { - connectionContext["isConnectionClosing"] = () => true; - } + connectionContext["isConnectionClosing"] = () => true; old.apply(linkEntity); }; @@ -130,9 +122,7 @@ describe("LinkEntity unit tests", () => { try { const old = linkEntity["checkIfConnectionReady"]; linkEntity["checkIfConnectionReady"] = () => { - if (defaultLock.isBusy(linkEntity["_context"]["negotiateClaimLock"])) { - connectionContext["isConnectionClosing"] = () => true; - } + connectionContext["isConnectionClosing"] = () => true; old.apply(linkEntity); }; @@ -146,8 +136,8 @@ describe("LinkEntity unit tests", () => { it("connection is restarting just before createRheaLink()", async () => { try { const old = linkEntity["_negotiateClaim"]; - linkEntity["_negotiateClaim"] = async () => { - await old.apply(linkEntity); + linkEntity["_negotiateClaim"] = async (...args) => { + await old.apply(linkEntity, args); connectionContext["isConnectionClosing"] = () => true; }; @@ -242,13 +232,13 @@ describe("LinkEntity unit tests", () => { }); it("abortSignal - if a link was actually created we clean up", async () => { - let isAborted = false; + const abortController = new AbortController(); const orig = linkEntity["createRheaLink"]; let returnedReceiver: Receiver | undefined; linkEntity["createRheaLink"] = async (options) => { - isAborted = true; + abortController.abort(); returnedReceiver = await orig.call(linkEntity, options); assert.isTrue( @@ -259,11 +249,7 @@ describe("LinkEntity unit tests", () => { }; try { - await linkEntity.initLink({}, { - get aborted(): boolean { - return isAborted; - } - } as AbortSignalLike); + await linkEntity.initLink({}, abortController.signal); assert.fail("Should have thrown"); } catch (err) { assert.equal(err.name, "AbortError"); @@ -283,6 +269,31 @@ describe("LinkEntity unit tests", () => { ); }); + it("abortSignal - is passed through to negotiateClaim", async () => { + const abortController = new AbortController(); + + let sawAbortSignal = false; + try { + const old = linkEntity["_negotiateClaim"]; + linkEntity["_negotiateClaim"] = async (props) => { + if (props.abortSignal) { + sawAbortSignal = true; + } + abortController.abort(); + return old.call(linkEntity, props); + }; + + await linkEntity.initLink({}, abortController.signal); + assert.fail("Should have thrown"); + } catch (err) { + assert.isTrue(sawAbortSignal, "Should have seen the abortSignal."); + assert.deepNestedInclude(err, { + name: "AbortError", + message: "The operation was aborted." + }); + } + }); + it("can use a custom name via options", async () => { assert.match(linkEntity.name, /some initial name-/); diff --git a/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts b/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts index 06c626d7569d..5453faf70643 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts @@ -112,6 +112,9 @@ export function createConnectionContextForTests( }, async close(): Promise { /** Nothing to do here */ + }, + isOpen() { + return initWasCalled; } }, initWasCalled diff --git a/sdk/servicebus/service-bus/test/internal/utils/misc.ts b/sdk/servicebus/service-bus/test/internal/utils/misc.ts index 94541bfeee05..06b52e521046 100644 --- a/sdk/servicebus/service-bus/test/internal/utils/misc.ts +++ b/sdk/servicebus/service-bus/test/internal/utils/misc.ts @@ -1,9 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { defaultLock } from "@azure/core-amqp"; import { Delivery, ServiceBusReceivedMessage } from "../../../src"; -import { LinkEntity } from "../../../src/core/linkEntity"; import { ServiceBusMessageImpl } from "../../../src/serviceBusMessage"; export function getDeliveryProperty(message: ServiceBusReceivedMessage): Delivery { @@ -19,7 +17,3 @@ export function getDeliveryProperty(message: ServiceBusReceivedMessage): Deliver "Received message does not contain a .delivery member - not a ServiceBusMessageImpl instance." ); } - -export function isLinkLocked(linkEntity: LinkEntity): boolean { - return defaultLock.isBusy(linkEntity["_openLock"]); -}