diff --git a/sdk/servicebus/service-bus/test/stress/scenarioBatchReceive.ts b/sdk/servicebus/service-bus/test/stress/scenarioBatchReceive.ts index b435f4d87a8f..58ed07de0dd2 100644 --- a/sdk/servicebus/service-bus/test/stress/scenarioBatchReceive.ts +++ b/sdk/servicebus/service-bus/test/stress/scenarioBatchReceive.ts @@ -19,14 +19,19 @@ interface ScenarioReceiveBatchOptions { numberOfMessagesPerSend?: number; delayBetweenSendsInMs?: number; totalNumberOfMessagesToSend?: number; + /** + * If set to true, `totalNumberOfMessagesToSend` number of messages will be sent before triggering receive. + */ + sendAllMessagesBeforeReceiveStarts?: boolean; + numberOfParallelSends?: number; maxAutoLockRenewalDurationInMs?: number; settleMessageOnReceive: boolean; } function sanitizeOptions(args: string[]): Required { const options = parsedArgs(args, { - boolean: ["settleMessageOnReceive"], - default: { settleMessageOnReceive: false } + boolean: ["settleMessageOnReceive", "sendAllMessagesBeforeReceiveStarts"], + default: { settleMessageOnReceive: false, sendAllMessagesBeforeReceiveStarts: false } }); return { testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes @@ -37,14 +42,16 @@ function sanitizeOptions(args: string[]): Required numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1, delayBetweenSendsInMs: options.delayBetweenSendsInMs || 0, totalNumberOfMessagesToSend: options.totalNumberOfMessagesToSend || Infinity, + sendAllMessagesBeforeReceiveStarts: options.sendAllMessagesBeforeReceiveStarts, maxAutoLockRenewalDurationInMs: options.maxAutoLockRenewalDurationInMs || 0, // 0 = disabled - settleMessageOnReceive: options.settleMessageOnReceive + settleMessageOnReceive: options.settleMessageOnReceive, + numberOfParallelSends: options.numberOfParallelSends || 5 }; } export async function scenarioReceiveBatch() { const testOptions = sanitizeOptions(process.argv); - const { + let { testDurationInMs, receiveMode, receiveBatchMaxMessageCount, @@ -54,7 +61,9 @@ export async function scenarioReceiveBatch() { delayBetweenSendsInMs, totalNumberOfMessagesToSend, maxAutoLockRenewalDurationInMs, - settleMessageOnReceive + settleMessageOnReceive, + sendAllMessagesBeforeReceiveStarts, + numberOfParallelSends } = testOptions; // Sending stops after 70% of total duration to give the receiver a chance to clean up and receive all the messages @@ -86,7 +95,10 @@ export async function scenarioReceiveBatch() { elapsedTime < testDurationForSendInMs && stressBase.messagesSent.length < totalNumberOfMessagesToSend ) { - await stressBase.sendMessages([sender], numberOfMessagesPerSend); + await stressBase.sendMessages( + new Array(numberOfParallelSends).fill(sender), + numberOfMessagesPerSend + ); elapsedTime = new Date().valueOf() - startedAt.valueOf(); await delay(delayBetweenSendsInMs); } @@ -106,7 +118,12 @@ export async function scenarioReceiveBatch() { } } - await Promise.all([sendMessages(), receiveMessages()]); + if (sendAllMessagesBeforeReceiveStarts) { + await sendMessages(); + } + await Promise.all( + (!sendAllMessagesBeforeReceiveStarts ? [sendMessages()] : []).concat(receiveMessages()) + ); await sbClient.close(); await stressBase.end(); diff --git a/sdk/servicebus/service-bus/test/stress/scenarioCloseOpen.ts b/sdk/servicebus/service-bus/test/stress/scenarioCloseOpen.ts index 1ad9b1c91201..f6bb9a1bfd27 100644 --- a/sdk/servicebus/service-bus/test/stress/scenarioCloseOpen.ts +++ b/sdk/servicebus/service-bus/test/stress/scenarioCloseOpen.ts @@ -25,7 +25,7 @@ function sanitizeOptions(args: string[]): Required { default: { shouldCreateNewClientEachTime: true } }); return { - testDurationInMs: options.testDurationInMs || 60 * 1000, // Default = 60 minutes + testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes receiveBatchMaxMessageCount: options.receiveBatchMaxMessageCount || 10, receiveBatchMaxWaitTimeInMs: options.receiveBatchMaxWaitTimeInMs || 10000, numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1, diff --git a/sdk/servicebus/service-bus/test/stress/stressTestsBase.ts b/sdk/servicebus/service-bus/test/stress/stressTestsBase.ts index a1bc8f586901..7db914c6c909 100644 --- a/sdk/servicebus/service-bus/test/stress/stressTestsBase.ts +++ b/sdk/servicebus/service-bus/test/stress/stressTestsBase.ts @@ -80,13 +80,13 @@ export class SBStressTestsBase { options?: CreateQueueOptions | undefined, testOptions?: Record ) { + this.queueName = + (!queueNamePrefix ? `queue` : queueNamePrefix) + `-${Math.ceil(Math.random() * 100000)}`; this.reportFileName = `temp/report-${this.queueName}.txt`; this.errorsFileName = `temp/errors-${this.queueName}.txt`; this.messagesReportFileName = `temp/messages-${this.queueName}.json`; if (testOptions) console.log(testOptions); await appendFile(this.reportFileName, JSON.stringify(testOptions, null, 2)); - this.queueName = - (!queueNamePrefix ? `queue` : queueNamePrefix) + `-${Math.ceil(Math.random() * 100000)}`; await this.serviceBusAdministrationClient.createQueue(this.queueName, options); } diff --git a/sdk/servicebus/service-bus/test/stress/utils.ts b/sdk/servicebus/service-bus/test/stress/utils.ts index 8db78077e9da..1f543d08bfe8 100644 --- a/sdk/servicebus/service-bus/test/stress/utils.ts +++ b/sdk/servicebus/service-bus/test/stress/utils.ts @@ -67,7 +67,9 @@ export async function saveDiscrepanciesFromTrackedMessages( const output = { messages_sent_but_never_received: [], messages_not_sent_but_received: [], - messages_sent_multiple_times: [] + messages_sent_multiple_times: [], + messages_sent_once_but_received_multiple_times: [], + messages_sent_once_and_received_once: [] }; for (const id in trackedMessageIds) { if (trackedMessageIds[id].sentCount <= 0) { @@ -82,6 +84,14 @@ export async function saveDiscrepanciesFromTrackedMessages( // Message was sent multiple times output.messages_sent_multiple_times.push(id); } + if (trackedMessageIds[id].sentCount === 1 && trackedMessageIds[id].receivedCount > 1) { + // Message was sent once but received multiple times + output.messages_sent_once_but_received_multiple_times.push(id); + } + if (trackedMessageIds[id].sentCount === 1 && trackedMessageIds[id].receivedCount === 1) { + // Message was sent once and received once + output.messages_sent_once_and_received_once.push(id); + } } await writeFile(fileName, JSON.stringify(output));