Skip to content

Commit

Permalink
[service-bus] T2: Enabling abort in receiveBatch and subscribe (#9284)
Browse files Browse the repository at this point in the history
- Plumbing abortSignal through so it can be used to cancel the init() call in subscribe() and any part of receiveBatch()
- Standardizing our AbortError message for all the new abortSignal work for Sender and Receiver.
- Factoring out the cleanup code.

Part of #4375
  • Loading branch information
richardpark-msft authored Jun 12, 2020
1 parent b425f64 commit 7a0eaeb
Show file tree
Hide file tree
Showing 14 changed files with 596 additions and 259 deletions.
3 changes: 3 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 7.0.0-preview.4 (Unreleased)

- Adds abortSignal support throughout Sender and non-session Receivers.
[PR 9233](https://github.com/Azure/azure-sdk-for-js/pull/9233)
[PR 9284](https://github.com/Azure/azure-sdk-for-js/pull/9284)

## 7.0.0-preview.3 (2020-06-08)

Expand Down
116 changes: 64 additions & 52 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@

import * as log from "../log";
import { Constants, MessagingError, translate } from "@azure/core-amqp";
import { AmqpError, EventContext, OnAmqpEvent, ReceiverEvents, SessionEvents } from "rhea-promise";
import {
AmqpError,
EventContext,
OnAmqpEvent,
ReceiverEvents,
SessionEvents,
Receiver
} from "rhea-promise";
import { ReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage";
import {
MessageReceiver,
Expand All @@ -14,6 +21,8 @@ import {
} from "./messageReceiver";
import { ClientEntityContext } from "../clientEntityContext";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { AbortSignalLike } from "@azure/core-http";
import { checkAndRegisterWithAbortSignal } from "../util/utils";

/**
* Describes the batching receiver where the user can receive a specified number of messages for
Expand Down Expand Up @@ -84,7 +93,8 @@ export class BatchingReceiver extends MessageReceiver {
*/
async receive(
maxMessageCount: number,
maxWaitTimeInMs?: number
maxWaitTimeInMs?: number,
abortSignal?: AbortSignalLike
): Promise<ServiceBusMessageImpl[]> {
throwErrorIfConnectionClosed(this._context.namespace);

Expand All @@ -95,7 +105,7 @@ export class BatchingReceiver extends MessageReceiver {
this.isReceivingMessages = true;

try {
return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs);
return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs, abortSignal);
} catch (error) {
log.error(
"[%s] Receiver '%s': Rejecting receiveMessages() with error %O: ",
Expand All @@ -112,20 +122,23 @@ export class BatchingReceiver extends MessageReceiver {

private _receiveImpl(
maxMessageCount: number,
maxWaitTimeInMs: number
maxWaitTimeInMs: number,
abortSignal?: AbortSignalLike
): Promise<ServiceBusMessageImpl[]> {
const brokeredMessages: ServiceBusMessageImpl[] = [];

this.isReceivingMessages = true;
return new Promise<ServiceBusMessageImpl[]>((resolve, reject) => {
let totalWaitTimer: NodeJS.Timer | undefined;

// eslint-disable-next-line prefer-const
let cleanupBeforeResolveOrReject: (
receiver: Receiver | undefined,
shouldRemoveDrain: "removeDrainHandler" | "leaveDrainHandler"
) => void;

const onSessionError: OnAmqpEvent = (context: EventContext) => {
const receiver = this._receiver || context.receiver!;
receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
cleanupBeforeResolveOrReject(this._receiver || context.receiver!, "removeDrainHandler");

const sessionError = context.session && context.session.error;
let error: Error | MessagingError;
Expand All @@ -140,30 +153,11 @@ export class BatchingReceiver extends MessageReceiver {
} else {
error = new MessagingError("An error occurred while receiving messages.");
}
if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
reject(error);
};

this._connectionErrorHandler = (error: AmqpError | Error): void => {
if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}

// Removing listeners, so that the next receiveMessages() call can set them again.
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
this._receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
this._receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
}
cleanupBeforeResolveOrReject(this._receiver, "removeDrainHandler");

// Return the collected messages if in ReceiveAndDelete mode because otherwise they are lost forever
if (this.receiveMode === ReceiveMode.receiveAndDelete && brokeredMessages.length) {
Expand All @@ -179,25 +173,14 @@ export class BatchingReceiver extends MessageReceiver {
reject(translate(error));
};

let removeAbortSignalListenersFn: (() => void) | undefined = undefined;

// Final action to be performed after
// - maxMessageCount is reached or
// - maxWaitTime is passed or
// - newMessageWaitTimeoutInSeconds is passed since the last message was received
const finalAction = (): void => {
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}

// Removing listeners, so that the next receiveMessages() call can set them again.
// Listener for drain is removed when it is determined we dont need to drain or when drain is completed
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
this._receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
}
cleanupBeforeResolveOrReject(this._receiver, "leaveDrainHandler");

// Drain any pending credits.
if (this._receiver && this._receiver.isOpen() && this._receiver.credit > 0) {
Expand Down Expand Up @@ -313,12 +296,9 @@ export class BatchingReceiver extends MessageReceiver {
};

// Action to be taken when an error is received.
const onReceiveError: OnAmqpEvent = (context: EventContext) => {
const onReceiveError: OnAmqpEvent = (context: Pick<EventContext, "receiver">) => {
const receiver = this._receiver || context.receiver!;
receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
cleanupBeforeResolveOrReject(receiver, "removeDrainHandler");

const receiverError = context.receiver && context.receiver.error;
let error: Error | MessagingError;
Expand All @@ -333,15 +313,43 @@ export class BatchingReceiver extends MessageReceiver {
} else {
error = new MessagingError("An error occurred while receiving messages.");
}
reject(error);
};

cleanupBeforeResolveOrReject = (
receiver: Receiver | undefined,
shouldRemoveDrain:
| "removeDrainHandler" // remove drain handler (not waiting or initiating a drain)
| "leaveDrainHandler" // listener for drain is removed when it is determined we dont need to drain or when drain is completed
): void => {
if (receiver != null) {
receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
receiver.session.removeListener(SessionEvents.sessionError, onSessionError);

if (shouldRemoveDrain === "removeDrainHandler") {
receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
}
}

if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
reject(error);

if (removeAbortSignalListenersFn) {
removeAbortSignalListenersFn();
removeAbortSignalListenersFn = undefined;
}
};

removeAbortSignalListenersFn = checkAndRegisterWithAbortSignal((err) => {
cleanupBeforeResolveOrReject(this._receiver, "removeDrainHandler");
reject(err);
}, abortSignal);

// Use new message wait timer only in peekLock mode
if (this.receiveMode === ReceiveMode.peekLock) {
/**
Expand Down Expand Up @@ -462,16 +470,20 @@ export class BatchingReceiver extends MessageReceiver {
onClose: onReceiveClose,
onSessionClose: onSessionClose
});
this._init(rcvrOptions)
this._init(rcvrOptions, abortSignal)
.then(() => {
if (!this._receiver) {
// there's a really small window here where the receiver can be closed
// if that happens we'll just resolve to an empty array of messages.
return resolve([]);
}

this._receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain);
addCreditAndSetTimer();
// TODO: long-term we probably need to split the code in this promise. This check
// is just a band-aid for now.
if (!abortSignal?.aborted) {
this._receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain);
addCreditAndSetTimer();
}
return;
})
.catch(reject);
Expand Down
20 changes: 18 additions & 2 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import * as log from "../log";
import { LinkEntity } from "./linkEntity";
import { ClientEntityContext } from "../clientEntityContext";
import { DispositionType, ReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage";
import { calculateRenewAfterDuration, getUniqueName } from "../util/utils";
import { calculateRenewAfterDuration, getUniqueName, StandardAbortMessage } from "../util/utils";
import { MessageHandlerOptions } from "../models";
import { DispositionStatusOptions } from "./managementClient";
import { AbortSignalLike } from "@azure/core-http";
import { AbortError } from "@azure/abort-controller";

/**
* @internal
Expand Down Expand Up @@ -757,8 +759,17 @@ export class MessageReceiver extends LinkEntity {
*
* @returns {Promise<void>} Promise<void>.
*/
protected async _init(options?: ReceiverOptions): Promise<void> {
protected async _init(options?: ReceiverOptions, abortSignal?: AbortSignalLike): Promise<void> {
const checkAborted = (): void => {
if (abortSignal?.aborted) {
throw new AbortError(StandardAbortMessage);
}
};

const connectionId = this._context.namespace.connectionId;

checkAborted();

try {
if (!this.isOpen() && !this.isConnecting) {
if (this.wasCloseInitiated) {
Expand All @@ -781,7 +792,10 @@ export class MessageReceiver extends LinkEntity {
}

this.isConnecting = true;

await this._negotiateClaim();
checkAborted();

if (!options) {
options = this._createReceiverOptions();
}
Expand All @@ -794,6 +808,8 @@ export class MessageReceiver extends LinkEntity {

this._receiver = await this._context.namespace.connection.createReceiver(options);
this.isConnecting = false;
checkAborted();

log.error(
"[%s] Receiver '%s' with address '%s' has established itself.",
connectionId,
Expand Down
5 changes: 2 additions & 3 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
} from "../serviceBusMessage";
import { ClientEntityContext } from "../clientEntityContext";
import { LinkEntity } from "./linkEntity";
import { getUniqueName, waitForTimeoutOrAbortOrResolve } from "../util/utils";
import { getUniqueName, waitForTimeoutOrAbortOrResolve, StandardAbortMessage } from "../util/utils";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch";
import { CreateBatchOptions } from "../models";
Expand Down Expand Up @@ -266,7 +266,6 @@ export class MessageSender extends LinkEntity {
try {
await waitForTimeoutOrAbortOrResolve({
actionFn: () => this.open(undefined, options?.abortSignal),
abortMessage: "The send operation has been cancelled by the user.",
abortSignal: options?.abortSignal,
timeoutMs: timeoutInMs,
timeoutMessage:
Expand Down Expand Up @@ -382,7 +381,7 @@ export class MessageSender extends LinkEntity {
): Promise<void> {
const checkAborted = (): void => {
if (abortSignal?.aborted) {
throw new AbortError("Sender creation was cancelled by the user.");
throw new AbortError(StandardAbortMessage);
}
};

Expand Down
25 changes: 20 additions & 5 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import { ClientEntityContext } from "../clientEntityContext";

import * as log from "../log";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { RetryConfig, RetryOperationType, retry } from "@azure/core-amqp";
import { RetryOperationType, RetryConfig, retry } from "@azure/core-amqp";
import { OperationOptions } from "../modelsToBeSharedWithEventHubs";

/**
* @internal
Expand Down Expand Up @@ -75,20 +76,34 @@ export class StreamingReceiver extends MessageReceiver {
*/
static async create(
context: ClientEntityContext,
options?: ReceiveOptions
options?: ReceiveOptions &
Pick<OperationOptions, "abortSignal"> & {
_createStreamingReceiver?: (
context: ClientEntityContext,
options?: ReceiveOptions
) => StreamingReceiver;
}
): Promise<StreamingReceiver> {
throwErrorIfConnectionClosed(context.namespace);
if (!options) options = {};
if (options.autoComplete == null) options.autoComplete = true;
const sReceiver = new StreamingReceiver(context, options);

let sReceiver: StreamingReceiver;

if (options?._createStreamingReceiver) {
sReceiver = options._createStreamingReceiver(context, options);
} else {
sReceiver = new StreamingReceiver(context, options);
}

const config: RetryConfig<void> = {
operation: () => {
return sReceiver._init();
return sReceiver._init(undefined, options?.abortSignal);
},
connectionId: context.namespace.connectionId,
operationType: RetryOperationType.receiveMessage,
retryOptions: options.retryOptions
retryOptions: options.retryOptions,
abortSignal: options?.abortSignal
};
await retry<void>(config);
context.streamingReceiver = sReceiver;
Expand Down
Loading

0 comments on commit 7a0eaeb

Please sign in to comment.