Skip to content

Commit

Permalink
[service-bus] Mitigate session-overlap test (#16183)
Browse files Browse the repository at this point in the history
We have an intermittent issue where we close a session and it releases, but there is a delay (Documented as part of this issue here: [link](#14441). We've seen it in our normal live tests (not every time, but often enough to be noted).

I've done two things to make this "better":
- Mitigated the issue so we can still get a decent test signal by making the tests no longer use the same session ID. They didn't actually need to send or receive messages for the test itself (it's just validating that we're adding the right amount of credits) so I simplified the tests.
- Put in some wrapper code to make it simpler to reproduce the issue in the future. It will basically run these session based tests 1000x, which still doesn't _always_ reproduce the issue but can.

As a bonus, there was a `peekMessages()` test that expected that a message that has been sent will immediately be available, which isn't always true (there can still be a delay between "message accepted" and "message receivable"). I've fixed this test by doing a `receiveMessages()` first (which will wait until messages are available), abandoning the message and then doing a peek, which should work more reliably.
  • Loading branch information
richardpark-msft authored Jul 8, 2021
1 parent d2d7000 commit 121d898
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
ServiceBusMessageImpl,
ServiceBusReceivedMessage
} from "../../src/serviceBusMessage";
import { disableCommonLoggers, enableCommonLoggers, testLogger } from "./utils/misc";
import { testLogger } from "./utils/misc";

const should = chai.should();
chai.use(chaiAsPromised);
Expand Down Expand Up @@ -346,13 +346,8 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink",
it(
noSessionTestClientType + ": deadLetter() moves message to deadletter queue",
async function(): Promise<void> {
enableCommonLoggers();
try {
await beforeEachTest(noSessionTestClientType);
await testDeadletter();
} finally {
disableCommonLoggers();
}
await beforeEachTest(noSessionTestClientType);
await testDeadletter();
}
);

Expand Down
15 changes: 8 additions & 7 deletions sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
import { LinkEntity } from "../../src/core/linkEntity";
import { Constants, StandardAbortMessage } from "@azure/core-amqp";
import { BatchingReceiver } from "../../src/core/batchingReceiver";
import { disableCommonLoggers, enableCommonLoggers, testLogger } from "./utils/misc";
import { testLogger } from "./utils/misc";

const should = chai.should();
chai.use(chaiAsPromised);
Expand Down Expand Up @@ -328,11 +328,14 @@ describe("Batching Receiver", () => {
: TestMessage.getSample();
await sender.sendMessages(testMessages);

// we need to validate that the message has arrived (peek will just return immediately
// if the queue/subscription is empty)
const [receivedMessage] = await receiver.receiveMessages(1);
assert.ok(receivedMessage);
await receiver.abandonMessage(receivedMessage); // put the message back

const [peekedMsg] = await receiver.peekMessages(1);
if (!peekedMsg) {
// Sometimes the peek call does not return any messages :(
return;
}
assert.ok(peekedMsg);

should.equal(
!peekedMsg.lockToken,
Expand Down Expand Up @@ -810,13 +813,11 @@ describe("Batching Receiver", () => {
describe("Batch Receiver - disconnects", () => {
describe("Batch Receiver - disconnects (non-session)", function(): void {
before(() => {
enableCommonLoggers();
console.log(`Entity type: ${noSessionTestClientType}`);
serviceBusClient = createServiceBusClientForTests();
});

after(() => {
disableCommonLoggers();
return serviceBusClient.test.after();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import {
import { getAlreadyReceivingErrorMsg, MessageAlreadySettled } from "../../src/util/errors";
import { TestMessage, checkWithTimeout } from "../public/utils/testUtils";
import { DispositionType } from "../../src/serviceBusMessage";
import { ServiceBusSessionReceiver } from "../../src/receivers/sessionReceiver";
import {
ServiceBusSessionReceiver,
ServiceBusSessionReceiverImpl
} from "../../src/receivers/sessionReceiver";
import {
EntityName,
ServiceBusClientForTests,
Expand All @@ -24,7 +27,9 @@ import {
} from "../public/utils/testutils2";
import { getDeliveryProperty } from "./utils/misc";
import { singleMessagePromise } from "./streamingReceiver.spec";
import { defer } from "./unit/unittestUtils";
const should = chai.should();
const assert = chai.assert;
chai.use(chaiAsPromised);

describe("Streaming with sessions", () => {
Expand All @@ -35,9 +40,11 @@ describe("Streaming with sessions", () => {
let unexpectedError: Error | undefined;
let serviceBusClient: ServiceBusClientForTests;
const testClientType = getRandomTestClientTypeWithSessions();
let entityNames: AutoGeneratedEntity;

before(() => {
before(async () => {
serviceBusClient = createServiceBusClientForTests();
entityNames = await serviceBusClient.test.createTestEntities(testClientType);
});

after(async () => {
Expand All @@ -54,7 +61,7 @@ describe("Streaming with sessions", () => {
async function beforeEachTest(
receiveMode?: "peekLock" | "receiveAndDelete"
): Promise<EntityName> {
const entityNames = await createReceiverForTests(receiveMode);
await createReceiverForTests(receiveMode);

sender = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!)
Expand All @@ -68,47 +75,44 @@ describe("Streaming with sessions", () => {
}

async function createReceiverForTests(
receiveMode?: "peekLock" | "receiveAndDelete"
): Promise<AutoGeneratedEntity> {
const entityNames = await serviceBusClient.test.createTestEntities(testClientType);
receiveMode?: "peekLock" | "receiveAndDelete",
sessionId: string = TestMessage.sessionId
): Promise<void> {
receiver = serviceBusClient.test.addToCleanup(
receiveMode === "receiveAndDelete"
? entityNames.queue
? await serviceBusClient.acceptSession(entityNames.queue, TestMessage.sessionId, {
? await serviceBusClient.acceptSession(entityNames.queue, sessionId, {
receiveMode: "receiveAndDelete"
})
: await serviceBusClient.acceptSession(
entityNames.topic!,
entityNames.subscription!,
TestMessage.sessionId,
sessionId,
{
receiveMode: "receiveAndDelete"
}
)
: entityNames.queue
? await serviceBusClient.acceptSession(entityNames.queue, TestMessage.sessionId)
? await serviceBusClient.acceptSession(entityNames.queue, sessionId)
: await serviceBusClient.acceptSession(
entityNames.topic!,
entityNames.subscription!,
TestMessage.sessionId
sessionId
)
);
return entityNames;
}

it(
testClientType + ": Streaming - stop a message subscription without closing the receiver",
async () => {
const entities = await serviceBusClient.test.createTestEntities(testClientType);

const sender2 = await serviceBusClient.test.createSender(entities);
const sender2 = await serviceBusClient.test.createSender(entityNames);
await sender2.sendMessages({
body: ".close() test - first message",
sessionId: TestMessage.sessionId
});

const actualReceiver = await serviceBusClient.test.acceptSessionWithPeekLock(
entities,
entityNames,
TestMessage.sessionId
);
const { subscriber, messages } = await singleMessagePromise(actualReceiver);
Expand All @@ -135,7 +139,7 @@ describe("Streaming with sessions", () => {

await actualReceiver.close(); // release the session lock

const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entities);
const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames);

// clean out the remaining message that never arrived.
const [finalMessage] = await receiver2.receiveMessages(1, { maxWaitTimeInMs: 5000 });
Expand Down Expand Up @@ -636,84 +640,56 @@ describe("Streaming with sessions", () => {
}
);
});

describe(testClientType + ": Sessions Streaming - maxConcurrentCalls", function(): void {
afterEach(async () => {
await afterEachTest();
});

beforeEach(async () => {
await beforeEachTest();
});
const iterationLimit = 1;
const createSessionId = (maxConcurrentCalls: number | undefined, i: number) =>
`max-concurrent-calls-session-${maxConcurrentCalls}[${i}]`;

async function testConcurrency(maxConcurrentCalls?: number): Promise<void> {
if (
typeof maxConcurrentCalls === "number" &&
(maxConcurrentCalls < 1 || maxConcurrentCalls > 2)
) {
chai.assert.fail(
"Sorry, the tests here only support cases when maxConcurrentCalls is set to 1 or 2"
);
}
// to (occasionally) reproduce session overlap issue mentioned in:
// https://github.com/Azure/azure-sdk-for-js/issues/14441
// const iterationLimit = 1000;
// const createSessionId = (_maxConcurrentCalls: number | undefined, _i: number) =>
// `max-concurrent-calls-session`;

const testMessages = [TestMessage.getSessionSample(), TestMessage.getSessionSample()];
const batchMessageToSend = await sender.createMessageBatch();
for (const message of testMessages) {
batchMessageToSend.tryAddMessage(message);
}
await sender.sendMessages(batchMessageToSend);
[undefined, 1, 2].forEach((maxConcurrentCalls) => {
for (let i = 0; i < iterationLimit; ++i) {
it(`[${i}] maxConcurrentCalls (with sessions), adding ${maxConcurrentCalls} credits`, async () => {
const sessionId = createSessionId(maxConcurrentCalls, i);
await createReceiverForTests("peekLock", sessionId);

const settledMsgs: ServiceBusReceivedMessage[] = [];
const receivedMsgs: ServiceBusReceivedMessage[] = [];
const sessionReceiver = receiver as ServiceBusSessionReceiverImpl;

receiver.subscribe(
{
async processMessage(msg: ServiceBusReceivedMessage) {
if (receivedMsgs.length === 1) {
if ((!maxConcurrentCalls || maxConcurrentCalls === 1) && settledMsgs.length === 0) {
throw new Error(
"onMessage for the second message should not have been called before the first message got settled"
);
}
} else {
if (maxConcurrentCalls === 2 && settledMsgs.length !== 0) {
throw new Error(
"onMessage for the second message should have been called before the first message got settled"
);
}
}
const { promise: addCreditPromise, resolve: resolveAddCredit } = defer<number>();

receivedMsgs.push(msg);
await delay(2000);
await receiver.completeMessage(msg);
settledMsgs.push(msg);
},
processError
},
maxConcurrentCalls
? {
maxConcurrentCalls
}
: {}
);
sessionReceiver["_messageSession"]!["_link"]!.addCredit = resolveAddCredit;

await checkWithTimeout(() => settledMsgs.length === 2);
await receiver.close();
let processErrorArgs: ProcessErrorArgs | undefined;

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(settledMsgs.length, 2, `Expected 2, received ${settledMsgs.length} messages.`);
}
sessionReceiver.subscribe(
{
async processMessage(_message: ServiceBusReceivedMessage) {},
async processError(args: ProcessErrorArgs) {
processErrorArgs = args;
}
},
{
maxConcurrentCalls
}
);

it("no maxConcurrentCalls passed(with sessions)", async function(): Promise<void> {
await testConcurrency();
});
const actualCredits = await addCreditPromise;

it("pass 1 for maxConcurrentCalls(with sessions)", async function(): Promise<void> {
await testConcurrency(1);
});
// the default value (ie: maxConcurrentCalls === undefined) is 1
const expectedCredits = maxConcurrentCalls ?? 1;

it("pass 2 for maxConcurrentCalls(with sessions)", async function(): Promise<void> {
await testConcurrency(2);
assert.equal(actualCredits, expectedCredits);
assert.isUndefined(processErrorArgs, "No error should have occurred.");
});
}
});
});

Expand Down
10 changes: 1 addition & 9 deletions sdk/servicebus/service-bus/test/internal/utils/misc.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { createClientLogger, setLogLevel } from "@azure/logger";
import { createClientLogger } from "@azure/logger";
import { Delivery, ServiceBusReceivedMessage } from "../../../src";
import { ServiceBusMessageImpl } from "../../../src/serviceBusMessage";

Expand All @@ -20,11 +20,3 @@ export function getDeliveryProperty(message: ServiceBusReceivedMessage): Deliver
}

export const testLogger = createClientLogger("test");

export function enableCommonLoggers() {
setLogLevel("verbose");
}

export function disableCommonLoggers() {
setLogLevel();
}

0 comments on commit 121d898

Please sign in to comment.