diff --git a/sdk/servicebus/service-bus/review/service-bus.api.md b/sdk/servicebus/service-bus/review/service-bus.api.md index 315934d3e8b3..60d08cf120ad 100644 --- a/sdk/servicebus/service-bus/review/service-bus.api.md +++ b/sdk/servicebus/service-bus/review/service-bus.api.md @@ -43,6 +43,12 @@ export interface CorrelationFilter { userProperties?: any; } +// @public +export interface CreateBatchOptions extends OperationOptions { + maxSizeInBytes?: number; + retryOptions?: RetryOptions; +} + export { DataTransformer } // @public @@ -230,11 +236,12 @@ export interface Sender { cancelScheduledMessage(sequenceNumber: Long): Promise; cancelScheduledMessages(sequenceNumbers: Long[]): Promise; close(): Promise; + createBatch(options?: CreateBatchOptions): Promise; isClosed: boolean; scheduleMessage(scheduledEnqueueTimeUtc: Date, message: ServiceBusMessage): Promise; scheduleMessages(scheduledEnqueueTimeUtc: Date, messages: ServiceBusMessage[]): Promise; send(message: ServiceBusMessage): Promise; - sendBatch(messages: ServiceBusMessage[]): Promise; + sendBatch(messageBatch: ServiceBusMessageBatch): Promise; } // @public @@ -279,6 +286,14 @@ export interface ServiceBusMessage { viaPartitionKey?: string; } +// @public +export interface ServiceBusMessageBatch { + readonly count: number; + readonly maxSizeInBytes: number; + readonly sizeInBytes: number; + tryAdd(message: ServiceBusMessage): boolean; +} + // @public export interface SessionMessageHandlerOptions { autoComplete?: boolean; diff --git a/sdk/servicebus/service-bus/src/core/messageSender.ts b/sdk/servicebus/service-bus/src/core/messageSender.ts index 5af236a79031..98c4dce5e584 100644 --- a/sdk/servicebus/service-bus/src/core/messageSender.ts +++ b/sdk/servicebus/service-bus/src/core/messageSender.ts @@ -22,7 +22,8 @@ import { RetryOperationType, Constants, delay, - MessagingError + MessagingError, + RetryOptions } from "@azure/core-amqp"; import { ServiceBusMessage, @@ -33,6 +34,8 @@ import { ClientEntityContext } from "../clientEntityContext"; import { LinkEntity } from "./linkEntity"; import { getUniqueName } from "../util/utils"; import { throwErrorIfConnectionClosed } from "../util/errors"; +import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch"; +import { CreateBatchOptions } from "../models"; /** * @internal @@ -581,6 +584,7 @@ export class MessageSender extends LinkEntity { } } + // Not exposed to the users /** * Send a batch of Message to the ServiceBus in a single AMQP message. The "message_annotations", * "application_properties" and "properties" of the first message will be set as that @@ -589,7 +593,7 @@ export class MessageSender extends LinkEntity { * Batch message. * @return {Promise} */ - async sendBatch(inputMessages: ServiceBusMessage[]): Promise { + async sendMessages(inputMessages: ServiceBusMessage[]): Promise { throwErrorIfConnectionClosed(this._context.namespace); try { if (!Array.isArray(inputMessages)) { @@ -649,6 +653,7 @@ export class MessageSender extends LinkEntity { // Finally encode the envelope (batch message). const encodedBatchMessage = RheaMessageUtil.encode(batchMessage); + log.sender( "[%s]Sender '%s', sending encoded batch message.", this._context.namespace.connectionId, @@ -668,6 +673,101 @@ export class MessageSender extends LinkEntity { } } + /** + * Returns maximum message size on the AMQP sender link. + * + * Options to configure the `createBatch` method on the `Sender`. + * - `maxSizeInBytes`: The upper limit for the size of batch. + * + * Example usage: + * ```js + * { + * retryOptions: { maxRetries: 5; timeoutInMs: 10 } + * } + * ``` + * @param {{retryOptions?: RetryOptions}} [options={}] + * @returns {Promise} + * @memberof MessageSender + */ + async getMaxMessageSize( + options: { + retryOptions?: RetryOptions; + } = {} + ): Promise { + const retryOptions = options.retryOptions || {}; + if (this.isOpen()) { + return this._sender!.maxMessageSize; + } + return new Promise(async (resolve, reject) => { + try { + const senderOptions = this._createSenderOptions(Constants.defaultOperationTimeoutInMs); + await defaultLock.acquire(this.senderLock, () => { + const config: RetryConfig = { + operation: () => this._init(senderOptions), + connectionId: this._context.namespace.connectionId, + operationType: RetryOperationType.senderLink, + retryOptions: retryOptions + }; + + return retry(config); + }); + resolve(this._sender!.maxMessageSize); + } catch (err) { + reject(err); + } + }); + } + + async createBatch(options?: CreateBatchOptions): Promise { + throwErrorIfConnectionClosed(this._context.namespace); + if (!options) { + options = {}; + } + let maxMessageSize = await this.getMaxMessageSize({ retryOptions: options.retryOptions }); + if (options.maxSizeInBytes) { + if (options.maxSizeInBytes > maxMessageSize!) { + const error = new Error( + `Max message size (${options.maxSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link.` + ); + throw error; + } + maxMessageSize = options.maxSizeInBytes; + } + return new ServiceBusMessageBatchImpl(this._context, maxMessageSize!); + } + + async sendBatch(batchMessage: ServiceBusMessageBatch): Promise { + throwErrorIfConnectionClosed(this._context.namespace); + try { + if (!this.isOpen()) { + log.sender( + "Acquiring lock %s for initializing the session, sender and " + + "possibly the connection.", + this.senderLock + ); + await defaultLock.acquire(this.senderLock, () => { + return this._init(); + }); + } + log.sender( + "[%s]Sender '%s', sending encoded batch message.", + this._context.namespace.connectionId, + this.name, + batchMessage + ); + return await this._trySend(batchMessage._message!, true); + } catch (err) { + log.error( + "[%s] Sender '%s': An error occurred while sending the messages: %O\nError: %O", + this._context.namespace.connectionId, + this.name, + batchMessage, + err + ); + throw err; + } + } + /** * Creates a new sender to the specific ServiceBus entity, and optionally to a given * partition if it is not present in the context or returns the one present in the context. diff --git a/sdk/servicebus/service-bus/src/index.ts b/sdk/servicebus/service-bus/src/index.ts index 575858024c79..2f422eeb79bb 100644 --- a/sdk/servicebus/service-bus/src/index.ts +++ b/sdk/servicebus/service-bus/src/index.ts @@ -26,6 +26,8 @@ export { ReceiveMode, ReceivedMessageWithLock } from "./serviceBusMessage"; +export { ServiceBusMessageBatch } from "./serviceBusMessageBatch"; + export { Delivery, WebSocketImpl } from "rhea-promise"; export { HttpOperationResponse } from "@azure/core-http"; @@ -53,7 +55,8 @@ export { MessageHandlers, ReceiveBatchOptions, SubscribeOptions, - WaitTimeOptions + WaitTimeOptions, + CreateBatchOptions } from "./models"; export { Receiver, SubscriptionRuleManagement } from "./receivers/receiver"; diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index 8daf644f5593..cd113434eda4 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -2,6 +2,7 @@ // Licensed under the MIT License. import { OperationOptions } from "@azure/core-auth"; +import { RetryOptions } from "@azure/core-amqp"; /** * The general message handler interface (used for streamMessages). @@ -32,6 +33,32 @@ export interface WaitTimeOptions { maxWaitTimeSeconds: number; } +/** + * Options to configure the `createBatch` method on the `Sender`. + * - `maxSizeInBytes`: The upper limit for the size of batch. + * + * Example usage: + * ```js + * { + * maxSizeInBytes: 1024 * 1024 // 1 MB + * } + * ``` + */ +export interface CreateBatchOptions extends OperationOptions { + /** + * @property + * The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached. + */ + maxSizeInBytes?: number; + /** + * Retry policy options that determine the mode, number of retries, retry interval etc. + * + * @type {RetryOptions} + * @memberof CreateBatchOptions + */ + retryOptions?: RetryOptions; +} + /** * Options when receiving a batch of messages from Service Bus. */ diff --git a/sdk/servicebus/service-bus/src/sender.ts b/sdk/servicebus/service-bus/src/sender.ts index 808287e4f674..18b36d240d33 100644 --- a/sdk/servicebus/service-bus/src/sender.ts +++ b/sdk/servicebus/service-bus/src/sender.ts @@ -13,6 +13,8 @@ import { throwTypeErrorIfParameterNotLong, throwTypeErrorIfParameterNotLongArray } from "./util/errors"; +import { ServiceBusMessageBatch } from "./serviceBusMessageBatch"; +import { CreateBatchOptions } from "./models"; /** * A Sender can be used to send messages, schedule messages to be sent at a later time @@ -34,22 +36,49 @@ export interface Sender { */ send(message: ServiceBusMessage): Promise; + // sendBatch() - Commented + // /** + // * Sends the given messages in a single batch i.e. in a single AMQP message after creating an AMQP + // * Sender link if it doesnt already exists. + // * + // * - To send messages to a `session` and/or `partition` enabled Queue/Topic, set the `sessionId` + // * and/or `partitionKey` properties respectively on the messages. + // * - When doing so, all + // * messages in the batch should have the same `sessionId` (if using sessions) and the same + // * `parititionKey` (if using paritions). + // * + // * @param messages - An array of ServiceBusMessage objects to be sent in a Batch message. + // * @return Promise + // * @throws Error if the underlying connection, client or sender is closed. + // * @throws MessagingError if the service returns an error while sending messages to the service. + // */ + // sendBatch(messages: ServiceBusMessage[]): Promise; + /** - * Sends the given messages in a single batch i.e. in a single AMQP message after creating an AMQP - * Sender link if it doesnt already exists. + * Creates an instance of `ServiceBusMessageBatch` to which one can add messages until the maximum supported size is reached. + * The batch can be passed to the {@link sendBatch} method to send the messages to Azure Service Bus. + * @param options Configures the behavior of the batch. + * - `maxSizeInBytes`: The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached. * - * - To send messages to a `session` and/or `partition` enabled Queue/Topic, set the `sessionId` - * and/or `partitionKey` properties respectively on the messages. - * - When doing so, all - * messages in the batch should have the same `sessionId` (if using sessions) and the same - * `parititionKey` (if using paritions). + * @param {CreateBatchOptions} [options] + * @returns {Promise} + * @throws MessagingError if an error is encountered while sending a message. + * @throws Error if the underlying connection or sender has been closed. + * @memberof Sender + */ + createBatch(options?: CreateBatchOptions): Promise; + + /** + * Sends a batch of messages to the associated service-bus entity. * - * @param messages - An array of SendableMessageInfo objects to be sent in a Batch message. - * @return Promise - * @throws Error if the underlying connection, client or sender is closed. - * @throws MessagingError if the service returns an error while sending messages to the service. + * @param {ServiceBusMessageBatch} messageBatch A batch of messages that you can create using the {@link createBatch} method. + * @returns {Promise} + * @throws MessagingError if an error is encountered while sending a message. + * @throws Error if the underlying connection or sender has been closed. + * @memberof Sender */ - sendBatch(messages: ServiceBusMessage[]): Promise; + sendBatch(messageBatch: ServiceBusMessageBatch): Promise; + /** * @property Returns `true` if either the sender or the client that created it has been closed * @readonly @@ -119,6 +148,7 @@ export class SenderImpl implements Sender { * @property Denotes if close() was called on this sender */ private _isClosed: boolean = false; + private _sender: MessageSender; /** * @internal @@ -127,6 +157,7 @@ export class SenderImpl implements Sender { constructor(context: ClientEntityContext) { throwErrorIfConnectionClosed(context.namespace); this._context = context; + this._sender = MessageSender.create(this._context); } private _throwIfSenderOrConnectionClosed(): void { @@ -150,18 +181,32 @@ export class SenderImpl implements Sender { async send(message: ServiceBusMessage): Promise { this._throwIfSenderOrConnectionClosed(); throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "message", message); - const sender = MessageSender.create(this._context); - return sender.send(message); + return this._sender.send(message); } - async sendBatch(messages: ServiceBusMessage[]): Promise { + // sendBatch() - Commented + // async sendBatch(messages: ServiceBusMessage[]): Promise { + // this._throwIfSenderOrConnectionClosed(); + // throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "messages", messages); + // if (!Array.isArray(messages)) { + // messages = [messages]; + // } + // return this._sender.sendBatch(messages); + // } + + async createBatch(options?: CreateBatchOptions): Promise { this._throwIfSenderOrConnectionClosed(); - throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "messages", messages); - if (!Array.isArray(messages)) { - messages = [messages]; - } - const sender = MessageSender.create(this._context); - return sender.sendBatch(messages); + return this._sender.createBatch(options); + } + + async sendBatch(messageBatch: ServiceBusMessageBatch): Promise { + this._throwIfSenderOrConnectionClosed(); + throwTypeErrorIfParameterMissing( + this._context.namespace.connectionId, + "messageBatch", + messageBatch + ); + return this._sender.sendBatch(messageBatch); } /** diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index 27b190051363..44c8cba92c1d 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -301,7 +301,7 @@ export function getMessagePropertyTypeMismatchError(msg: ServiceBusMessage): Err /** * @internal - * Converts given SendableMessageInfo to AmqpMessage + * Converts given ServiceBusMessage to AmqpMessage */ export function toAmqpMessage(msg: ServiceBusMessage): AmqpMessage { const amqpMsg: AmqpMessage = { @@ -1043,7 +1043,7 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock { * @returns ServiceBusMessage */ clone(): ServiceBusMessage { - // We are returning a SendableMessageInfo object because that object can then be sent to Service Bus + // We are returning a ServiceBusMessage object because that object can then be sent to Service Bus const clone: ServiceBusMessage = { body: this.body, contentType: this.contentType, diff --git a/sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts b/sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts new file mode 100644 index 000000000000..24d53d4b6806 --- /dev/null +++ b/sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts @@ -0,0 +1,179 @@ +import { ServiceBusMessage, toAmqpMessage } from "./serviceBusMessage"; +import { throwTypeErrorIfParameterMissing } from "./util/errors"; +import { ClientEntityContext } from "./clientEntityContext"; +import { message as RheaMessageUtil, messageProperties } from "rhea-promise"; +import { AmqpMessage } from "@azure/core-amqp"; + +/** + * A batch of messages that you can create using the {@link createBatch} method. + * + * @export + * @interface ServiceBusMessageBatch + */ +export interface ServiceBusMessageBatch { + /** + * Size of the batch in bytes after the events added to it have been encoded into a single AMQP + * message. + * @readonly + */ + readonly sizeInBytes: number; + + /** + * Number of messages added to the batch. + * @readonly + */ + readonly count: number; + + /** + * The maximum size of the batch, in bytes. The `tryAdd` function on the batch will return `false` + * if the message being added causes the size of the batch to exceed this limit. Use the `createBatch()` method on + * the `Sender` to set the maxSizeInBytes. + * @readonly. + */ + readonly maxSizeInBytes: number; + + /** + * Adds a message to the batch if permitted by the batch's size limit. + * **NOTE**: Always remember to check the return value of this method, before calling it again + * for the next event. + * + * @param message An individual service bus message. + * @returns A boolean value indicating if the message has been added to the batch or not. + */ + tryAdd(message: ServiceBusMessage): boolean; + + /** + * The AMQP message containing encoded events that were added to the batch. + * Used internally by the `sendBatch()` method on the `Sender`. + * This is not meant for the user to use directly. + * + * @readonly + * @internal + * @ignore + */ + readonly _message: Buffer | undefined; +} + +/** + * An internal class representing a batch of messages which can be used to send messages to Service Bus. + * + * @class + * @internal + * @ignore + */ +export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch { + /** + * @property Describes the amqp connection context for the Client. + */ + private _context: ClientEntityContext; + /** + * @property The maximum size allowed for the batch. + */ + private _maxSizeInBytes: number; + /** + * @property Current size of the batch in bytes. + */ + private _sizeInBytes: number; + /** + * @property Encoded amqp messages. + */ + private _encodedMessages: Buffer[] = []; + /** + * @property Encoded batch message. + */ + private _batchMessage: Buffer | undefined; + + /** + * ServiceBusMessageBatch should not be constructed using `new ServiceBusMessageBatch()` + * Use the `createBatch()` method on your `Sender` instead. + * @constructor + * @internal + * @ignore + */ + constructor(context: ClientEntityContext, maxSizeInBytes: number) { + this._context = context; + this._maxSizeInBytes = maxSizeInBytes; + this._sizeInBytes = 0; + } + + /** + * @property The maximum size of the batch, in bytes. + * @readonly + */ + get maxSizeInBytes(): number { + return this._maxSizeInBytes; + } + + /** + * @property Size of the `ServiceBusMessageBatch` instance after the messages added to it have been + * encoded into a single AMQP message. + * @readonly + */ + get sizeInBytes(): number { + return this._sizeInBytes; + } + + /** + * @property Number of messages in the `ServiceBusMessageBatch` instance. + * @readonly + */ + get count(): number { + return this._encodedMessages.length; + } + + /** + * @property Represents the single AMQP message which is the result of encoding all the events + * added into the `ServiceBusMessageBatch` instance. + * + * This is not meant for the user to use directly. + * + * When the `ServiceBusMessageBatch` instance is passed to the `sendBatch()` method on the `Sender`, + * this single batched AMQP message is what gets sent over the wire to the service. + * @readonly + */ + get _message(): Buffer | undefined { + return this._batchMessage; + } + + /** + * Tries to add a message to the batch if permitted by the batch's size limit. + * **NOTE**: Always remember to check the return value of this method, before calling it again + * for the next message. + * + * @param message An individual service bus message. + * @returns A boolean value indicating if the message has been added to the batch or not. + */ + public tryAdd(message: ServiceBusMessage): boolean { + throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "tryAdd", "message"); + + // Convert ServiceBusMessage to AmqpMessage. + const amqpMessage = toAmqpMessage(message); + amqpMessage.body = this._context.namespace.dataTransformer.encode(message.body); + + // Encode every amqp message and then convert every encoded message to amqp data section + this._encodedMessages.push(RheaMessageUtil.encode(amqpMessage)); + + const batchMessage: AmqpMessage = { + body: RheaMessageUtil.data_sections(this._encodedMessages) + }; + + batchMessage.message_annotations = amqpMessage.message_annotations; + batchMessage.application_properties = amqpMessage.application_properties; + + for (const prop of messageProperties) { + (batchMessage as any)[prop] = (amqpMessage as any)[prop]; + } + + const encodedBatchMessage = RheaMessageUtil.encode(batchMessage); + const currentSize = encodedBatchMessage.length; + + // this._batchMessage will be used for final send operation + if (currentSize > this._maxSizeInBytes) { + this._encodedMessages.pop(); + return false; + } + this._batchMessage = encodedBatchMessage; + this._sizeInBytes = currentSize; + return true; + } +} diff --git a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts index 6de1d82661c6..fb98efd14754 100644 --- a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts @@ -795,7 +795,11 @@ describe("batchReceiver", () => { // See https://github.com/Azure/azure-service-bus-node/issues/31 async function testSequentialReceiveBatchCalls(useSessions?: boolean): Promise { const testMessages = useSessions ? messageWithSessions : messages; - await senderClient.sendBatch(testMessages); + const batchMessageToSend = await senderClient.createBatch(); + for (const message of testMessages) { + batchMessageToSend.tryAdd(message); + } + await senderClient.sendBatch(batchMessageToSend); const msgs1 = await receiverClient.receiveBatch(1); const msgs2 = await receiverClient.receiveBatch(1); diff --git a/sdk/servicebus/service-bus/test/invalidParameters.spec.ts b/sdk/servicebus/service-bus/test/invalidParameters.spec.ts index 4b6e4d178e77..59a915b51630 100644 --- a/sdk/servicebus/service-bus/test/invalidParameters.spec.ts +++ b/sdk/servicebus/service-bus/test/invalidParameters.spec.ts @@ -817,7 +817,7 @@ describe("invalid parameters", () => { should.equal(caughtError && caughtError.message, `Missing parameter "message"`); }); - it("Sendbatch: Missing messages in Sender", async function(): Promise { + it("Sendbatch: Missing messageBatch in Sender", async function(): Promise { let caughtError: Error | undefined; try { await sender.sendBatch(undefined as any); @@ -825,7 +825,7 @@ describe("invalid parameters", () => { caughtError = error; } should.equal(caughtError && caughtError.name, "TypeError"); - should.equal(caughtError && caughtError.message, `Missing parameter "messages"`); + should.equal(caughtError && caughtError.message, `Missing parameter "messageBatch"`); }); it("ScheduledMessage: Missing date in Sender", async function(): Promise { diff --git a/sdk/servicebus/service-bus/test/sendSchedule.spec.ts b/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts similarity index 83% rename from sdk/servicebus/service-bus/test/sendSchedule.spec.ts rename to sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts index b1583da2490d..5424e616657c 100644 --- a/sdk/servicebus/service-bus/test/sendSchedule.spec.ts +++ b/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts @@ -105,79 +105,80 @@ describe("send scheduled messages", () => { }); }); - describe("Simple Send Batch", function(): void { - afterEach(async () => { - await afterEachTest(); - }); - - async function testSimpleSendBatch( - useSessions: boolean, - usePartitions: boolean - ): Promise { - const testMessages = []; - testMessages.push(useSessions ? TestMessage.getSessionSample() : TestMessage.getSample()); - testMessages.push(useSessions ? TestMessage.getSessionSample() : TestMessage.getSample()); - - await senderClient.sendBatch(testMessages); - const msgs = await receiverClient.receiveBatch(2); - - should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); - should.equal(msgs.length, 2, "Unexpected number of messages"); - - if (testMessages[0].messageId === msgs[0].messageId) { - TestMessage.checkMessageContents(testMessages[0], msgs[0], useSessions, usePartitions); - TestMessage.checkMessageContents(testMessages[1], msgs[1], useSessions, usePartitions); - } else { - TestMessage.checkMessageContents(testMessages[1], msgs[0], useSessions, usePartitions); - TestMessage.checkMessageContents(testMessages[0], msgs[1], useSessions, usePartitions); - } - - await msgs[0].complete(); - await msgs[1].complete(); - - await testPeekMsgsLength(receiverClient, 0); - } - - it("Partitioned Queue: Simple SendBatch", async function(): Promise { - await beforeEachTest(TestClientType.PartitionedQueue); - await testSimpleSendBatch(false, true); - }); - - it("Partitioned Topic: Simple SendBatch", async function(): Promise { - await beforeEachTest(TestClientType.PartitionedSubscription); - await testSimpleSendBatch(false, true); - }); - - it("Unpartitioned Queue: Simple SendBatch #RunInBrowser", async function(): Promise { - await beforeEachTest(TestClientType.UnpartitionedQueue); - await testSimpleSendBatch(false, false); - }); - - it("Unpartitioned Topic: Simple SendBatch", async function(): Promise { - await beforeEachTest(TestClientType.UnpartitionedSubscription); - await testSimpleSendBatch(false, false); - }); - - it("Partitioned Queue with Sessions: Simple SendBatch", async function(): Promise { - await beforeEachTest(TestClientType.PartitionedQueueWithSessions); - await testSimpleSendBatch(true, true); - }); - - it("Partitioned Topic with Sessions: Simple SendBatch", async function(): Promise { - await beforeEachTest(TestClientType.PartitionedSubscriptionWithSessions); - await testSimpleSendBatch(true, true); - }); - - it("Unpartitioned Queue with Sessions: Simple SendBatch", async function(): Promise { - await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); - await testSimpleSendBatch(true, false); - }); - - it("Unpartitioned Topic with Sessions: Simple SendBatch", async function(): Promise { - await beforeEachTest(TestClientType.UnpartitionedSubscriptionWithSessions); - await testSimpleSendBatch(true, false); - }); - }); + // sendBatch() - Commented + // describe("Simple Send Batch", function(): void { + // afterEach(async () => { + // await afterEachTest(); + // }); + + // async function testSimpleSendBatch( + // useSessions: boolean, + // usePartitions: boolean + // ): Promise { + // const testMessages = []; + // testMessages.push(useSessions ? TestMessage.getSessionSample() : TestMessage.getSample()); + // testMessages.push(useSessions ? TestMessage.getSessionSample() : TestMessage.getSample()); + + // await senderClient.sendBatch(testMessages); + // const msgs = await receiverClient.receiveBatch(2); + + // should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); + // should.equal(msgs.length, 2, "Unexpected number of messages"); + + // if (testMessages[0].messageId === msgs[0].messageId) { + // TestMessage.checkMessageContents(testMessages[0], msgs[0], useSessions, usePartitions); + // TestMessage.checkMessageContents(testMessages[1], msgs[1], useSessions, usePartitions); + // } else { + // TestMessage.checkMessageContents(testMessages[1], msgs[0], useSessions, usePartitions); + // TestMessage.checkMessageContents(testMessages[0], msgs[1], useSessions, usePartitions); + // } + + // await msgs[0].complete(); + // await msgs[1].complete(); + + // await testPeekMsgsLength(receiverClient, 0); + // } + + // it("Partitioned Queue: Simple SendBatch", async function(): Promise { + // await beforeEachTest(TestClientType.PartitionedQueue); + // await testSimpleSendBatch(false, true); + // }); + + // it("Partitioned Topic: Simple SendBatch", async function(): Promise { + // await beforeEachTest(TestClientType.PartitionedSubscription); + // await testSimpleSendBatch(false, true); + // }); + + // it("Unpartitioned Queue: Simple SendBatch #RunInBrowser", async function(): Promise { + // await beforeEachTest(TestClientType.UnpartitionedQueue); + // await testSimpleSendBatch(false, false); + // }); + + // it("Unpartitioned Topic: Simple SendBatch", async function(): Promise { + // await beforeEachTest(TestClientType.UnpartitionedSubscription); + // await testSimpleSendBatch(false, false); + // }); + + // it("Partitioned Queue with Sessions: Simple SendBatch", async function(): Promise { + // await beforeEachTest(TestClientType.PartitionedQueueWithSessions); + // await testSimpleSendBatch(true, true); + // }); + + // it("Partitioned Topic with Sessions: Simple SendBatch", async function(): Promise { + // await beforeEachTest(TestClientType.PartitionedSubscriptionWithSessions); + // await testSimpleSendBatch(true, true); + // }); + + // it("Unpartitioned Queue with Sessions: Simple SendBatch", async function(): Promise { + // await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); + // await testSimpleSendBatch(true, false); + // }); + + // it("Unpartitioned Topic with Sessions: Simple SendBatch", async function(): Promise { + // await beforeEachTest(TestClientType.UnpartitionedSubscriptionWithSessions); + // await testSimpleSendBatch(true, false); + // }); + // }); describe("Schedule single message", function(): void { afterEach(async () => { @@ -636,35 +637,36 @@ describe("send scheduled messages", () => { ); }); - it( - "SendBatch() throws if in the first message, " + testInput.title, - async function(): Promise { - let actualErrorMsg = ""; - await senderClient.sendBatch([testInput.message, { body: "random" }]).catch((err) => { - actualErrorMsg = err.message; - }); - should.equal( - actualErrorMsg, - testInput.expectedErrorMessage, - "Error not thrown as expected" - ); - } - ); - - it( - "SendBatch() throws if in the subsequent message, " + testInput.title, - async function(): Promise { - let actualErrorMsg = ""; - await senderClient.sendBatch([{ body: "random" }, testInput.message]).catch((err) => { - actualErrorMsg = err.message; - }); - should.equal( - actualErrorMsg, - testInput.expectedErrorMessage, - "Error not thrown as expected" - ); - } - ); + // sendBatch() - Commented + // it( + // "SendBatch() throws if in the first message, " + testInput.title, + // async function(): Promise { + // let actualErrorMsg = ""; + // await senderClient.sendBatch([testInput.message, { body: "random" }]).catch((err) => { + // actualErrorMsg = err.message; + // }); + // should.equal( + // actualErrorMsg, + // testInput.expectedErrorMessage, + // "Error not thrown as expected" + // ); + // } + // ); + + // it( + // "SendBatch() throws if in the subsequent message, " + testInput.title, + // async function(): Promise { + // let actualErrorMsg = ""; + // await senderClient.sendBatch([{ body: "random" }, testInput.message]).catch((err) => { + // actualErrorMsg = err.message; + // }); + // should.equal( + // actualErrorMsg, + // testInput.expectedErrorMessage, + // "Error not thrown as expected" + // ); + // } + // ); it("ScheduleMessage() throws if " + testInput.title, async function(): Promise { let actualErrorMsg = ""; diff --git a/sdk/servicebus/service-bus/test/sendBatch.spec.ts b/sdk/servicebus/service-bus/test/sendBatch.spec.ts new file mode 100644 index 000000000000..94e68cf4aa6e --- /dev/null +++ b/sdk/servicebus/service-bus/test/sendBatch.spec.ts @@ -0,0 +1,642 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import chai from "chai"; +const should = chai.should(); +import chaiAsPromised from "chai-as-promised"; +chai.use(chaiAsPromised); +import { ServiceBusMessage } from "../src"; +import { TestClientType } from "./utils/testUtils"; +import { + ServiceBusClientForTests, + createServiceBusClientForTests, + EntityName +} from "./utils/testutils2"; +import { Sender } from "../src/sender"; + +describe("Send Batch", () => { + let senderClient: Sender; + let serviceBusClient: ServiceBusClientForTests; + + let entityNames: EntityName; + + before(() => { + serviceBusClient = createServiceBusClientForTests(); + }); + + after(() => { + return serviceBusClient.test.after(); + }); + + async function beforeEachTest(entityType: TestClientType): Promise { + entityNames = await serviceBusClient.test.createTestEntities(entityType); + + senderClient = serviceBusClient.test.addToCleanup( + serviceBusClient.getSender(entityNames.queue ?? entityNames.topic!) + ); + } + + async function afterEachTest(): Promise { + await senderClient.close(); + } + + describe("Send multiple homogeneous messages - size > max_batch_size_allowed", function(): void { + afterEach(async () => { + await afterEachTest(); + }); + + function prepareMessages(useSessions: boolean): ServiceBusMessage[] { + const messagesToSend: ServiceBusMessage[] = []; + for (let i = 0; i < 1000; i++) { + messagesToSend.push({ + body: Buffer.alloc(2000), + messageId: `message ${i}`, + sessionId: useSessions ? `someSession` : undefined + }); + } + return messagesToSend; + } + + async function testSendBatch( + useSessions: boolean, + // Max batch size + maxSizeInBytes?: number + ): Promise { + // Prepare messages to send + const messagesToSend = prepareMessages(useSessions); + const sentMessages: ServiceBusMessage[] = []; + const batchMessage = await senderClient.createBatch({ maxSizeInBytes }); + + for (const messageToSend of messagesToSend) { + const batchHasCapacity = batchMessage.tryAdd(messageToSend); + if (!batchHasCapacity) { + break; + } else { + sentMessages.push(messageToSend); + } + } + await senderClient.sendBatch(batchMessage); + // receive all the messages in receive and delete mode + await serviceBusClient.test.verifyAndDeleteAllSentMessages( + entityNames, + useSessions, + sentMessages + ); + } + + it("Partitioned Queue: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueue); + await testSendBatch(false); + }); + + it("Partitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscription); + await testSendBatch(false); + }); + + it("Unpartitioned Queue: SendBatch #RunInBrowser", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueue); + await testSendBatch(false); + }); + + it("Unpartitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscription); + await testSendBatch(false); + }); + + it("Partitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueueWithSessions); + await testSendBatch(true); + }); + + it("Partitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscriptionWithSessions); + await testSendBatch(true); + }); + + it("Unpartitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); + await testSendBatch(true); + }); + + it("Unpartitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscriptionWithSessions); + await testSendBatch(true); + }); + }); + + describe("Send multiple homogeneous messages - Multiple Sessions - size > max_batch_size_allowed", function(): void { + afterEach(async () => { + await afterEachTest(); + }); + + function prepareMessages(useSessions: boolean): ServiceBusMessage[] { + const messagesToSend: ServiceBusMessage[] = []; + for (let i = 0; i < 1000; i++) { + messagesToSend.push({ + body: Buffer.alloc(2000), + messageId: `message ${i}`, + sessionId: useSessions ? `someSession ${i}` : undefined + }); + } + return messagesToSend; + } + + async function testSendBatch( + useSessions: boolean, + // Max batch size + maxSizeInBytes?: number + ): Promise { + // Prepare messages to send + const messagesToSend = prepareMessages(useSessions); + const sentMessages: ServiceBusMessage[] = []; + const batchMessage = await senderClient.createBatch({ maxSizeInBytes }); + + for (const messageToSend of messagesToSend) { + const batchHasCapacity = batchMessage.tryAdd(messageToSend); + if (!batchHasCapacity) { + break; + } else { + sentMessages.push(messageToSend); + } + } + await senderClient.sendBatch(batchMessage); + // receive all the messages in receive and delete mode + await serviceBusClient.test.verifyAndDeleteAllSentMessages( + entityNames, + useSessions, + sentMessages + ); + } + + // Not allowed for partitioned entities + /* + it("Partitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueueWithSessions); + await testSendBatch(true); + }); + + it("Partitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscriptionWithSessions); + await testSendBatch(true); + }); + */ + + it("Unpartitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); + await testSendBatch(true); + }); + + it("Unpartitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscriptionWithSessions); + await testSendBatch(true); + }); + }); + + describe("Send multiple homogeneous messages - size < max_batch_size_allowed", function(): void { + afterEach(async () => { + await afterEachTest(); + }); + + function prepareMessages(useSessions: boolean): ServiceBusMessage[] { + const messagesToSend: ServiceBusMessage[] = []; + messagesToSend.push({ + body: Buffer.alloc(2000), + messageId: `message-1`, + sessionId: useSessions ? `someSession` : undefined + }); + messagesToSend.push({ + body: Buffer.alloc(2000), + messageId: `message-2`, + sessionId: useSessions ? `someSession` : undefined + }); + messagesToSend.push({ + body: Buffer.alloc(2000), + messageId: `message-3`, + sessionId: useSessions ? `someSession` : undefined + }); + return messagesToSend; + } + + async function testSendBatch( + useSessions: boolean, + // Max batch size + maxSizeInBytes?: number + ): Promise { + // Prepare messages to send + const messagesToSend = prepareMessages(useSessions); + const sentMessages: ServiceBusMessage[] = []; + const batchMessage = await senderClient.createBatch({ maxSizeInBytes }); + + for (const messageToSend of messagesToSend) { + const batchHasCapacity = batchMessage.tryAdd(messageToSend); + if (!batchHasCapacity) { + break; + } else { + sentMessages.push(messageToSend); + } + } + await senderClient.sendBatch(batchMessage); + // receive all the messages in receive and delete mode + await serviceBusClient.test.verifyAndDeleteAllSentMessages( + entityNames, + useSessions, + sentMessages + ); + } + + it("Partitioned Queue: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueue); + await testSendBatch(false); + }); + + it("Partitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscription); + await testSendBatch(false); + }); + + it("Unpartitioned Queue: SendBatch #RunInBrowser", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueue); + await testSendBatch(false); + }); + + it("Unpartitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscription); + await testSendBatch(false); + }); + + it("Partitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueueWithSessions); + await testSendBatch(true); + }); + + it("Partitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscriptionWithSessions); + await testSendBatch(true); + }); + + it("Unpartitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); + await testSendBatch(true); + }); + + it("Unpartitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscriptionWithSessions); + await testSendBatch(true); + }); + }); + + describe("Send single message - size < max_batch_size_allowed", function(): void { + afterEach(async () => { + await afterEachTest(); + }); + + function prepareMessages(useSessions: boolean): ServiceBusMessage[] { + const messagesToSend: ServiceBusMessage[] = []; + messagesToSend.push({ + body: Buffer.alloc(20000), + messageId: `random-message-id`, + sessionId: useSessions ? `someSession` : undefined + }); + return messagesToSend; + } + + async function testSendBatch( + useSessions: boolean, + // Max batch size + maxSizeInBytes?: number + ): Promise { + // Prepare messages to send + const messagesToSend = prepareMessages(useSessions); + const sentMessages: ServiceBusMessage[] = []; + const batchMessage = await senderClient.createBatch({ maxSizeInBytes }); + + for (const messageToSend of messagesToSend) { + const batchHasCapacity = batchMessage.tryAdd(messageToSend); + if (!batchHasCapacity) { + break; + } else { + sentMessages.push(messageToSend); + } + } + await senderClient.sendBatch(batchMessage); + // receive all the messages in receive and delete mode + await serviceBusClient.test.verifyAndDeleteAllSentMessages( + entityNames, + useSessions, + sentMessages + ); + } + + it("Partitioned Queue: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueue); + await testSendBatch(false); + }); + + it("Partitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscription); + await testSendBatch(false); + }); + + it("Unpartitioned Queue: SendBatch #RunInBrowser", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueue); + await testSendBatch(false); + }); + + it("Unpartitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscription); + await testSendBatch(false); + }); + + it("Partitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueueWithSessions); + await testSendBatch(true); + }); + + it("Partitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscriptionWithSessions); + await testSendBatch(true); + }); + + it("Unpartitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); + await testSendBatch(true); + }); + + it("Unpartitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscriptionWithSessions); + await testSendBatch(true); + }); + }); + + describe("Send multiple heterogenous messages - size > max_batch_size_allowed", function(): void { + afterEach(async () => { + await afterEachTest(); + }); + + function prepareMessages(useSessions: boolean): ServiceBusMessage[] { + const messagesToSend: ServiceBusMessage[] = []; + messagesToSend.push({ + body: Buffer.alloc(2000), + messageId: `message-1`, + sessionId: useSessions ? `someSession` : undefined + }); + messagesToSend.push({ + body: Buffer.alloc(200000), + messageId: `message-2`, + sessionId: useSessions ? `someSession` : undefined + }); + messagesToSend.push({ + body: Buffer.alloc(40000), + messageId: `message-2`, + sessionId: useSessions ? `someSession` : undefined + }); + messagesToSend.push({ + body: Buffer.alloc(20000), + messageId: `message-3`, + sessionId: useSessions ? `someSession` : undefined + }); + return messagesToSend; + } + + async function testSendBatch( + useSessions: boolean, + // Max batch size + maxSizeInBytes?: number + ): Promise { + // Prepare messages to send + const messagesToSend = prepareMessages(useSessions); + const sentMessages: ServiceBusMessage[] = []; + const batchMessage = await senderClient.createBatch({ maxSizeInBytes }); + + for (const messageToSend of messagesToSend) { + const batchHasCapacity = batchMessage.tryAdd(messageToSend); + if (!batchHasCapacity) { + break; + } else { + sentMessages.push(messageToSend); + } + } + await senderClient.sendBatch(batchMessage); + // receive all the messages in receive and delete mode + await serviceBusClient.test.verifyAndDeleteAllSentMessages( + entityNames, + useSessions, + sentMessages + ); + } + + it("Partitioned Queue: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueue); + await testSendBatch(false); + }); + + it("Partitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscription); + await testSendBatch(false); + }); + + it("Unpartitioned Queue: SendBatch #RunInBrowser", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueue); + await testSendBatch(false); + }); + + it("Unpartitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscription); + await testSendBatch(false); + }); + + it("Partitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueueWithSessions); + await testSendBatch(true); + }); + + it("Partitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscriptionWithSessions); + await testSendBatch(true); + }); + + it("Unpartitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); + await testSendBatch(true); + }); + + it("Unpartitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscriptionWithSessions); + await testSendBatch(true); + }); + }); + + describe("CreateBatch - parameter maxSizeInBytes > max_batch_size_allowed", function(): void { + afterEach(async () => { + await afterEachTest(); + }); + + function prepareMessages(useSessions: boolean): ServiceBusMessage[] { + const messagesToSend: ServiceBusMessage[] = []; + messagesToSend.push({ + body: Buffer.alloc(2000), + messageId: `message-1`, + sessionId: useSessions ? `someSession` : undefined + }); + messagesToSend.push({ + body: Buffer.alloc(200000), + messageId: `message-2`, + sessionId: useSessions ? `someSession` : undefined + }); + messagesToSend.push({ + body: Buffer.alloc(40000), + messageId: `message-3`, + sessionId: useSessions ? `someSession` : undefined + }); + messagesToSend.push({ + body: Buffer.alloc(20000), + messageId: `message-4`, + sessionId: useSessions ? `someSession` : undefined + }); + return messagesToSend; + } + + async function testSendBatch( + useSessions: boolean, + // Max batch size + maxSizeInBytes?: number + ): Promise { + // Prepare messages to send + const messagesToSend = prepareMessages(useSessions); + const batchMessage = await senderClient.createBatch({ maxSizeInBytes }); + + should.equal( + batchMessage.tryAdd(messagesToSend[0]), + true, + "tryAdd should not have failed for the first message" + ); + should.equal( + batchMessage.tryAdd(messagesToSend[1]), + false, + "tryAdd should have failed for the second message" + ); + should.equal( + batchMessage.tryAdd(messagesToSend[2]), + false, + "tryAdd should have failed for the third message" + ); + should.equal( + batchMessage.tryAdd(messagesToSend[3]), + false, + "tryAdd should have failed for the fourth message" + ); + await senderClient.sendBatch(batchMessage); + // receive all the messages in receive and delete mode + await serviceBusClient.test.verifyAndDeleteAllSentMessages(entityNames, useSessions, [ + messagesToSend[0] + ]); + } + + it("Partitioned Queue: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueue); + await testSendBatch(false, 5000); + }); + + it("Partitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscription); + await testSendBatch(false, 5000); + }); + + it("Unpartitioned Queue: SendBatch #RunInBrowser", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueue); + await testSendBatch(false, 5000); + }); + + it("Unpartitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscription); + await testSendBatch(false, 5000); + }); + + it("Partitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueueWithSessions); + await testSendBatch(true, 5000); + }); + + it("Partitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscriptionWithSessions); + await testSendBatch(true, 5000); + }); + + it("Unpartitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); + await testSendBatch(true, 5000); + }); + + it("Unpartitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscriptionWithSessions); + await testSendBatch(true, 5000); + }); + }); + + describe("CreateBatch should throw error - parameter maxSizeInBytes > max_batch_size_allowed", function(): void { + afterEach(async () => { + await afterEachTest(); + }); + const maxSizeInBytes = 30000000; + + async function testSendBatch(maxSizeInBytes?: number): Promise { + let errorIsThrown = false; + try { + await senderClient.createBatch({ maxSizeInBytes }); + } catch (error) { + should.equal( + error.message, + `Max message size (${maxSizeInBytes} bytes) is greater than maximum message size (262144 bytes) on the AMQP sender link.`, + "Unexpected error message when tried to create a batch of size > maximum message size." + ); + errorIsThrown = true; + } + should.equal( + errorIsThrown, + true, + "Error is not thrown when tried to create a batch of size > maximum message size." + ); + } + + it("Partitioned Queue: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueue); + await testSendBatch(maxSizeInBytes); + }); + + it("Partitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscription); + await testSendBatch(maxSizeInBytes); + }); + + it("Unpartitioned Queue: SendBatch #RunInBrowser", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueue); + await testSendBatch(maxSizeInBytes); + }); + + it("Unpartitioned Topic: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscription); + await testSendBatch(maxSizeInBytes); + }); + + it("Partitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedQueueWithSessions); + await testSendBatch(maxSizeInBytes); + }); + + it("Partitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.PartitionedSubscriptionWithSessions); + await testSendBatch(maxSizeInBytes); + }); + + it("Unpartitioned Queue with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); + await testSendBatch(maxSizeInBytes); + }); + + it("Unpartitioned Topic with Sessions: SendBatch", async function(): Promise { + await beforeEachTest(TestClientType.UnpartitionedSubscriptionWithSessions); + await testSendBatch(maxSizeInBytes); + }); + }); +}); diff --git a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts index ce3752afdd1a..c57cf020c026 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts @@ -948,7 +948,11 @@ describe("Streaming", () => { async function testConcurrency(maxConcurrentCalls?: number): Promise { const testMessages = [TestMessage.getSample(), TestMessage.getSample()]; - await senderClient.sendBatch(testMessages); + const batchMessageToSend = await senderClient.createBatch(); + testMessages.forEach((message) => { + batchMessageToSend.tryAdd(message); + }); + await senderClient.sendBatch(batchMessageToSend); const settledMsgs: ReceivedMessage[] = []; const receivedMsgs: ReceivedMessage[] = []; diff --git a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts index b3268938a0a5..e8a206bedbda 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts @@ -844,7 +844,11 @@ describe("Streaming with sessions", () => { } const testMessages = [TestMessage.getSessionSample(), TestMessage.getSessionSample()]; - await senderClient.sendBatch(testMessages); + const batchMessageToSend = await senderClient.createBatch(); + for (const message of testMessages) { + batchMessageToSend.tryAdd(message); + } + await senderClient.sendBatch(batchMessageToSend); const settledMsgs: ReceivedMessageWithLock[] = []; const receivedMsgs: ReceivedMessageWithLock[] = []; @@ -873,7 +877,11 @@ describe("Streaming with sessions", () => { }, processError }, - maxConcurrentCalls ? { maxConcurrentCalls } : {} + maxConcurrentCalls + ? { + maxConcurrentCalls + } + : {} ); await checkWithTimeout(() => settledMsgs.length === 2); diff --git a/sdk/servicebus/service-bus/test/utils/testutils2.ts b/sdk/servicebus/service-bus/test/utils/testutils2.ts index b3215a4e1024..2d91b81ae2b8 100644 --- a/sdk/servicebus/service-bus/test/utils/testutils2.ts +++ b/sdk/servicebus/service-bus/test/utils/testutils2.ts @@ -17,7 +17,11 @@ import * as dotenv from "dotenv"; import { recreateQueue, recreateTopic, recreateSubscription } from "./managementUtils"; import { ServiceBusClientOptions } from "../../src"; import chai from "chai"; -import { ReceivedMessageWithLock, ReceivedMessage } from "../../src/serviceBusMessage"; +import { + ReceivedMessageWithLock, + ReceivedMessage, + ServiceBusMessage +} from "../../src/serviceBusMessage"; dotenv.config(); const env = getEnvVars(); @@ -124,6 +128,8 @@ export async function drainAllMessages(receiver: Receiver<{}>): Promise { await receiver.close(); } +export type EntityName = Omit, "isPartitioned" | "usesSessions">; + export interface ServiceBusClientForTests extends ServiceBusClient { test: ServiceBusTestHelpers; } @@ -142,6 +148,72 @@ export class ServiceBusTestHelpers { await Promise.all(closePromises); } + async verifyAndDeleteAllSentMessages( + entityNames: EntityName, + useSessions: boolean, + sentMessages: ServiceBusMessage[] + ): Promise { + let receiverClient: Receiver | SessionReceiver; + let receivedMsgs: ReceivedMessage[]; + if (!useSessions) { + receiverClient = this.getReceiveAndDeleteReceiver({ + queue: entityNames.queue, + topic: entityNames.topic, + subscription: entityNames.subscription, + usesSessions: false + }); + receivedMsgs = await receiverClient.receiveBatch(sentMessages.length, { + // To Do - Maybe change the maxWaitTime + // Currently set same as numberOfMessages being received + maxWaitTimeSeconds: sentMessages.length + }); + await receiverClient.close(); + } else { + // From the sentMessages array, creating a set of all the `session-id`s + const setOfSessionIds: Set = new Set(); + // numOfMsgsWithSessionId - To keep track of number of messages sent per session in the sent messages + let numOfMsgsWithSessionId: { [sessionId: string]: number } = {}; + sentMessages.forEach((msg) => { + setOfSessionIds.add(msg.sessionId!); + numOfMsgsWithSessionId[msg.sessionId!] = numOfMsgsWithSessionId[msg.sessionId!] + ? numOfMsgsWithSessionId[msg.sessionId!] + 1 + : 1; + }); + // for-loop to receive messages from those `session-id`s + for (const id of setOfSessionIds) { + receiverClient = this.getReceiveAndDeleteReceiver({ + queue: entityNames.queue, + topic: entityNames.topic, + subscription: entityNames.subscription, + usesSessions: true, + sessionId: id + }); + const msgs = await receiverClient.receiveBatch(numOfMsgsWithSessionId[id], { + // To Do - Maybe change the maxWaitTime + // Currently set same as numberOfMessages being received + maxWaitTimeSeconds: numOfMsgsWithSessionId[id] + }); + receivedMsgs = !receivedMsgs! ? msgs : receivedMsgs!.concat(msgs); + await receiverClient.close(); + } + } + should.equal( + sentMessages.length, + receivedMsgs!.length, + "Unexpected number of messages received." + ); + receivedMsgs!.forEach((receivedMessage) => { + sentMessages = sentMessages.filter( + (sentMessage) => + sentMessage.messageId !== receivedMessage.messageId && + sentMessage.body !== receivedMessage.body + // To Do - Can check more properties here other than just messageId and body + ); + }); + should.equal(sentMessages.length, 0, "Unexpected messages received."); + // To Do - Maybe peek into the entity to make sure there are no messages left in the entity + } + async after(): Promise { // TODO: purge any of the dynamically created entities created in `createTestEntities` await this._serviceBusClient.close(); @@ -252,16 +324,19 @@ export class ServiceBusTestHelpers { /** * Gets a receiveAndDelete receiver for the specified `TestClientType` - * NOTE: the underlying receiver may be a `SessionReceiverImpl` + * NOTE: the underlying receiver may be a `SessionReceiverImpl`. + * For sessions, if the sessionId is not provided, SessionReceiver returned from this method is meant only for the default sessionId: `TestMessage.sessionId` * * The receiver created by this method will be cleaned up by `afterEach()` */ getReceiveAndDeleteReceiver( - entityNames: ReturnType + entityNames: Omit, "isPartitioned"> & { + sessionId?: string | undefined; + } ): Receiver { // TODO: we should generate a random ID here - there's no harm in // creating as many sessions as we wish. Some tests will need to change. - const sessionId = TestMessage.sessionId; + const sessionId = entityNames.sessionId ?? TestMessage.sessionId; if (entityNames.usesSessions) { return this.addToCleanup(