Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] T2: Enabling abort in receiveBatch and subscribe #9284

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 55 additions & 18 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@

import * as log from "../log";
import { Constants, MessagingError, translate } from "@azure/core-amqp";
import { AmqpError, EventContext, OnAmqpEvent, ReceiverEvents, SessionEvents } from "rhea-promise";
import {
AmqpError,
EventContext,
OnAmqpEvent,
ReceiverEvents,
SessionEvents,
Receiver
} from "rhea-promise";
import { ReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage";
import {
MessageReceiver,
Expand All @@ -14,6 +21,8 @@ import {
} from "./messageReceiver";
import { ClientEntityContext } from "../clientEntityContext";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { AbortSignalLike } from "@azure/core-http";
import { checkAndRegisterWithAbortSignal } from "../util/utils";

/**
* Describes the batching receiver where the user can receive a specified number of messages for
Expand Down Expand Up @@ -84,7 +93,8 @@ export class BatchingReceiver extends MessageReceiver {
*/
async receive(
maxMessageCount: number,
maxWaitTimeInMs?: number
maxWaitTimeInMs?: number,
abortSignal?: AbortSignalLike
): Promise<ServiceBusMessageImpl[]> {
throwErrorIfConnectionClosed(this._context.namespace);

Expand All @@ -95,7 +105,7 @@ export class BatchingReceiver extends MessageReceiver {
this.isReceivingMessages = true;

try {
return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs);
return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs, abortSignal);
} catch (error) {
log.error(
"[%s] Receiver '%s': Rejecting receiveMessages() with error %O: ",
Expand All @@ -112,7 +122,8 @@ export class BatchingReceiver extends MessageReceiver {

private _receiveImpl(
maxMessageCount: number,
maxWaitTimeInMs: number
maxWaitTimeInMs: number,
abortSignal?: AbortSignalLike
): Promise<ServiceBusMessageImpl[]> {
const brokeredMessages: ServiceBusMessageImpl[] = [];

Expand Down Expand Up @@ -179,6 +190,8 @@ export class BatchingReceiver extends MessageReceiver {
reject(translate(error));
};

let cleanupAbortSignalFn: (() => void) | undefined = undefined;

// Final action to be performed after
// - maxMessageCount is reached or
// - maxWaitTime is passed or
Expand All @@ -191,6 +204,11 @@ export class BatchingReceiver extends MessageReceiver {
clearTimeout(totalWaitTimer);
}

if (cleanupAbortSignalFn) {
cleanupAbortSignalFn();
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
cleanupAbortSignalFn = undefined;
}

// Removing listeners, so that the next receiveMessages() call can set them again.
// Listener for drain is removed when it is determined we dont need to drain or when drain is completed
if (this._receiver) {
Expand Down Expand Up @@ -312,13 +330,29 @@ export class BatchingReceiver extends MessageReceiver {
}
};

const cleanupBeforeReject = (
receiver: Receiver | undefined,
onReceiveErrorHandlerToRemove: OnAmqpEvent
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
): void => {
if (receiver != null) {
receiver.removeListener(ReceiverEvents.receiverError, onReceiveErrorHandlerToRemove);
receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
}

if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
};

// Action to be taken when an error is received.
const onReceiveError: OnAmqpEvent = (context: EventContext) => {
const onReceiveError: OnAmqpEvent = (context: Pick<EventContext, "receiver">) => {
const receiver = this._receiver || context.receiver!;
receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
cleanupBeforeReject(receiver, onReceiveError);
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved

const receiverError = context.receiver && context.receiver.error;
let error: Error | MessagingError;
Expand All @@ -333,15 +367,14 @@ export class BatchingReceiver extends MessageReceiver {
} else {
error = new MessagingError("An error occurred while receiving messages.");
}
if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
reject(error);
};

cleanupAbortSignalFn = checkAndRegisterWithAbortSignal((err) => {
cleanupBeforeReject(this._receiver, onReceiveError);
reject(err);
}, abortSignal);

// Use new message wait timer only in peekLock mode
if (this.receiveMode === ReceiveMode.peekLock) {
/**
Expand Down Expand Up @@ -462,16 +495,20 @@ export class BatchingReceiver extends MessageReceiver {
onClose: onReceiveClose,
onSessionClose: onSessionClose
});
this._init(rcvrOptions)
this._init(rcvrOptions, abortSignal)
.then(() => {
if (!this._receiver) {
// there's a really small window here where the receiver can be closed
// if that happens we'll just resolve to an empty array of messages.
return resolve([]);
}

this._receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain);
addCreditAndSetTimer();
// TODO: long-term we probably need to split the code in this promise. This check
// is just a band-aid for now.
if (!abortSignal?.aborted) {
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
this._receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain);
addCreditAndSetTimer();
}
return;
})
.catch(reject);
Expand Down
20 changes: 18 additions & 2 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import * as log from "../log";
import { LinkEntity } from "./linkEntity";
import { ClientEntityContext } from "../clientEntityContext";
import { DispositionType, ReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage";
import { calculateRenewAfterDuration, getUniqueName } from "../util/utils";
import { calculateRenewAfterDuration, getUniqueName, StandardAbortMessage } from "../util/utils";
import { MessageHandlerOptions } from "../models";
import { DispositionStatusOptions } from "./managementClient";
import { AbortSignalLike } from "@azure/core-http";
import { AbortError } from "@azure/abort-controller";

/**
* @internal
Expand Down Expand Up @@ -757,8 +759,17 @@ export class MessageReceiver extends LinkEntity {
*
* @returns {Promise<void>} Promise<void>.
*/
protected async _init(options?: ReceiverOptions): Promise<void> {
protected async _init(options?: ReceiverOptions, abortSignal?: AbortSignalLike): Promise<void> {
const checkAborted = (): void => {
if (abortSignal?.aborted) {
throw new AbortError(StandardAbortMessage);
}
};

const connectionId = this._context.namespace.connectionId;

checkAborted();

try {
if (!this.isOpen() && !this.isConnecting) {
if (this.wasCloseInitiated) {
Expand All @@ -781,7 +792,10 @@ export class MessageReceiver extends LinkEntity {
}

this.isConnecting = true;

await this._negotiateClaim();
checkAborted();

if (!options) {
options = this._createReceiverOptions();
}
Expand All @@ -794,6 +808,8 @@ export class MessageReceiver extends LinkEntity {

this._receiver = await this._context.namespace.connection.createReceiver(options);
this.isConnecting = false;
checkAborted();

log.error(
"[%s] Receiver '%s' with address '%s' has established itself.",
connectionId,
Expand Down
5 changes: 2 additions & 3 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
} from "../serviceBusMessage";
import { ClientEntityContext } from "../clientEntityContext";
import { LinkEntity } from "./linkEntity";
import { getUniqueName, waitForTimeoutOrAbortOrResolve } from "../util/utils";
import { getUniqueName, waitForTimeoutOrAbortOrResolve, StandardAbortMessage } from "../util/utils";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch";
import { CreateBatchOptions } from "../models";
Expand Down Expand Up @@ -266,7 +266,6 @@ export class MessageSender extends LinkEntity {
try {
await waitForTimeoutOrAbortOrResolve({
actionFn: () => this.open(undefined, options?.abortSignal),
abortMessage: "The send operation has been cancelled by the user.",
abortSignal: options?.abortSignal,
timeoutMs: timeoutInMs,
timeoutMessage:
Expand Down Expand Up @@ -382,7 +381,7 @@ export class MessageSender extends LinkEntity {
): Promise<void> {
const checkAborted = (): void => {
if (abortSignal?.aborted) {
throw new AbortError("Sender creation was cancelled by the user.");
throw new AbortError(StandardAbortMessage);
}
};

Expand Down
25 changes: 20 additions & 5 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import { ClientEntityContext } from "../clientEntityContext";

import * as log from "../log";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { RetryConfig, RetryOperationType, retry } from "@azure/core-amqp";
import { RetryOperationType, RetryConfig, retry } from "@azure/core-amqp";
import { OperationOptions } from "../modelsToBeSharedWithEventHubs";

/**
* @internal
Expand Down Expand Up @@ -75,20 +76,34 @@ export class StreamingReceiver extends MessageReceiver {
*/
static async create(
context: ClientEntityContext,
options?: ReceiveOptions
options?: ReceiveOptions &
Pick<OperationOptions, "abortSignal"> & {
_createStreamingReceiver?: (
context: ClientEntityContext,
options?: ReceiveOptions
) => StreamingReceiver;
}
): Promise<StreamingReceiver> {
throwErrorIfConnectionClosed(context.namespace);
if (!options) options = {};
if (options.autoComplete == null) options.autoComplete = true;
const sReceiver = new StreamingReceiver(context, options);

let sReceiver: StreamingReceiver;

if (options?._createStreamingReceiver) {
sReceiver = options._createStreamingReceiver(context, options);
} else {
sReceiver = new StreamingReceiver(context, options);
}

const config: RetryConfig<void> = {
operation: () => {
return sReceiver._init();
return sReceiver._init(undefined, options?.abortSignal);
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
},
connectionId: context.namespace.connectionId,
operationType: RetryOperationType.receiveMessage,
retryOptions: options.retryOptions
retryOptions: options.retryOptions,
abortSignal: options?.abortSignal
};
await retry<void>(config);
context.streamingReceiver = sReceiver;
Expand Down
30 changes: 25 additions & 5 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import {
PeekMessagesOptions,
GetMessageIteratorOptions,
MessageHandlerOptions,
MessageHandlers,
ReceiveBatchOptions,
SubscribeOptions
Expand Down Expand Up @@ -208,7 +207,7 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
private _registerMessageHandler(
onMessage: OnMessage,
onError: OnError,
options?: MessageHandlerOptions
options?: SubscribeOptions
): void {
this._throwIfReceiverOrConnectionClosed();
this._throwIfAlreadyReceiving();
Expand All @@ -222,7 +221,7 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
throw new TypeError("The parameter 'onError' must be of type 'function'.");
}

StreamingReceiver.create(this._context, {
this._createStreamingReceiver(this._context, {
...options,
receiveMode: convertToInternalReceiveMode(this.receiveMode),
retryOptions: this._retryOptions
Expand All @@ -243,6 +242,19 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
});
}

private _createStreamingReceiver(
context: ClientEntityContext,
options?: ReceiveOptions &
Pick<OperationOptions, "abortSignal"> & {
createStreamingReceiver?: (
context: ClientEntityContext,
options?: ReceiveOptions
) => StreamingReceiver;
}
): Promise<StreamingReceiver> {
return StreamingReceiver.create(context, options);
}

/**
* Returns a promise that resolves to an array of messages based on given count and timeout over
* an AMQP receiver link from a Queue/Subscription.
Expand Down Expand Up @@ -271,11 +283,12 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
maxConcurrentCalls: 0,
receiveMode: convertToInternalReceiveMode(this.receiveMode)
};
this._context.batchingReceiver = BatchingReceiver.create(this._context, options);
this._context.batchingReceiver = this._createBatchingReceiver(this._context, options);
}
const receivedMessages = await this._context.batchingReceiver.receive(
maxMessageCount,
options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs
options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs,
options?.abortSignal
);
return (receivedMessages as unknown) as ReceivedMessageT[];
};
Expand Down Expand Up @@ -511,4 +524,11 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
}
return false;
}

private _createBatchingReceiver(
context: ClientEntityContext,
options?: ReceiveOptions
): BatchingReceiver {
return BatchingReceiver.create(context, options);
}
}
11 changes: 1 addition & 10 deletions sdk/servicebus/service-bus/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {
retry
} from "@azure/core-amqp";
import { OperationOptions } from "./modelsToBeSharedWithEventHubs";
import { AbortError } from "@azure/abort-controller";

/**
* A Sender can be used to send messages, schedule messages to be sent at a later time
Expand Down Expand Up @@ -260,15 +259,7 @@ export class SenderImpl implements Sender {

async createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch> {
this._throwIfSenderOrConnectionClosed();
try {
return await this._sender.createBatch(options);
} catch (err) {
if (err.name === "AbortError") {
throw new AbortError("The createBatch operation has been cancelled by the user.");
}

throw err;
}
return this._sender.createBatch(options);
}

/**
Expand Down
Loading