Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] pass abortSignal to link initialization and awaitableSender #15349

Merged
merged 8 commits into from
Jun 9, 2021
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 7.2.0-beta.2 (Unreleased)

- Improves cancellation support when sending messages or initializing a connection to the service.
Resolves [#15311](https://github.com/Azure/azure-sdk-for-js/issues/15311) and [#13504](https://github.com/Azure/azure-sdk-for-js/issues/13504).

## 7.2.0-beta.1 (2021-05-18)

Expand Down
98 changes: 76 additions & 22 deletions sdk/servicebus/service-bus/src/core/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import {
Constants,
TokenType,
defaultLock,
defaultCancellableLock,
RequestResponseLink,
StandardAbortMessage,
isSasTokenProvider
Expand Down Expand Up @@ -217,12 +217,19 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this._logger.verbose(
`${this._logPrefix} Attempting to acquire lock token ${this._openLock} for initializing link`
);
return defaultLock.acquire(this._openLock, () => {
this._logger.verbose(
`${this._logPrefix} Lock ${this._openLock} acquired for initializing link`
);
return this._initLinkImpl(options, abortSignal);
});
return defaultCancellableLock.acquire(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to think about this one more.

It opens up an execution path that did not exist before. It's now possible for two _initLinkImpl's to be executing at the same time if the outer lock aborts and the inner operation is still running.

This complicates the code for _initLinkImpl a bit because it now has to be careful when unwinding state that it's not conflicting with another call of itself (which wasn't a concern before).

Do we need this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The abortSignal can only cause the acquire call to abort if the inner function hasn't been invoked yet. As soon as that inner function is invoked, both the timeout and abortSignal on acquire are cancelled/removed.

Does that clear up your concerns?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does.

Looking at the doc comment the behavior you described is pretty clear for 'task', but less clear for 'abort'.

this._openLock,
() => {
this._logger.verbose(
`${this._logPrefix} Lock ${this._openLock} acquired for initializing link`
);
return this._initLinkImpl(options, abortSignal);
},
{
abortSignal: abortSignal,
timeoutInMs: Constants.defaultOperationTimeoutInMs
}
);
}

private async _initLinkImpl(
Expand Down Expand Up @@ -258,7 +265,11 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
);

try {
await this._negotiateClaim();
await this._negotiateClaim({
abortSignal,
setTokenRenewal: false,
timeoutInMs: Constants.defaultOperationTimeoutInMs
});

checkAborted();
this.checkIfConnectionReady();
Expand Down Expand Up @@ -324,10 +335,14 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this._logger.verbose(
`${this._logPrefix} Attempting to acquire lock token ${this._openLock} for closing link`
);
return defaultLock.acquire(this._openLock, () => {
this._logger.verbose(`${this._logPrefix} Lock ${this._openLock} acquired for closing link`);
return this.closeLinkImpl();
});
return defaultCancellableLock.acquire(
this._openLock,
() => {
this._logger.verbose(`${this._logPrefix} Lock ${this._openLock} acquired for closing link`);
return this.closeLinkImpl();
},
{ abortSignal: undefined, timeoutInMs: undefined }
);
}

private async closeLinkImpl(): Promise<void> {
Expand Down Expand Up @@ -375,7 +390,15 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
* Negotiates the cbs claim for the ClientEntity.
* @param setTokenRenewal - Set the token renewal timer. Default false.
*/
private async _negotiateClaim(setTokenRenewal?: boolean): Promise<void> {
private async _negotiateClaim({
abortSignal,
setTokenRenewal,
timeoutInMs
}: {
setTokenRenewal: boolean;
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
}): Promise<void> {
this._logger.verbose(`${this._logPrefix} negotiateclaim() has been called`);

// Wait for the connectionContext to be ready to open the link.
Expand All @@ -394,10 +417,22 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this.name,
this.address
);
await defaultLock.acquire(this._context.cbsSession.cbsLock, async () => {
this.checkIfConnectionReady();
return this._context.cbsSession.init();
});

const startTime = Date.now();
if (!this._context.cbsSession.isOpen()) {
await defaultCancellableLock.acquire(
this._context.cbsSession.cbsLock,
() => {
this.checkIfConnectionReady();
return this._context.cbsSession.init({ abortSignal, timeoutInMs });
},
{
abortSignal,
timeoutInMs: timeoutInMs - (Date.now() - startTime)
}
);
}

let tokenObject: AccessToken;
let tokenType: TokenType;
if (isSasTokenProvider(this._context.tokenCredential)) {
Expand Down Expand Up @@ -433,10 +468,25 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
if (!tokenObject) {
throw new Error("Token cannot be null");
}
await defaultLock.acquire(this._context.negotiateClaimLock, () => {
this.checkIfConnectionReady();
return this._context.cbsSession.negotiateClaim(this.audience, tokenObject.token, tokenType);
});
await defaultCancellableLock.acquire(
this._context.negotiateClaimLock,
() => {
this.checkIfConnectionReady();
return this._context.cbsSession.negotiateClaim(
this.audience,
tokenObject.token,
tokenType,
{
abortSignal,
timeoutInMs: timeoutInMs - (Date.now() - startTime)
}
);
},
{
abortSignal,
timeoutInMs: timeoutInMs - (Date.now() - startTime)
}
);
this._logger.verbose(
"%s Negotiated claim for %s '%s' with with address: %s",
this.logPrefix,
Expand Down Expand Up @@ -485,7 +535,11 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
}
this._tokenRenewalTimer = setTimeout(async () => {
try {
await this._negotiateClaim(true);
await this._negotiateClaim({
setTokenRenewal: true,
abortSignal: undefined,
timeoutInMs: Constants.defaultOperationTimeoutInMs
});
} catch (err) {
this._logger.logError(
err,
Expand Down
5 changes: 3 additions & 2 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,11 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
try {
this.link.sendTimeoutInSeconds =
(timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000;
const delivery = await this.link!.send(
const delivery = await this.link.send(
encodedMessage,
undefined,
sendBatch ? 0x80013700 : 0
sendBatch ? 0x80013700 : 0,
{ abortSignal }
);
logger.verbose(
"%s Sender '%s', sent message with delivery id: %d",
Expand Down
47 changes: 32 additions & 15 deletions sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
createConnectionContextForTestsWithSessionId
} from "./unittestUtils";
import { StandardAbortMessage } from "@azure/core-amqp";
import { isLinkLocked } from "../utils/misc";
import { ServiceBusSessionReceiverImpl } from "../../../src/receivers/sessionReceiver";
import { ServiceBusReceiverImpl } from "../../../src/receivers/receiver";
import { MessageSession } from "../../../src/session/messageSession";
Expand Down Expand Up @@ -162,6 +161,38 @@ describe("AbortSignal", () => {
assert.isTrue((err as any).retryable);
}
});

it("_trySend passes abortSignal to awaitable sender", async () => {
const sender = new MessageSender(connectionContext, "fakeEntityPath", {
timeoutInMs: 1
});
closeables.push(sender);

let wasAbortSignalPassed = false;
sender["_link"] = {
credit: 999,
isOpen: () => true,
session: {
outgoing: {
available: () => true
}
},
sendable() {
return true;
},
send(_msg, _tag, _format, options) {
if (options?.abortSignal) {
wasAbortSignalPassed = true;
}
return Promise.resolve({});
}
} as AwaitableSender;

await sender["_trySend"]({} as Buffer, true, {
abortSignal: createAbortSignalForTest(false)
});
assert.isTrue(wasAbortSignalPassed, "abortSignal should have been passed to AwaitableSender");
});
});

describe("MessageSender.open() aborts after...", () => {
Expand All @@ -178,8 +209,6 @@ describe("AbortSignal", () => {
assert.equal(err.message, StandardAbortMessage);
assert.equal(err.name, "AbortError");
}

assert.isFalse(isLinkLocked(sender));
});

it("...afterLock", async () => {
Expand All @@ -195,8 +224,6 @@ describe("AbortSignal", () => {
assert.equal(err.message, StandardAbortMessage);
assert.equal(err.name, "AbortError");
}

assert.isFalse(isLinkLocked(sender));
});

it("...negotiateClaim", async () => {
Expand Down Expand Up @@ -225,8 +252,6 @@ describe("AbortSignal", () => {
assert.equal(err.message, StandardAbortMessage);
assert.equal(err.name, "AbortError");
}

assert.isFalse(isLinkLocked(sender));
});

it("...createAwaitableSender", async () => {
Expand Down Expand Up @@ -255,8 +280,6 @@ describe("AbortSignal", () => {
assert.equal(err.message, StandardAbortMessage);
assert.equal(err.name, "AbortError");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a bit redundant but are the isLinkLocked calls being removed because the new cancellable lock doesn't support it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep exactly. We weren't using that API in any of our runtime code (or test code in event hubs) so I didn't implement it in the cancellable lock. We could always add it if we thought we'd gain something from it though.


assert.isFalse(isLinkLocked(sender));
});
});

Expand All @@ -278,8 +301,6 @@ describe("AbortSignal", () => {
assert.equal(err.message, StandardAbortMessage);
assert.equal(err.name, "AbortError");
}

assert.isFalse(isLinkLocked(messageReceiver));
});

it("...after negotiateClaim", async () => {
Expand All @@ -304,8 +325,6 @@ describe("AbortSignal", () => {
assert.equal(err.message, StandardAbortMessage);
assert.equal(err.name, "AbortError");
}

assert.isFalse(isLinkLocked(messageReceiver));
});

it("...after createReceiver", async () => {
Expand All @@ -331,8 +350,6 @@ describe("AbortSignal", () => {
assert.equal(err.message, StandardAbortMessage);
assert.equal(err.name, "AbortError");
}

assert.isFalse(isLinkLocked(messageReceiver));
});
});

Expand Down
Loading