Skip to content

Commit

Permalink
[Service Bus] Minor updates to stress tests (#12561)
Browse files Browse the repository at this point in the history
This PR adds 
- a new option `sendAllMessagesBeforeReceiveStarts` which would be useful if we want to receive a ton of messages in the queue.
- tracking more info related to the messages
- other minor fixes
  • Loading branch information
HarshaNalluru authored Jan 21, 2021
1 parent 99135cc commit 28a534f
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
31 changes: 24 additions & 7 deletions sdk/servicebus/service-bus/test/stress/scenarioBatchReceive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ interface ScenarioReceiveBatchOptions {
numberOfMessagesPerSend?: number;
delayBetweenSendsInMs?: number;
totalNumberOfMessagesToSend?: number;
/**
* If set to true, `totalNumberOfMessagesToSend` number of messages will be sent before triggering receive.
*/
sendAllMessagesBeforeReceiveStarts?: boolean;
numberOfParallelSends?: number;
maxAutoLockRenewalDurationInMs?: number;
settleMessageOnReceive: boolean;
}

function sanitizeOptions(args: string[]): Required<ScenarioReceiveBatchOptions> {
const options = parsedArgs<ScenarioReceiveBatchOptions>(args, {
boolean: ["settleMessageOnReceive"],
default: { settleMessageOnReceive: false }
boolean: ["settleMessageOnReceive", "sendAllMessagesBeforeReceiveStarts"],
default: { settleMessageOnReceive: false, sendAllMessagesBeforeReceiveStarts: false }
});
return {
testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes
Expand All @@ -37,14 +42,16 @@ function sanitizeOptions(args: string[]): Required<ScenarioReceiveBatchOptions>
numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1,
delayBetweenSendsInMs: options.delayBetweenSendsInMs || 0,
totalNumberOfMessagesToSend: options.totalNumberOfMessagesToSend || Infinity,
sendAllMessagesBeforeReceiveStarts: options.sendAllMessagesBeforeReceiveStarts,
maxAutoLockRenewalDurationInMs: options.maxAutoLockRenewalDurationInMs || 0, // 0 = disabled
settleMessageOnReceive: options.settleMessageOnReceive
settleMessageOnReceive: options.settleMessageOnReceive,
numberOfParallelSends: options.numberOfParallelSends || 5
};
}

export async function scenarioReceiveBatch() {
const testOptions = sanitizeOptions(process.argv);
const {
let {
testDurationInMs,
receiveMode,
receiveBatchMaxMessageCount,
Expand All @@ -54,7 +61,9 @@ export async function scenarioReceiveBatch() {
delayBetweenSendsInMs,
totalNumberOfMessagesToSend,
maxAutoLockRenewalDurationInMs,
settleMessageOnReceive
settleMessageOnReceive,
sendAllMessagesBeforeReceiveStarts,
numberOfParallelSends
} = testOptions;

// Sending stops after 70% of total duration to give the receiver a chance to clean up and receive all the messages
Expand Down Expand Up @@ -86,7 +95,10 @@ export async function scenarioReceiveBatch() {
elapsedTime < testDurationForSendInMs &&
stressBase.messagesSent.length < totalNumberOfMessagesToSend
) {
await stressBase.sendMessages([sender], numberOfMessagesPerSend);
await stressBase.sendMessages(
new Array(numberOfParallelSends).fill(sender),
numberOfMessagesPerSend
);
elapsedTime = new Date().valueOf() - startedAt.valueOf();
await delay(delayBetweenSendsInMs);
}
Expand All @@ -106,7 +118,12 @@ export async function scenarioReceiveBatch() {
}
}

await Promise.all([sendMessages(), receiveMessages()]);
if (sendAllMessagesBeforeReceiveStarts) {
await sendMessages();
}
await Promise.all(
(!sendAllMessagesBeforeReceiveStarts ? [sendMessages()] : []).concat(receiveMessages())
);
await sbClient.close();

await stressBase.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ function sanitizeOptions(args: string[]): Required<ScenarioCloseOptions> {
default: { shouldCreateNewClientEachTime: true }
});
return {
testDurationInMs: options.testDurationInMs || 60 * 1000, // Default = 60 minutes
testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes
receiveBatchMaxMessageCount: options.receiveBatchMaxMessageCount || 10,
receiveBatchMaxWaitTimeInMs: options.receiveBatchMaxWaitTimeInMs || 10000,
numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1,
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/test/stress/stressTestsBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ export class SBStressTestsBase {
options?: CreateQueueOptions | undefined,
testOptions?: Record<string, string | number | boolean>
) {
this.queueName =
(!queueNamePrefix ? `queue` : queueNamePrefix) + `-${Math.ceil(Math.random() * 100000)}`;
this.reportFileName = `temp/report-${this.queueName}.txt`;
this.errorsFileName = `temp/errors-${this.queueName}.txt`;
this.messagesReportFileName = `temp/messages-${this.queueName}.json`;
if (testOptions) console.log(testOptions);
await appendFile(this.reportFileName, JSON.stringify(testOptions, null, 2));
this.queueName =
(!queueNamePrefix ? `queue` : queueNamePrefix) + `-${Math.ceil(Math.random() * 100000)}`;
await this.serviceBusAdministrationClient.createQueue(this.queueName, options);
}

Expand Down
12 changes: 11 additions & 1 deletion sdk/servicebus/service-bus/test/stress/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ export async function saveDiscrepanciesFromTrackedMessages(
const output = {
messages_sent_but_never_received: [],
messages_not_sent_but_received: [],
messages_sent_multiple_times: []
messages_sent_multiple_times: [],
messages_sent_once_but_received_multiple_times: [],
messages_sent_once_and_received_once: []
};
for (const id in trackedMessageIds) {
if (trackedMessageIds[id].sentCount <= 0) {
Expand All @@ -82,6 +84,14 @@ export async function saveDiscrepanciesFromTrackedMessages(
// Message was sent multiple times
output.messages_sent_multiple_times.push(id);
}
if (trackedMessageIds[id].sentCount === 1 && trackedMessageIds[id].receivedCount > 1) {
// Message was sent once but received multiple times
output.messages_sent_once_but_received_multiple_times.push(id);
}
if (trackedMessageIds[id].sentCount === 1 && trackedMessageIds[id].receivedCount === 1) {
// Message was sent once and received once
output.messages_sent_once_and_received_once.push(id);
}
}

await writeFile(fileName, JSON.stringify(output));
Expand Down

0 comments on commit 28a534f

Please sign in to comment.