Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] pass abortSignal to link initialization and awaitableSender #15349

Merged
merged 8 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/core/core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Release History

## 3.0.0 (Unreleased)
## 3.0.0 (2021-06-09)

### Breaking changes

- Updates the `rhea-promise` and `rhea` dependencies to version 2.x. `rhea` contains a breaking change that changes deserialization of timestamps from numbers to Date objects.
- Removes the `AsyncLock` and `defaultLock` exports. `defaultCancellableLock` should be used instead.

## 2.3.0 (2021-04-29)

Expand Down
2 changes: 0 additions & 2 deletions sdk/core/core-amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@
"@azure/abort-controller": "^1.0.0",
"@azure/core-auth": "^1.3.0",
"@azure/logger": "^1.0.0",
"@types/async-lock": "^1.1.0",
"async-lock": "^1.1.3",
"buffer": "^5.2.1",
"events": "^3.0.0",
"jssha": "^3.1.0",
Expand Down
6 changes: 0 additions & 6 deletions sdk/core/core-amqp/review/core-amqp.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import { AbortSignalLike } from '@azure/abort-controller';
import { AccessToken } from '@azure/core-auth';
import { AmqpError } from 'rhea-promise';
import AsyncLock from 'async-lock';
import { Connection } from 'rhea-promise';
import { Message } from 'rhea-promise';
import { MessageHeader } from 'rhea-promise';
Expand Down Expand Up @@ -91,8 +90,6 @@ export const AmqpMessageProperties: {
fromRheaMessageProperties(props: MessageProperties): AmqpMessageProperties;
};

export { AsyncLock }

// @public
export interface CancellableAsyncLock {
acquire<T = void>(key: string, task: (...args: any[]) => Promise<T>, properties: AcquireLockProperties): Promise<T>;
Expand Down Expand Up @@ -368,9 +365,6 @@ export function createSasTokenProvider(data: {
// @public
export const defaultCancellableLock: CancellableAsyncLock;

// @public
export const defaultLock: AsyncLock;

// @public
export function delay<T>(delayInMs: number, abortSignal?: AbortSignalLike, abortErrorMsg?: string, value?: T): Promise<T | void>;

Expand Down
2 changes: 0 additions & 2 deletions sdk/core/core-amqp/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ export {
delay,
parseConnectionString,
defaultCancellableLock,
defaultLock,
ParsedOutput,
AsyncLock,
WebSocketOptions
} from "./util/utils";
export { AmqpAnnotatedMessage } from "./amqpAnnotatedMessage";
Expand Down
18 changes: 0 additions & 18 deletions sdk/core/core-amqp/src/util/utils.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import AsyncLock from "async-lock";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { WebSocketImpl } from "rhea-promise";
import { isDefined } from "./typeGuards";
import { StandardAbortMessage } from "../errors";
import { CancellableAsyncLock, CancellableAsyncLockImpl } from "./lock";

export { AsyncLock };
/**
* @internal
*
Expand Down Expand Up @@ -113,22 +111,6 @@ export function parseConnectionString<T>(connectionString: string): ParsedOutput
return output as any;
}

/**
* @internal
*
* Gets a new instance of the async lock with desired settings.
* @param options - The async lock options.
* @returns AsyncLock
*/
export function getNewAsyncLock(options?: AsyncLockOptions): AsyncLock {
return new AsyncLock(options);
}

/**
* The async lock instance with default settings.
*/
export const defaultLock: AsyncLock = new AsyncLock({ maxPending: 10000 });

/**
* The cancellable async lock instance.
*/
Expand Down
22 changes: 13 additions & 9 deletions sdk/core/core-amqp/test/cbs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import { assert } from "chai";
import { AbortController } from "@azure/abort-controller";
import { CbsClient, defaultLock, TokenType } from "../src";
import { CbsClient, defaultCancellableLock, TokenType } from "../src";
import { createConnectionStub } from "./utils/createConnectionStub";
import { Connection } from "rhea-promise";
import { stub } from "sinon";
Expand Down Expand Up @@ -38,14 +38,18 @@ describe("CbsClient", function() {

// Make the existing `init` invocation wait until the abortSignal
// is aborted before acquiring it's lock.
await defaultLock.acquire(lock, () => {
return new Promise<void>((resolve) => {
setTimeout(() => {
controller.abort();
resolve();
}, 0);
});
});
await defaultCancellableLock.acquire(
lock,
() => {
return new Promise<void>((resolve) => {
setTimeout(() => {
controller.abort();
resolve();
}, 0);
});
},
{ abortSignal: undefined, timeoutInMs: undefined }
);

try {
await cbsClient.init({ abortSignal: signal });
Expand Down
3 changes: 3 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 7.2.0-beta.2 (Unreleased)

- Improves cancellation support when sending messages or initializing a connection to the service.
Resolves [#15311](https://github.com/Azure/azure-sdk-for-js/issues/15311) and [#13504](https://github.com/Azure/azure-sdk-for-js/issues/13504).

### Bug fixes

- ServiceBusSender could throw an error (`TypeError: Cannot read property 'maxMessageSize' of undefined`) if a link was being restarted while calling sendMessages().
Expand Down
98 changes: 76 additions & 22 deletions sdk/servicebus/service-bus/src/core/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import {
Constants,
TokenType,
defaultLock,
defaultCancellableLock,
RequestResponseLink,
StandardAbortMessage,
isSasTokenProvider
Expand Down Expand Up @@ -217,12 +217,19 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this._logger.verbose(
`${this._logPrefix} Attempting to acquire lock token ${this._openLock} for initializing link`
);
return defaultLock.acquire(this._openLock, () => {
this._logger.verbose(
`${this._logPrefix} Lock ${this._openLock} acquired for initializing link`
);
return this._initLinkImpl(options, abortSignal);
});
return defaultCancellableLock.acquire(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to think about this one more.

It opens up an execution path that did not exist before. It's now possible for two _initLinkImpl's to be executing at the same time if the outer lock aborts and the inner operation is still running.

This complicates the code for _initLinkImpl a bit because it now has to be careful when unwinding state that it's not conflicting with another call of itself (which wasn't a concern before).

Do we need this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The abortSignal can only cause the acquire call to abort if the inner function hasn't been invoked yet. As soon as that inner function is invoked, both the timeout and abortSignal on acquire are cancelled/removed.

Does that clear up your concerns?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does.

Looking at the doc comment the behavior you described is pretty clear for 'task', but less clear for 'abort'.

this._openLock,
() => {
this._logger.verbose(
`${this._logPrefix} Lock ${this._openLock} acquired for initializing link`
);
return this._initLinkImpl(options, abortSignal);
},
{
abortSignal: abortSignal,
timeoutInMs: Constants.defaultOperationTimeoutInMs
}
);
}

private async _initLinkImpl(
Expand Down Expand Up @@ -258,7 +265,11 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
);

try {
await this._negotiateClaim();
await this._negotiateClaim({
abortSignal,
setTokenRenewal: false,
timeoutInMs: Constants.defaultOperationTimeoutInMs
});

checkAborted();
this.checkIfConnectionReady();
Expand Down Expand Up @@ -324,10 +335,14 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this._logger.verbose(
`${this._logPrefix} Attempting to acquire lock token ${this._openLock} for closing link`
);
return defaultLock.acquire(this._openLock, () => {
this._logger.verbose(`${this._logPrefix} Lock ${this._openLock} acquired for closing link`);
return this.closeLinkImpl();
});
return defaultCancellableLock.acquire(
this._openLock,
() => {
this._logger.verbose(`${this._logPrefix} Lock ${this._openLock} acquired for closing link`);
return this.closeLinkImpl();
},
{ abortSignal: undefined, timeoutInMs: undefined }
);
}

private async closeLinkImpl(): Promise<void> {
Expand Down Expand Up @@ -375,7 +390,15 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
* Negotiates the cbs claim for the ClientEntity.
* @param setTokenRenewal - Set the token renewal timer. Default false.
*/
private async _negotiateClaim(setTokenRenewal?: boolean): Promise<void> {
private async _negotiateClaim({
abortSignal,
setTokenRenewal,
timeoutInMs
}: {
setTokenRenewal: boolean;
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
}): Promise<void> {
this._logger.verbose(`${this._logPrefix} negotiateclaim() has been called`);

// Wait for the connectionContext to be ready to open the link.
Expand All @@ -394,10 +417,22 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this.name,
this.address
);
await defaultLock.acquire(this._context.cbsSession.cbsLock, async () => {
this.checkIfConnectionReady();
return this._context.cbsSession.init();
});

const startTime = Date.now();
if (!this._context.cbsSession.isOpen()) {
await defaultCancellableLock.acquire(
this._context.cbsSession.cbsLock,
() => {
this.checkIfConnectionReady();
return this._context.cbsSession.init({ abortSignal, timeoutInMs });
},
{
abortSignal,
timeoutInMs: timeoutInMs - (Date.now() - startTime)
}
);
}

let tokenObject: AccessToken;
let tokenType: TokenType;
if (isSasTokenProvider(this._context.tokenCredential)) {
Expand Down Expand Up @@ -433,10 +468,25 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
if (!tokenObject) {
throw new Error("Token cannot be null");
}
await defaultLock.acquire(this._context.negotiateClaimLock, () => {
this.checkIfConnectionReady();
return this._context.cbsSession.negotiateClaim(this.audience, tokenObject.token, tokenType);
});
await defaultCancellableLock.acquire(
this._context.negotiateClaimLock,
() => {
this.checkIfConnectionReady();
return this._context.cbsSession.negotiateClaim(
this.audience,
tokenObject.token,
tokenType,
{
abortSignal,
timeoutInMs: timeoutInMs - (Date.now() - startTime)
}
);
},
{
abortSignal,
timeoutInMs: timeoutInMs - (Date.now() - startTime)
}
);
this._logger.verbose(
"%s Negotiated claim for %s '%s' with with address: %s",
this.logPrefix,
Expand Down Expand Up @@ -485,7 +535,11 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
}
this._tokenRenewalTimer = setTimeout(async () => {
try {
await this._negotiateClaim(true);
await this._negotiateClaim({
setTokenRenewal: true,
abortSignal: undefined,
timeoutInMs: Constants.defaultOperationTimeoutInMs
});
} catch (err) {
this._logger.logError(
err,
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
try {
const delivery = await this.link!.send(encodedMessage, {
format: sendBatch ? 0x80013700 : 0,
timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000
timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000,
abortSignal
});
logger.verbose(
"%s Sender '%s', sent message with delivery id: %d",
Expand Down
Loading