Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] T2: Enabling abort in receiveBatch and subscribe #9284

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 7.0.0-preview.4 (Unreleased)

- Adds abortSignal support throughout Sender and non-session Receivers.
[PR 9233](https://github.com/Azure/azure-sdk-for-js/pull/9233)
[PR 9284](https://github.com/Azure/azure-sdk-for-js/pull/9284)

## 7.0.0-preview.3 (2020-06-08)

Expand Down
116 changes: 64 additions & 52 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@

import * as log from "../log";
import { Constants, MessagingError, translate } from "@azure/core-amqp";
import { AmqpError, EventContext, OnAmqpEvent, ReceiverEvents, SessionEvents } from "rhea-promise";
import {
AmqpError,
EventContext,
OnAmqpEvent,
ReceiverEvents,
SessionEvents,
Receiver
} from "rhea-promise";
import { ReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage";
import {
MessageReceiver,
Expand All @@ -14,6 +21,8 @@ import {
} from "./messageReceiver";
import { ClientEntityContext } from "../clientEntityContext";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { AbortSignalLike } from "@azure/core-http";
import { checkAndRegisterWithAbortSignal } from "../util/utils";

/**
* Describes the batching receiver where the user can receive a specified number of messages for
Expand Down Expand Up @@ -84,7 +93,8 @@ export class BatchingReceiver extends MessageReceiver {
*/
async receive(
maxMessageCount: number,
maxWaitTimeInMs?: number
maxWaitTimeInMs?: number,
abortSignal?: AbortSignalLike
): Promise<ServiceBusMessageImpl[]> {
throwErrorIfConnectionClosed(this._context.namespace);

Expand All @@ -95,7 +105,7 @@ export class BatchingReceiver extends MessageReceiver {
this.isReceivingMessages = true;

try {
return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs);
return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs, abortSignal);
} catch (error) {
log.error(
"[%s] Receiver '%s': Rejecting receiveMessages() with error %O: ",
Expand All @@ -112,20 +122,23 @@ export class BatchingReceiver extends MessageReceiver {

private _receiveImpl(
maxMessageCount: number,
maxWaitTimeInMs: number
maxWaitTimeInMs: number,
abortSignal?: AbortSignalLike
): Promise<ServiceBusMessageImpl[]> {
const brokeredMessages: ServiceBusMessageImpl[] = [];

this.isReceivingMessages = true;
return new Promise<ServiceBusMessageImpl[]>((resolve, reject) => {
let totalWaitTimer: NodeJS.Timer | undefined;

// eslint-disable-next-line prefer-const
let cleanupBeforeResolveOrReject: (
receiver: Receiver | undefined,
shouldRemoveDrain: "removeDrainHandler" | "leaveDrainHandler"
) => void;

const onSessionError: OnAmqpEvent = (context: EventContext) => {
const receiver = this._receiver || context.receiver!;
receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
cleanupBeforeResolveOrReject(this._receiver || context.receiver!, "removeDrainHandler");

const sessionError = context.session && context.session.error;
let error: Error | MessagingError;
Expand All @@ -140,30 +153,11 @@ export class BatchingReceiver extends MessageReceiver {
} else {
error = new MessagingError("An error occurred while receiving messages.");
}
if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
reject(error);
};

this._connectionErrorHandler = (error: AmqpError | Error): void => {
if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}

// Removing listeners, so that the next receiveMessages() call can set them again.
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
this._receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
this._receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
}
cleanupBeforeResolveOrReject(this._receiver, "removeDrainHandler");

// Return the collected messages if in ReceiveAndDelete mode because otherwise they are lost forever
if (this.receiveMode === ReceiveMode.receiveAndDelete && brokeredMessages.length) {
Expand All @@ -179,25 +173,14 @@ export class BatchingReceiver extends MessageReceiver {
reject(translate(error));
};

let removeAbortSignalListenersFn: (() => void) | undefined = undefined;

// Final action to be performed after
// - maxMessageCount is reached or
// - maxWaitTime is passed or
// - newMessageWaitTimeoutInSeconds is passed since the last message was received
const finalAction = (): void => {
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}

// Removing listeners, so that the next receiveMessages() call can set them again.
// Listener for drain is removed when it is determined we dont need to drain or when drain is completed
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
this._receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
}
cleanupBeforeResolveOrReject(this._receiver, "leaveDrainHandler");

// Drain any pending credits.
if (this._receiver && this._receiver.isOpen() && this._receiver.credit > 0) {
Expand Down Expand Up @@ -313,12 +296,9 @@ export class BatchingReceiver extends MessageReceiver {
};

// Action to be taken when an error is received.
const onReceiveError: OnAmqpEvent = (context: EventContext) => {
const onReceiveError: OnAmqpEvent = (context: Pick<EventContext, "receiver">) => {
const receiver = this._receiver || context.receiver!;
receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
cleanupBeforeResolveOrReject(receiver, "removeDrainHandler");

const receiverError = context.receiver && context.receiver.error;
let error: Error | MessagingError;
Expand All @@ -333,15 +313,43 @@ export class BatchingReceiver extends MessageReceiver {
} else {
error = new MessagingError("An error occurred while receiving messages.");
}
reject(error);
};

cleanupBeforeResolveOrReject = (
receiver: Receiver | undefined,
shouldRemoveDrain:
| "removeDrainHandler" // remove drain handler (not waiting or initiating a drain)
| "leaveDrainHandler" // listener for drain is removed when it is determined we dont need to drain or when drain is completed
): void => {
if (receiver != null) {
receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
receiver.session.removeListener(SessionEvents.sessionError, onSessionError);

if (shouldRemoveDrain === "removeDrainHandler") {
receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
}
}

if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
reject(error);

if (removeAbortSignalListenersFn) {
removeAbortSignalListenersFn();
removeAbortSignalListenersFn = undefined;
}
};

removeAbortSignalListenersFn = checkAndRegisterWithAbortSignal((err) => {
cleanupBeforeResolveOrReject(this._receiver, "removeDrainHandler");
reject(err);
}, abortSignal);

// Use new message wait timer only in peekLock mode
if (this.receiveMode === ReceiveMode.peekLock) {
/**
Expand Down Expand Up @@ -462,16 +470,20 @@ export class BatchingReceiver extends MessageReceiver {
onClose: onReceiveClose,
onSessionClose: onSessionClose
});
this._init(rcvrOptions)
this._init(rcvrOptions, abortSignal)
.then(() => {
if (!this._receiver) {
// there's a really small window here where the receiver can be closed
// if that happens we'll just resolve to an empty array of messages.
return resolve([]);
}

this._receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain);
addCreditAndSetTimer();
// TODO: long-term we probably need to split the code in this promise. This check
// is just a band-aid for now.
if (!abortSignal?.aborted) {
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
this._receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain);
addCreditAndSetTimer();
}
return;
})
.catch(reject);
Expand Down
20 changes: 18 additions & 2 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import * as log from "../log";
import { LinkEntity } from "./linkEntity";
import { ClientEntityContext } from "../clientEntityContext";
import { DispositionType, ReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage";
import { calculateRenewAfterDuration, getUniqueName } from "../util/utils";
import { calculateRenewAfterDuration, getUniqueName, StandardAbortMessage } from "../util/utils";
import { MessageHandlerOptions } from "../models";
import { DispositionStatusOptions } from "./managementClient";
import { AbortSignalLike } from "@azure/core-http";
import { AbortError } from "@azure/abort-controller";

/**
* @internal
Expand Down Expand Up @@ -757,8 +759,17 @@ export class MessageReceiver extends LinkEntity {
*
* @returns {Promise<void>} Promise<void>.
*/
protected async _init(options?: ReceiverOptions): Promise<void> {
protected async _init(options?: ReceiverOptions, abortSignal?: AbortSignalLike): Promise<void> {
const checkAborted = (): void => {
if (abortSignal?.aborted) {
throw new AbortError(StandardAbortMessage);
}
};

const connectionId = this._context.namespace.connectionId;

checkAborted();

try {
if (!this.isOpen() && !this.isConnecting) {
if (this.wasCloseInitiated) {
Expand All @@ -781,7 +792,10 @@ export class MessageReceiver extends LinkEntity {
}

this.isConnecting = true;

await this._negotiateClaim();
checkAborted();

if (!options) {
options = this._createReceiverOptions();
}
Expand All @@ -794,6 +808,8 @@ export class MessageReceiver extends LinkEntity {

this._receiver = await this._context.namespace.connection.createReceiver(options);
this.isConnecting = false;
checkAborted();

log.error(
"[%s] Receiver '%s' with address '%s' has established itself.",
connectionId,
Expand Down
5 changes: 2 additions & 3 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
} from "../serviceBusMessage";
import { ClientEntityContext } from "../clientEntityContext";
import { LinkEntity } from "./linkEntity";
import { getUniqueName, waitForTimeoutOrAbortOrResolve } from "../util/utils";
import { getUniqueName, waitForTimeoutOrAbortOrResolve, StandardAbortMessage } from "../util/utils";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch";
import { CreateBatchOptions } from "../models";
Expand Down Expand Up @@ -266,7 +266,6 @@ export class MessageSender extends LinkEntity {
try {
await waitForTimeoutOrAbortOrResolve({
actionFn: () => this.open(undefined, options?.abortSignal),
abortMessage: "The send operation has been cancelled by the user.",
abortSignal: options?.abortSignal,
timeoutMs: timeoutInMs,
timeoutMessage:
Expand Down Expand Up @@ -382,7 +381,7 @@ export class MessageSender extends LinkEntity {
): Promise<void> {
const checkAborted = (): void => {
if (abortSignal?.aborted) {
throw new AbortError("Sender creation was cancelled by the user.");
throw new AbortError(StandardAbortMessage);
}
};

Expand Down
25 changes: 20 additions & 5 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import { ClientEntityContext } from "../clientEntityContext";

import * as log from "../log";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { RetryConfig, RetryOperationType, retry } from "@azure/core-amqp";
import { RetryOperationType, RetryConfig, retry } from "@azure/core-amqp";
import { OperationOptions } from "../modelsToBeSharedWithEventHubs";

/**
* @internal
Expand Down Expand Up @@ -75,20 +76,34 @@ export class StreamingReceiver extends MessageReceiver {
*/
static async create(
context: ClientEntityContext,
options?: ReceiveOptions
options?: ReceiveOptions &
Pick<OperationOptions, "abortSignal"> & {
_createStreamingReceiver?: (
context: ClientEntityContext,
options?: ReceiveOptions
) => StreamingReceiver;
}
): Promise<StreamingReceiver> {
throwErrorIfConnectionClosed(context.namespace);
if (!options) options = {};
if (options.autoComplete == null) options.autoComplete = true;
const sReceiver = new StreamingReceiver(context, options);

let sReceiver: StreamingReceiver;

if (options?._createStreamingReceiver) {
sReceiver = options._createStreamingReceiver(context, options);
} else {
sReceiver = new StreamingReceiver(context, options);
}

const config: RetryConfig<void> = {
operation: () => {
return sReceiver._init();
return sReceiver._init(undefined, options?.abortSignal);
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
},
connectionId: context.namespace.connectionId,
operationType: RetryOperationType.receiveMessage,
retryOptions: options.retryOptions
retryOptions: options.retryOptions,
abortSignal: options?.abortSignal
};
await retry<void>(config);
context.streamingReceiver = sReceiver;
Expand Down
Loading