Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Minor updates to stress tests #12561

Merged
8 commits merged into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions sdk/servicebus/service-bus/test/stress/scenarioBatchReceive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ interface ScenarioReceiveBatchOptions {
numberOfMessagesPerSend?: number;
delayBetweenSendsInMs?: number;
totalNumberOfMessagesToSend?: number;
/**
* If set to true, `totalNumberOfMessagesToSend` number of messages will be sent before triggering receive.
*/
sendAllMessagesBeforeReceiveStarts?: boolean;
numberOfParallelSends?: number;
maxAutoLockRenewalDurationInMs?: number;
settleMessageOnReceive: boolean;
}

function sanitizeOptions(args: string[]): Required<ScenarioReceiveBatchOptions> {
const options = parsedArgs<ScenarioReceiveBatchOptions>(args, {
boolean: ["settleMessageOnReceive"],
default: { settleMessageOnReceive: false }
boolean: ["settleMessageOnReceive", "sendAllMessagesBeforeReceiveStarts"],
default: { settleMessageOnReceive: false, sendAllMessagesBeforeReceiveStarts: false }
});
return {
testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes
Expand All @@ -37,14 +42,16 @@ function sanitizeOptions(args: string[]): Required<ScenarioReceiveBatchOptions>
numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1,
delayBetweenSendsInMs: options.delayBetweenSendsInMs || 0,
totalNumberOfMessagesToSend: options.totalNumberOfMessagesToSend || Infinity,
sendAllMessagesBeforeReceiveStarts: options.sendAllMessagesBeforeReceiveStarts,
maxAutoLockRenewalDurationInMs: options.maxAutoLockRenewalDurationInMs || 0, // 0 = disabled
settleMessageOnReceive: options.settleMessageOnReceive
settleMessageOnReceive: options.settleMessageOnReceive,
numberOfParallelSends: options.numberOfParallelSends || 5
};
}

export async function scenarioReceiveBatch() {
const testOptions = sanitizeOptions(process.argv);
const {
let {
testDurationInMs,
receiveMode,
receiveBatchMaxMessageCount,
Expand All @@ -54,7 +61,9 @@ export async function scenarioReceiveBatch() {
delayBetweenSendsInMs,
totalNumberOfMessagesToSend,
maxAutoLockRenewalDurationInMs,
settleMessageOnReceive
settleMessageOnReceive,
sendAllMessagesBeforeReceiveStarts,
numberOfParallelSends
} = testOptions;

// Sending stops after 70% of total duration to give the receiver a chance to clean up and receive all the messages
Expand Down Expand Up @@ -86,7 +95,10 @@ export async function scenarioReceiveBatch() {
elapsedTime < testDurationForSendInMs &&
stressBase.messagesSent.length < totalNumberOfMessagesToSend
) {
await stressBase.sendMessages([sender], numberOfMessagesPerSend);
await stressBase.sendMessages(
new Array(numberOfParallelSends).fill(sender),
numberOfMessagesPerSend
);
elapsedTime = new Date().valueOf() - startedAt.valueOf();
await delay(delayBetweenSendsInMs);
}
Expand All @@ -106,7 +118,12 @@ export async function scenarioReceiveBatch() {
}
}

await Promise.all([sendMessages(), receiveMessages()]);
if (sendAllMessagesBeforeReceiveStarts) {
await sendMessages();
}
await Promise.all(
(!sendAllMessagesBeforeReceiveStarts ? [sendMessages()] : []).concat(receiveMessages())
);
await sbClient.close();

await stressBase.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ function sanitizeOptions(args: string[]): Required<ScenarioCloseOptions> {
default: { shouldCreateNewClientEachTime: true }
});
return {
testDurationInMs: options.testDurationInMs || 60 * 1000, // Default = 60 minutes
testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes
receiveBatchMaxMessageCount: options.receiveBatchMaxMessageCount || 10,
receiveBatchMaxWaitTimeInMs: options.receiveBatchMaxWaitTimeInMs || 10000,
numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1,
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/test/stress/stressTestsBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ export class SBStressTestsBase {
options?: CreateQueueOptions | undefined,
testOptions?: Record<string, string | number | boolean>
) {
this.queueName =
(!queueNamePrefix ? `queue` : queueNamePrefix) + `-${Math.ceil(Math.random() * 100000)}`;
this.reportFileName = `temp/report-${this.queueName}.txt`;
this.errorsFileName = `temp/errors-${this.queueName}.txt`;
this.messagesReportFileName = `temp/messages-${this.queueName}.json`;
if (testOptions) console.log(testOptions);
await appendFile(this.reportFileName, JSON.stringify(testOptions, null, 2));
this.queueName =
(!queueNamePrefix ? `queue` : queueNamePrefix) + `-${Math.ceil(Math.random() * 100000)}`;
await this.serviceBusAdministrationClient.createQueue(this.queueName, options);
}

Expand Down
12 changes: 11 additions & 1 deletion sdk/servicebus/service-bus/test/stress/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ export async function saveDiscrepanciesFromTrackedMessages(
const output = {
messages_sent_but_never_received: [],
messages_not_sent_but_received: [],
messages_sent_multiple_times: []
messages_sent_multiple_times: [],
messages_sent_once_but_received_multiple_times: [],
messages_sent_once_and_received_once: []
};
for (const id in trackedMessageIds) {
if (trackedMessageIds[id].sentCount <= 0) {
Expand All @@ -82,6 +84,14 @@ export async function saveDiscrepanciesFromTrackedMessages(
// Message was sent multiple times
output.messages_sent_multiple_times.push(id);
}
if (trackedMessageIds[id].sentCount === 1 && trackedMessageIds[id].receivedCount > 1) {
// Message was sent once but received multiple times
output.messages_sent_once_but_received_multiple_times.push(id);
}
if (trackedMessageIds[id].sentCount === 1 && trackedMessageIds[id].receivedCount === 1) {
// Message was sent once and received once
output.messages_sent_once_and_received_once.push(id);
}
}

await writeFile(fileName, JSON.stringify(output));
Expand Down