Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] sendBatch() for service-bus #7527

Merged
merged 57 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
39a6171
SendableMessageInfoBatch - draft 1
HarshaNalluru Feb 18, 2020
9ae20c4
SendableMessageInfoBatch raw implementation - test failed
HarshaNalluru Feb 24, 2020
e13cfdc
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Feb 24, 2020
83ff552
SendableMessageInfoBatch works fine
HarshaNalluru Feb 25, 2020
86d8fc3
updated test
HarshaNalluru Feb 25, 2020
fb63c4f
SendableMessageInfoBatch
HarshaNalluru Mar 13, 2020
03e76b6
sendBatch2
HarshaNalluru Mar 13, 2020
1d3772b
CreateBatchOptions
HarshaNalluru Mar 13, 2020
88cd7c9
getMaxMessageSize, createBatch, sendBatch2 on MessageSender
HarshaNalluru Mar 13, 2020
30672f1
Simple Send Batch 2 tests - draft
HarshaNalluru Mar 13, 2020
6c13888
resolve merge conflicts
HarshaNalluru Mar 13, 2020
2de9711
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 17, 2020
1aa317a
update test file name
HarshaNalluru Mar 17, 2020
b94477d
senbatch tests - draft 1
HarshaNalluru Mar 17, 2020
44d4f17
sendbatch API
HarshaNalluru Mar 17, 2020
3010567
separate out the methods - testing sendBatch
HarshaNalluru Mar 17, 2020
9851e86
remove .only and console.logs
HarshaNalluru Mar 17, 2020
d0cfa98
remove console.log
HarshaNalluru Mar 17, 2020
a936a9f
_batchSender -> _sender
HarshaNalluru Mar 17, 2020
4a82f6a
// let numberOfMessagesInBatch = 0;
HarshaNalluru Mar 17, 2020
b0c311d
Update Api report
HarshaNalluru Mar 17, 2020
051bf38
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 17, 2020
182c59e
Add a lot more tests
HarshaNalluru Mar 17, 2020
e3eaa90
remove .only from individual tests
HarshaNalluru Mar 17, 2020
ba42326
(craete and send) batch - docs
HarshaNalluru Mar 17, 2020
3dbb5ed
Update tryAdd docs
HarshaNalluru Mar 17, 2020
abd94c9
update API report
HarshaNalluru Mar 17, 2020
7fb2c73
export CreateBatchOptions and SendableMessageInfoBatch
HarshaNalluru Mar 17, 2020
324ea82
update API report
HarshaNalluru Mar 17, 2020
fdb31f6
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 18, 2020
75dfd10
update src code as per the api updates to the message
HarshaNalluru Mar 18, 2020
a5012e3
Update API report
HarshaNalluru Mar 18, 2020
701e193
Update test files asper the API updates
HarshaNalluru Mar 18, 2020
d060031
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 18, 2020
11fcb0e
CreateBatchOptions extends OperationOptions
HarshaNalluru Mar 18, 2020
050f06f
docs for ServiceBusMessageBatch
HarshaNalluru Mar 18, 2020
b0fa344
remove un-need setting default null options - options = {}
HarshaNalluru Mar 18, 2020
d30da07
verify if sender is Open() before sending
HarshaNalluru Mar 18, 2020
52c10e2
update API report
HarshaNalluru Mar 18, 2020
1139e8b
// sendBatch(<Array of messages>) - Commented, senBatch2 -> sendBatch
HarshaNalluru Mar 18, 2020
d55fb0e
update API shape
HarshaNalluru Mar 18, 2020
5e05ff2
fix tests
HarshaNalluru Mar 18, 2020
052cbdd
Update sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts
HarshaNalluru Mar 18, 2020
297bd1f
Update sdk/servicebus/service-bus/test/sendBatch.spec.ts
HarshaNalluru Mar 18, 2020
2240450
retryOptions to getmaxsize
HarshaNalluru Mar 18, 2020
ef0e779
move CreateBatchOptions to models
HarshaNalluru Mar 18, 2020
a95171e
remove unnecessary if checks
HarshaNalluru Mar 18, 2020
02cc4f9
remove _count private property in favour of _encodedMessages.length
HarshaNalluru Mar 18, 2020
4791c3f
use sendBatch to send multiple messages
HarshaNalluru Mar 18, 2020
c979f07
update getMaxMessageSize description
HarshaNalluru Mar 19, 2020
f95bdd9
sendArrayOfMessages -> sendMessages
HarshaNalluru Mar 19, 2020
05d7f80
move verifyAndDeleteAllSentMessages from sendbatch.spec to testutils
HarshaNalluru Mar 19, 2020
6d91741
Update sdk/servicebus/service-bus/test/utils/testutils2.ts
HarshaNalluru Mar 19, 2020
ad8b18c
test update - add checks for tryadd
HarshaNalluru Mar 19, 2020
768470c
Merge branch 'harshan/sb/issue/7067' of https://github.com/HarshaNall…
HarshaNalluru Mar 19, 2020
90d2f4f
fix messageBatch error message in test when Invalid parameters are pa…
HarshaNalluru Mar 19, 2020
7e1c054
Update sdk/servicebus/service-bus/test/utils/testutils2.ts
HarshaNalluru Mar 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ export interface CorrelationFilter {
userProperties?: any;
}

// @public
export interface CreateBatchOptions extends OperationOptions {
maxSizeInBytes?: number;
retryOptions?: RetryOptions;
}

export { DataTransformer }

// @public
Expand Down Expand Up @@ -230,11 +236,12 @@ export interface Sender {
cancelScheduledMessage(sequenceNumber: Long): Promise<void>;
cancelScheduledMessages(sequenceNumbers: Long[]): Promise<void>;
close(): Promise<void>;
createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch>;
isClosed: boolean;
scheduleMessage(scheduledEnqueueTimeUtc: Date, message: ServiceBusMessage): Promise<Long>;
scheduleMessages(scheduledEnqueueTimeUtc: Date, messages: ServiceBusMessage[]): Promise<Long[]>;
send(message: ServiceBusMessage): Promise<void>;
sendBatch(messages: ServiceBusMessage[]): Promise<void>;
sendBatch(messageBatch: ServiceBusMessageBatch): Promise<void>;
}

// @public
Expand Down Expand Up @@ -279,6 +286,14 @@ export interface ServiceBusMessage {
viaPartitionKey?: string;
}

// @public
export interface ServiceBusMessageBatch {
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
readonly count: number;
readonly maxSizeInBytes: number;
readonly sizeInBytes: number;
tryAdd(message: ServiceBusMessage): boolean;
}

// @public
export interface SessionMessageHandlerOptions {
autoComplete?: boolean;
Expand Down
104 changes: 102 additions & 2 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import {
RetryOperationType,
Constants,
delay,
MessagingError
MessagingError,
RetryOptions
} from "@azure/core-amqp";
import {
ServiceBusMessage,
Expand All @@ -33,6 +34,8 @@ import { ClientEntityContext } from "../clientEntityContext";
import { LinkEntity } from "./linkEntity";
import { getUniqueName } from "../util/utils";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch";
import { CreateBatchOptions } from "../models";

/**
* @internal
Expand Down Expand Up @@ -581,6 +584,7 @@ export class MessageSender extends LinkEntity {
}
}

// Not exposed to the users
/**
* Send a batch of Message to the ServiceBus in a single AMQP message. The "message_annotations",
* "application_properties" and "properties" of the first message will be set as that
Expand All @@ -589,7 +593,7 @@ export class MessageSender extends LinkEntity {
* Batch message.
* @return {Promise<void>}
*/
async sendBatch(inputMessages: ServiceBusMessage[]): Promise<void> {
async sendMessages(inputMessages: ServiceBusMessage[]): Promise<void> {
throwErrorIfConnectionClosed(this._context.namespace);
try {
if (!Array.isArray(inputMessages)) {
Expand Down Expand Up @@ -649,6 +653,7 @@ export class MessageSender extends LinkEntity {

// Finally encode the envelope (batch message).
const encodedBatchMessage = RheaMessageUtil.encode(batchMessage);

log.sender(
"[%s]Sender '%s', sending encoded batch message.",
this._context.namespace.connectionId,
Expand All @@ -668,6 +673,101 @@ export class MessageSender extends LinkEntity {
}
}

/**
* Returns maximum message size on the AMQP sender link.
*
* Options to configure the `createBatch` method on the `Sender`.
* - `maxSizeInBytes`: The upper limit for the size of batch.
*
* Example usage:
* ```js
* {
* retryOptions: { maxRetries: 5; timeoutInMs: 10 }
* }
* ```
* @param {{retryOptions?: RetryOptions}} [options={}]
* @returns {Promise<number>}
* @memberof MessageSender
*/
async getMaxMessageSize(
options: {
retryOptions?: RetryOptions;
} = {}
): Promise<number> {
const retryOptions = options.retryOptions || {};
if (this.isOpen()) {
return this._sender!.maxMessageSize;
}
return new Promise<number>(async (resolve, reject) => {
try {
const senderOptions = this._createSenderOptions(Constants.defaultOperationTimeoutInMs);
await defaultLock.acquire(this.senderLock, () => {
const config: RetryConfig<void> = {
operation: () => this._init(senderOptions),
connectionId: this._context.namespace.connectionId,
operationType: RetryOperationType.senderLink,
retryOptions: retryOptions
};

return retry<void>(config);
});
resolve(this._sender!.maxMessageSize);
} catch (err) {
reject(err);
}
});
}

async createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch> {
throwErrorIfConnectionClosed(this._context.namespace);
if (!options) {
options = {};
}
let maxMessageSize = await this.getMaxMessageSize({ retryOptions: options.retryOptions });
if (options.maxSizeInBytes) {
if (options.maxSizeInBytes > maxMessageSize!) {
const error = new Error(
`Max message size (${options.maxSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link.`
);
throw error;
}
maxMessageSize = options.maxSizeInBytes;
}
return new ServiceBusMessageBatchImpl(this._context, maxMessageSize!);
}

async sendBatch(batchMessage: ServiceBusMessageBatch): Promise<void> {
throwErrorIfConnectionClosed(this._context.namespace);
try {
if (!this.isOpen()) {
log.sender(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);
await defaultLock.acquire(this.senderLock, () => {
return this._init();
});
}
log.sender(
"[%s]Sender '%s', sending encoded batch message.",
this._context.namespace.connectionId,
this.name,
batchMessage
);
return await this._trySend(batchMessage._message!, true);
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
} catch (err) {
log.error(
"[%s] Sender '%s': An error occurred while sending the messages: %O\nError: %O",
this._context.namespace.connectionId,
this.name,
batchMessage,
err
);
throw err;
}
}

/**
* Creates a new sender to the specific ServiceBus entity, and optionally to a given
* partition if it is not present in the context or returns the one present in the context.
Expand Down
5 changes: 4 additions & 1 deletion sdk/servicebus/service-bus/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export {
ReceiveMode,
ReceivedMessageWithLock
} from "./serviceBusMessage";
export { ServiceBusMessageBatch } from "./serviceBusMessageBatch";

export { Delivery, WebSocketImpl } from "rhea-promise";

export { HttpOperationResponse } from "@azure/core-http";
Expand Down Expand Up @@ -53,7 +55,8 @@ export {
MessageHandlers,
ReceiveBatchOptions,
SubscribeOptions,
WaitTimeOptions
WaitTimeOptions,
CreateBatchOptions
} from "./models";

export { Receiver, SubscriptionRuleManagement } from "./receivers/receiver";
Expand Down
27 changes: 27 additions & 0 deletions sdk/servicebus/service-bus/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

import { OperationOptions } from "@azure/core-auth";
import { RetryOptions } from "@azure/core-amqp";

/**
* The general message handler interface (used for streamMessages).
Expand Down Expand Up @@ -32,6 +33,32 @@ export interface WaitTimeOptions {
maxWaitTimeSeconds: number;
}

/**
* Options to configure the `createBatch` method on the `Sender`.
* - `maxSizeInBytes`: The upper limit for the size of batch.
*
* Example usage:
* ```js
* {
* maxSizeInBytes: 1024 * 1024 // 1 MB
* }
* ```
*/
export interface CreateBatchOptions extends OperationOptions {
/**
* @property
* The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached.
*/
maxSizeInBytes?: number;
/**
* Retry policy options that determine the mode, number of retries, retry interval etc.
*
* @type {RetryOptions}
* @memberof CreateBatchOptions
*/
retryOptions?: RetryOptions;
}

/**
* Options when receiving a batch of messages from Service Bus.
*/
Expand Down
87 changes: 66 additions & 21 deletions sdk/servicebus/service-bus/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
throwTypeErrorIfParameterNotLong,
throwTypeErrorIfParameterNotLongArray
} from "./util/errors";
import { ServiceBusMessageBatch } from "./serviceBusMessageBatch";
import { CreateBatchOptions } from "./models";

/**
* A Sender can be used to send messages, schedule messages to be sent at a later time
Expand All @@ -34,22 +36,49 @@ export interface Sender {
*/
send(message: ServiceBusMessage): Promise<void>;

// sendBatch(<Array of messages>) - Commented
// /**
// * Sends the given messages in a single batch i.e. in a single AMQP message after creating an AMQP
// * Sender link if it doesnt already exists.
// *
// * - To send messages to a `session` and/or `partition` enabled Queue/Topic, set the `sessionId`
// * and/or `partitionKey` properties respectively on the messages.
// * - When doing so, all
// * messages in the batch should have the same `sessionId` (if using sessions) and the same
// * `parititionKey` (if using paritions).
// *
// * @param messages - An array of ServiceBusMessage objects to be sent in a Batch message.
// * @return Promise<void>
// * @throws Error if the underlying connection, client or sender is closed.
// * @throws MessagingError if the service returns an error while sending messages to the service.
// */
// sendBatch(messages: ServiceBusMessage[]): Promise<void>;

/**
* Sends the given messages in a single batch i.e. in a single AMQP message after creating an AMQP
* Sender link if it doesnt already exists.
* Creates an instance of `ServiceBusMessageBatch` to which one can add messages until the maximum supported size is reached.
* The batch can be passed to the {@link sendBatch} method to send the messages to Azure Service Bus.
* @param options Configures the behavior of the batch.
* - `maxSizeInBytes`: The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached.
*
* - To send messages to a `session` and/or `partition` enabled Queue/Topic, set the `sessionId`
* and/or `partitionKey` properties respectively on the messages.
* - When doing so, all
* messages in the batch should have the same `sessionId` (if using sessions) and the same
* `parititionKey` (if using paritions).
* @param {CreateBatchOptions} [options]
* @returns {Promise<ServiceBusMessageBatch>}
* @throws MessagingError if an error is encountered while sending a message.
* @throws Error if the underlying connection or sender has been closed.
* @memberof Sender
*/
createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch>;

/**
* Sends a batch of messages to the associated service-bus entity.
*
* @param messages - An array of SendableMessageInfo objects to be sent in a Batch message.
* @return Promise<void>
* @throws Error if the underlying connection, client or sender is closed.
* @throws MessagingError if the service returns an error while sending messages to the service.
* @param {ServiceBusMessageBatch} messageBatch A batch of messages that you can create using the {@link createBatch} method.
* @returns {Promise<void>}
* @throws MessagingError if an error is encountered while sending a message.
* @throws Error if the underlying connection or sender has been closed.
* @memberof Sender
*/
sendBatch(messages: ServiceBusMessage[]): Promise<void>;
sendBatch(messageBatch: ServiceBusMessageBatch): Promise<void>;

/**
* @property Returns `true` if either the sender or the client that created it has been closed
* @readonly
Expand Down Expand Up @@ -119,6 +148,7 @@ export class SenderImpl implements Sender {
* @property Denotes if close() was called on this sender
*/
private _isClosed: boolean = false;
private _sender: MessageSender;

/**
* @internal
Expand All @@ -127,6 +157,7 @@ export class SenderImpl implements Sender {
constructor(context: ClientEntityContext) {
throwErrorIfConnectionClosed(context.namespace);
this._context = context;
this._sender = MessageSender.create(this._context);
}

private _throwIfSenderOrConnectionClosed(): void {
Expand All @@ -150,18 +181,32 @@ export class SenderImpl implements Sender {
async send(message: ServiceBusMessage): Promise<void> {
this._throwIfSenderOrConnectionClosed();
throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "message", message);
const sender = MessageSender.create(this._context);
return sender.send(message);
return this._sender.send(message);
}

async sendBatch(messages: ServiceBusMessage[]): Promise<void> {
// sendBatch(<Array of messages>) - Commented
// async sendBatch(messages: ServiceBusMessage[]): Promise<void> {
// this._throwIfSenderOrConnectionClosed();
// throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "messages", messages);
// if (!Array.isArray(messages)) {
// messages = [messages];
// }
// return this._sender.sendBatch(messages);
// }

async createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch> {
this._throwIfSenderOrConnectionClosed();
throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "messages", messages);
if (!Array.isArray(messages)) {
messages = [messages];
}
const sender = MessageSender.create(this._context);
return sender.sendBatch(messages);
return this._sender.createBatch(options);
}

async sendBatch(messageBatch: ServiceBusMessageBatch): Promise<void> {
this._throwIfSenderOrConnectionClosed();
throwTypeErrorIfParameterMissing(
this._context.namespace.connectionId,
"messageBatch",
messageBatch
);
return this._sender.sendBatch(messageBatch);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ export function getMessagePropertyTypeMismatchError(msg: ServiceBusMessage): Err

/**
* @internal
* Converts given SendableMessageInfo to AmqpMessage
* Converts given ServiceBusMessage to AmqpMessage
*/
export function toAmqpMessage(msg: ServiceBusMessage): AmqpMessage {
const amqpMsg: AmqpMessage = {
Expand Down Expand Up @@ -1043,7 +1043,7 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock {
* @returns ServiceBusMessage
*/
clone(): ServiceBusMessage {
// We are returning a SendableMessageInfo object because that object can then be sent to Service Bus
// We are returning a ServiceBusMessage object because that object can then be sent to Service Bus
const clone: ServiceBusMessage = {
body: this.body,
contentType: this.contentType,
Expand Down
Loading