Skip to content

Commit

Permalink
Removing three tests which had more code than needed to test that the…
Browse files Browse the repository at this point in the history
… maxConcurrentCalls was passed properly.

Replaced with a smaller test that can (with the right parameters, manually specified) still cause the session overlap bug. I don't have a solution for that one yet but this will allow us to investigate it when we're ready.

Part of the investigation for #14441
  • Loading branch information
richardpark-msft authored Jul 8, 2021
1 parent ff0c7f8 commit f5f5b54
Showing 1 changed file with 55 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import {
import { getAlreadyReceivingErrorMsg, MessageAlreadySettled } from "../../src/util/errors";
import { TestMessage, checkWithTimeout } from "../public/utils/testUtils";
import { DispositionType } from "../../src/serviceBusMessage";
import { ServiceBusSessionReceiver } from "../../src/receivers/sessionReceiver";
import {
ServiceBusSessionReceiver,
ServiceBusSessionReceiverImpl
} from "../../src/receivers/sessionReceiver";
import {
EntityName,
ServiceBusClientForTests,
Expand All @@ -24,7 +27,9 @@ import {
} from "../public/utils/testutils2";
import { getDeliveryProperty } from "./utils/misc";
import { singleMessagePromise } from "./streamingReceiver.spec";
import { defer } from "./unit/unittestUtils";
const should = chai.should();
const assert = chai.assert;
chai.use(chaiAsPromised);

describe("Streaming with sessions", () => {
Expand All @@ -35,9 +40,11 @@ describe("Streaming with sessions", () => {
let unexpectedError: Error | undefined;
let serviceBusClient: ServiceBusClientForTests;
const testClientType = getRandomTestClientTypeWithSessions();
let entityNames: AutoGeneratedEntity;

before(() => {
before(async () => {
serviceBusClient = createServiceBusClientForTests();
entityNames = await serviceBusClient.test.createTestEntities(testClientType);
});

after(async () => {
Expand All @@ -54,7 +61,7 @@ describe("Streaming with sessions", () => {
async function beforeEachTest(
receiveMode?: "peekLock" | "receiveAndDelete"
): Promise<EntityName> {
const entityNames = await createReceiverForTests(receiveMode);
await createReceiverForTests(receiveMode);

sender = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!)
Expand All @@ -68,47 +75,44 @@ describe("Streaming with sessions", () => {
}

async function createReceiverForTests(
receiveMode?: "peekLock" | "receiveAndDelete"
): Promise<AutoGeneratedEntity> {
const entityNames = await serviceBusClient.test.createTestEntities(testClientType);
receiveMode?: "peekLock" | "receiveAndDelete",
sessionId: string = TestMessage.sessionId
): Promise<void> {
receiver = serviceBusClient.test.addToCleanup(
receiveMode === "receiveAndDelete"
? entityNames.queue
? await serviceBusClient.acceptSession(entityNames.queue, TestMessage.sessionId, {
? await serviceBusClient.acceptSession(entityNames.queue, sessionId, {
receiveMode: "receiveAndDelete"
})
: await serviceBusClient.acceptSession(
entityNames.topic!,
entityNames.subscription!,
TestMessage.sessionId,
sessionId,
{
receiveMode: "receiveAndDelete"
}
)
: entityNames.queue
? await serviceBusClient.acceptSession(entityNames.queue, TestMessage.sessionId)
? await serviceBusClient.acceptSession(entityNames.queue, sessionId)
: await serviceBusClient.acceptSession(
entityNames.topic!,
entityNames.subscription!,
TestMessage.sessionId
sessionId
)
);
return entityNames;
}

it(
testClientType + ": Streaming - stop a message subscription without closing the receiver",
async () => {
const entities = await serviceBusClient.test.createTestEntities(testClientType);

const sender2 = await serviceBusClient.test.createSender(entities);
const sender2 = await serviceBusClient.test.createSender(entityNames);
await sender2.sendMessages({
body: ".close() test - first message",
sessionId: TestMessage.sessionId
});

const actualReceiver = await serviceBusClient.test.acceptSessionWithPeekLock(
entities,
entityNames,
TestMessage.sessionId
);
const { subscriber, messages } = await singleMessagePromise(actualReceiver);
Expand All @@ -135,7 +139,7 @@ describe("Streaming with sessions", () => {

await actualReceiver.close(); // release the session lock

const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entities);
const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames);

// clean out the remaining message that never arrived.
const [finalMessage] = await receiver2.receiveMessages(1, { maxWaitTimeInMs: 5000 });
Expand Down Expand Up @@ -636,84 +640,56 @@ describe("Streaming with sessions", () => {
}
);
});

describe(testClientType + ": Sessions Streaming - maxConcurrentCalls", function(): void {
afterEach(async () => {
await afterEachTest();
});

beforeEach(async () => {
await beforeEachTest();
});
const iterationLimit = 1;
const createSessionId = (maxConcurrentCalls: number | undefined, i: number) =>
`max-concurrent-calls-session-${maxConcurrentCalls}[${i}]`;

async function testConcurrency(maxConcurrentCalls?: number): Promise<void> {
if (
typeof maxConcurrentCalls === "number" &&
(maxConcurrentCalls < 1 || maxConcurrentCalls > 2)
) {
chai.assert.fail(
"Sorry, the tests here only support cases when maxConcurrentCalls is set to 1 or 2"
);
}
// to (occasionally) reproduce session overlap issue mentioned in:
// https://github.com/Azure/azure-sdk-for-js/issues/14441
// const iterationLimit = 1000;
// const createSessionId = (_maxConcurrentCalls: number | undefined, _i: number) =>
// `max-concurrent-calls-session`;

const testMessages = [TestMessage.getSessionSample(), TestMessage.getSessionSample()];
const batchMessageToSend = await sender.createMessageBatch();
for (const message of testMessages) {
batchMessageToSend.tryAddMessage(message);
}
await sender.sendMessages(batchMessageToSend);
[undefined, 1, 2].forEach((maxConcurrentCalls) => {
for (let i = 0; i < iterationLimit; ++i) {
it(`[${i}] maxConcurrentCalls (with sessions), adding ${maxConcurrentCalls} credits`, async () => {
const sessionId = createSessionId(maxConcurrentCalls, i);
await createReceiverForTests("peekLock", sessionId);

const settledMsgs: ServiceBusReceivedMessage[] = [];
const receivedMsgs: ServiceBusReceivedMessage[] = [];
const sessionReceiver = receiver as ServiceBusSessionReceiverImpl;

receiver.subscribe(
{
async processMessage(msg: ServiceBusReceivedMessage) {
if (receivedMsgs.length === 1) {
if ((!maxConcurrentCalls || maxConcurrentCalls === 1) && settledMsgs.length === 0) {
throw new Error(
"onMessage for the second message should not have been called before the first message got settled"
);
}
} else {
if (maxConcurrentCalls === 2 && settledMsgs.length !== 0) {
throw new Error(
"onMessage for the second message should have been called before the first message got settled"
);
}
}
const { promise: addCreditPromise, resolve: resolveAddCredit } = defer<number>();

receivedMsgs.push(msg);
await delay(2000);
await receiver.completeMessage(msg);
settledMsgs.push(msg);
},
processError
},
maxConcurrentCalls
? {
maxConcurrentCalls
}
: {}
);
sessionReceiver["_messageSession"]!["_link"]!.addCredit = resolveAddCredit;

await checkWithTimeout(() => settledMsgs.length === 2);
await receiver.close();
let processErrorArgs: ProcessErrorArgs | undefined;

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(settledMsgs.length, 2, `Expected 2, received ${settledMsgs.length} messages.`);
}
sessionReceiver.subscribe(
{
async processMessage(_message: ServiceBusReceivedMessage) {},
async processError(args: ProcessErrorArgs) {
processErrorArgs = args;
}
},
{
maxConcurrentCalls
}
);

it("no maxConcurrentCalls passed(with sessions)", async function(): Promise<void> {
await testConcurrency();
});
const actualCredits = await addCreditPromise;

it("pass 1 for maxConcurrentCalls(with sessions)", async function(): Promise<void> {
await testConcurrency(1);
});
// the default value (ie: maxConcurrentCalls === undefined) is 1
const expectedCredits = maxConcurrentCalls ?? 1;

it("pass 2 for maxConcurrentCalls(with sessions)", async function(): Promise<void> {
await testConcurrency(2);
assert.equal(actualCredits, expectedCredits);
assert.isUndefined(processErrorArgs, "No error should have occurred.");
});
}
});
});

Expand Down

0 comments on commit f5f5b54

Please sign in to comment.