From 121d898e90c755c3ae46f61f9ac3b2340633366f Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Thu, 8 Jul 2021 13:22:58 -0700 Subject: [PATCH] [service-bus] Mitigate session-overlap test (#16183) We have an intermittent issue where we close a session and it releases, but there is a delay (Documented as part of this issue here: [link](https://github.com/Azure/azure-sdk-for-js/issues/14441). We've seen it in our normal live tests (not every time, but often enough to be noted). I've done two things to make this "better": - Mitigated the issue so we can still get a decent test signal by making the tests no longer use the same session ID. They didn't actually need to send or receive messages for the test itself (it's just validating that we're adding the right amount of credits) so I simplified the tests. - Put in some wrapper code to make it simpler to reproduce the issue in the future. It will basically run these session based tests 1000x, which still doesn't _always_ reproduce the issue but can. As a bonus, there was a `peekMessages()` test that expected that a message that has been sent will immediately be available, which isn't always true (there can still be a delay between "message accepted" and "message receivable"). I've fixed this test by doing a `receiveMessages()` first (which will wait until messages are available), abandoning the message and then doing a peek, which should work more reliably. --- .../internal/backupMessageSettlement.spec.ts | 11 +- .../test/internal/batchReceiver.spec.ts | 15 +- .../streamingReceiverSessions.spec.ts | 134 +++++++----------- .../service-bus/test/internal/utils/misc.ts | 10 +- 4 files changed, 67 insertions(+), 103 deletions(-) diff --git a/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts b/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts index a9e958cd6e10..03ae96613db0 100644 --- a/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts @@ -19,7 +19,7 @@ import { ServiceBusMessageImpl, ServiceBusReceivedMessage } from "../../src/serviceBusMessage"; -import { disableCommonLoggers, enableCommonLoggers, testLogger } from "./utils/misc"; +import { testLogger } from "./utils/misc"; const should = chai.should(); chai.use(chaiAsPromised); @@ -346,13 +346,8 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink", it( noSessionTestClientType + ": deadLetter() moves message to deadletter queue", async function(): Promise { - enableCommonLoggers(); - try { - await beforeEachTest(noSessionTestClientType); - await testDeadletter(); - } finally { - disableCommonLoggers(); - } + await beforeEachTest(noSessionTestClientType); + await testDeadletter(); } ); diff --git a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts index f86470518e13..675b21eba795 100644 --- a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts @@ -26,7 +26,7 @@ import { import { LinkEntity } from "../../src/core/linkEntity"; import { Constants, StandardAbortMessage } from "@azure/core-amqp"; import { BatchingReceiver } from "../../src/core/batchingReceiver"; -import { disableCommonLoggers, enableCommonLoggers, testLogger } from "./utils/misc"; +import { testLogger } from "./utils/misc"; const should = chai.should(); chai.use(chaiAsPromised); @@ -328,11 +328,14 @@ describe("Batching Receiver", () => { : TestMessage.getSample(); await sender.sendMessages(testMessages); + // we need to validate that the message has arrived (peek will just return immediately + // if the queue/subscription is empty) + const [receivedMessage] = await receiver.receiveMessages(1); + assert.ok(receivedMessage); + await receiver.abandonMessage(receivedMessage); // put the message back + const [peekedMsg] = await receiver.peekMessages(1); - if (!peekedMsg) { - // Sometimes the peek call does not return any messages :( - return; - } + assert.ok(peekedMsg); should.equal( !peekedMsg.lockToken, @@ -810,13 +813,11 @@ describe("Batching Receiver", () => { describe("Batch Receiver - disconnects", () => { describe("Batch Receiver - disconnects (non-session)", function(): void { before(() => { - enableCommonLoggers(); console.log(`Entity type: ${noSessionTestClientType}`); serviceBusClient = createServiceBusClientForTests(); }); after(() => { - disableCommonLoggers(); return serviceBusClient.test.after(); }); diff --git a/sdk/servicebus/service-bus/test/internal/streamingReceiverSessions.spec.ts b/sdk/servicebus/service-bus/test/internal/streamingReceiverSessions.spec.ts index 1f6c494fa3cb..2986432db68a 100644 --- a/sdk/servicebus/service-bus/test/internal/streamingReceiverSessions.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/streamingReceiverSessions.spec.ts @@ -13,7 +13,10 @@ import { import { getAlreadyReceivingErrorMsg, MessageAlreadySettled } from "../../src/util/errors"; import { TestMessage, checkWithTimeout } from "../public/utils/testUtils"; import { DispositionType } from "../../src/serviceBusMessage"; -import { ServiceBusSessionReceiver } from "../../src/receivers/sessionReceiver"; +import { + ServiceBusSessionReceiver, + ServiceBusSessionReceiverImpl +} from "../../src/receivers/sessionReceiver"; import { EntityName, ServiceBusClientForTests, @@ -24,7 +27,9 @@ import { } from "../public/utils/testutils2"; import { getDeliveryProperty } from "./utils/misc"; import { singleMessagePromise } from "./streamingReceiver.spec"; +import { defer } from "./unit/unittestUtils"; const should = chai.should(); +const assert = chai.assert; chai.use(chaiAsPromised); describe("Streaming with sessions", () => { @@ -35,9 +40,11 @@ describe("Streaming with sessions", () => { let unexpectedError: Error | undefined; let serviceBusClient: ServiceBusClientForTests; const testClientType = getRandomTestClientTypeWithSessions(); + let entityNames: AutoGeneratedEntity; - before(() => { + before(async () => { serviceBusClient = createServiceBusClientForTests(); + entityNames = await serviceBusClient.test.createTestEntities(testClientType); }); after(async () => { @@ -54,7 +61,7 @@ describe("Streaming with sessions", () => { async function beforeEachTest( receiveMode?: "peekLock" | "receiveAndDelete" ): Promise { - const entityNames = await createReceiverForTests(receiveMode); + await createReceiverForTests(receiveMode); sender = serviceBusClient.test.addToCleanup( serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) @@ -68,47 +75,44 @@ describe("Streaming with sessions", () => { } async function createReceiverForTests( - receiveMode?: "peekLock" | "receiveAndDelete" - ): Promise { - const entityNames = await serviceBusClient.test.createTestEntities(testClientType); + receiveMode?: "peekLock" | "receiveAndDelete", + sessionId: string = TestMessage.sessionId + ): Promise { receiver = serviceBusClient.test.addToCleanup( receiveMode === "receiveAndDelete" ? entityNames.queue - ? await serviceBusClient.acceptSession(entityNames.queue, TestMessage.sessionId, { + ? await serviceBusClient.acceptSession(entityNames.queue, sessionId, { receiveMode: "receiveAndDelete" }) : await serviceBusClient.acceptSession( entityNames.topic!, entityNames.subscription!, - TestMessage.sessionId, + sessionId, { receiveMode: "receiveAndDelete" } ) : entityNames.queue - ? await serviceBusClient.acceptSession(entityNames.queue, TestMessage.sessionId) + ? await serviceBusClient.acceptSession(entityNames.queue, sessionId) : await serviceBusClient.acceptSession( entityNames.topic!, entityNames.subscription!, - TestMessage.sessionId + sessionId ) ); - return entityNames; } it( testClientType + ": Streaming - stop a message subscription without closing the receiver", async () => { - const entities = await serviceBusClient.test.createTestEntities(testClientType); - - const sender2 = await serviceBusClient.test.createSender(entities); + const sender2 = await serviceBusClient.test.createSender(entityNames); await sender2.sendMessages({ body: ".close() test - first message", sessionId: TestMessage.sessionId }); const actualReceiver = await serviceBusClient.test.acceptSessionWithPeekLock( - entities, + entityNames, TestMessage.sessionId ); const { subscriber, messages } = await singleMessagePromise(actualReceiver); @@ -135,7 +139,7 @@ describe("Streaming with sessions", () => { await actualReceiver.close(); // release the session lock - const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entities); + const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames); // clean out the remaining message that never arrived. const [finalMessage] = await receiver2.receiveMessages(1, { maxWaitTimeInMs: 5000 }); @@ -636,84 +640,56 @@ describe("Streaming with sessions", () => { } ); }); - describe(testClientType + ": Sessions Streaming - maxConcurrentCalls", function(): void { afterEach(async () => { await afterEachTest(); }); - beforeEach(async () => { - await beforeEachTest(); - }); + const iterationLimit = 1; + const createSessionId = (maxConcurrentCalls: number | undefined, i: number) => + `max-concurrent-calls-session-${maxConcurrentCalls}[${i}]`; - async function testConcurrency(maxConcurrentCalls?: number): Promise { - if ( - typeof maxConcurrentCalls === "number" && - (maxConcurrentCalls < 1 || maxConcurrentCalls > 2) - ) { - chai.assert.fail( - "Sorry, the tests here only support cases when maxConcurrentCalls is set to 1 or 2" - ); - } + // to (occasionally) reproduce session overlap issue mentioned in: + // https://github.com/Azure/azure-sdk-for-js/issues/14441 + // const iterationLimit = 1000; + // const createSessionId = (_maxConcurrentCalls: number | undefined, _i: number) => + // `max-concurrent-calls-session`; - const testMessages = [TestMessage.getSessionSample(), TestMessage.getSessionSample()]; - const batchMessageToSend = await sender.createMessageBatch(); - for (const message of testMessages) { - batchMessageToSend.tryAddMessage(message); - } - await sender.sendMessages(batchMessageToSend); + [undefined, 1, 2].forEach((maxConcurrentCalls) => { + for (let i = 0; i < iterationLimit; ++i) { + it(`[${i}] maxConcurrentCalls (with sessions), adding ${maxConcurrentCalls} credits`, async () => { + const sessionId = createSessionId(maxConcurrentCalls, i); + await createReceiverForTests("peekLock", sessionId); - const settledMsgs: ServiceBusReceivedMessage[] = []; - const receivedMsgs: ServiceBusReceivedMessage[] = []; + const sessionReceiver = receiver as ServiceBusSessionReceiverImpl; - receiver.subscribe( - { - async processMessage(msg: ServiceBusReceivedMessage) { - if (receivedMsgs.length === 1) { - if ((!maxConcurrentCalls || maxConcurrentCalls === 1) && settledMsgs.length === 0) { - throw new Error( - "onMessage for the second message should not have been called before the first message got settled" - ); - } - } else { - if (maxConcurrentCalls === 2 && settledMsgs.length !== 0) { - throw new Error( - "onMessage for the second message should have been called before the first message got settled" - ); - } - } + const { promise: addCreditPromise, resolve: resolveAddCredit } = defer(); - receivedMsgs.push(msg); - await delay(2000); - await receiver.completeMessage(msg); - settledMsgs.push(msg); - }, - processError - }, - maxConcurrentCalls - ? { - maxConcurrentCalls - } - : {} - ); + sessionReceiver["_messageSession"]!["_link"]!.addCredit = resolveAddCredit; - await checkWithTimeout(() => settledMsgs.length === 2); - await receiver.close(); + let processErrorArgs: ProcessErrorArgs | undefined; - should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); - should.equal(settledMsgs.length, 2, `Expected 2, received ${settledMsgs.length} messages.`); - } + sessionReceiver.subscribe( + { + async processMessage(_message: ServiceBusReceivedMessage) {}, + async processError(args: ProcessErrorArgs) { + processErrorArgs = args; + } + }, + { + maxConcurrentCalls + } + ); - it("no maxConcurrentCalls passed(with sessions)", async function(): Promise { - await testConcurrency(); - }); + const actualCredits = await addCreditPromise; - it("pass 1 for maxConcurrentCalls(with sessions)", async function(): Promise { - await testConcurrency(1); - }); + // the default value (ie: maxConcurrentCalls === undefined) is 1 + const expectedCredits = maxConcurrentCalls ?? 1; - it("pass 2 for maxConcurrentCalls(with sessions)", async function(): Promise { - await testConcurrency(2); + assert.equal(actualCredits, expectedCredits); + assert.isUndefined(processErrorArgs, "No error should have occurred."); + }); + } }); }); diff --git a/sdk/servicebus/service-bus/test/internal/utils/misc.ts b/sdk/servicebus/service-bus/test/internal/utils/misc.ts index 41cea80aeda0..9c2183b4a323 100644 --- a/sdk/servicebus/service-bus/test/internal/utils/misc.ts +++ b/sdk/servicebus/service-bus/test/internal/utils/misc.ts @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { createClientLogger, setLogLevel } from "@azure/logger"; +import { createClientLogger } from "@azure/logger"; import { Delivery, ServiceBusReceivedMessage } from "../../../src"; import { ServiceBusMessageImpl } from "../../../src/serviceBusMessage"; @@ -20,11 +20,3 @@ export function getDeliveryProperty(message: ServiceBusReceivedMessage): Deliver } export const testLogger = createClientLogger("test"); - -export function enableCommonLoggers() { - setLogLevel("verbose"); -} - -export function disableCommonLoggers() { - setLogLevel(); -}