diff --git a/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts b/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts index a9e958cd6e10..03ae96613db0 100644 --- a/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/backupMessageSettlement.spec.ts @@ -19,7 +19,7 @@ import { ServiceBusMessageImpl, ServiceBusReceivedMessage } from "../../src/serviceBusMessage"; -import { disableCommonLoggers, enableCommonLoggers, testLogger } from "./utils/misc"; +import { testLogger } from "./utils/misc"; const should = chai.should(); chai.use(chaiAsPromised); @@ -346,13 +346,8 @@ describe("Message settlement After Receiver is Closed - Through ManagementLink", it( noSessionTestClientType + ": deadLetter() moves message to deadletter queue", async function(): Promise { - enableCommonLoggers(); - try { - await beforeEachTest(noSessionTestClientType); - await testDeadletter(); - } finally { - disableCommonLoggers(); - } + await beforeEachTest(noSessionTestClientType); + await testDeadletter(); } ); diff --git a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts index f86470518e13..675b21eba795 100644 --- a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts @@ -26,7 +26,7 @@ import { import { LinkEntity } from "../../src/core/linkEntity"; import { Constants, StandardAbortMessage } from "@azure/core-amqp"; import { BatchingReceiver } from "../../src/core/batchingReceiver"; -import { disableCommonLoggers, enableCommonLoggers, testLogger } from "./utils/misc"; +import { testLogger } from "./utils/misc"; const should = chai.should(); chai.use(chaiAsPromised); @@ -328,11 +328,14 @@ describe("Batching Receiver", () => { : TestMessage.getSample(); await sender.sendMessages(testMessages); + // we need to validate that the message has arrived (peek will just return immediately + // if the queue/subscription is empty) + const [receivedMessage] = await receiver.receiveMessages(1); + assert.ok(receivedMessage); + await receiver.abandonMessage(receivedMessage); // put the message back + const [peekedMsg] = await receiver.peekMessages(1); - if (!peekedMsg) { - // Sometimes the peek call does not return any messages :( - return; - } + assert.ok(peekedMsg); should.equal( !peekedMsg.lockToken, @@ -810,13 +813,11 @@ describe("Batching Receiver", () => { describe("Batch Receiver - disconnects", () => { describe("Batch Receiver - disconnects (non-session)", function(): void { before(() => { - enableCommonLoggers(); console.log(`Entity type: ${noSessionTestClientType}`); serviceBusClient = createServiceBusClientForTests(); }); after(() => { - disableCommonLoggers(); return serviceBusClient.test.after(); }); diff --git a/sdk/servicebus/service-bus/test/internal/streamingReceiverSessions.spec.ts b/sdk/servicebus/service-bus/test/internal/streamingReceiverSessions.spec.ts index 1f6c494fa3cb..2986432db68a 100644 --- a/sdk/servicebus/service-bus/test/internal/streamingReceiverSessions.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/streamingReceiverSessions.spec.ts @@ -13,7 +13,10 @@ import { import { getAlreadyReceivingErrorMsg, MessageAlreadySettled } from "../../src/util/errors"; import { TestMessage, checkWithTimeout } from "../public/utils/testUtils"; import { DispositionType } from "../../src/serviceBusMessage"; -import { ServiceBusSessionReceiver } from "../../src/receivers/sessionReceiver"; +import { + ServiceBusSessionReceiver, + ServiceBusSessionReceiverImpl +} from "../../src/receivers/sessionReceiver"; import { EntityName, ServiceBusClientForTests, @@ -24,7 +27,9 @@ import { } from "../public/utils/testutils2"; import { getDeliveryProperty } from "./utils/misc"; import { singleMessagePromise } from "./streamingReceiver.spec"; +import { defer } from "./unit/unittestUtils"; const should = chai.should(); +const assert = chai.assert; chai.use(chaiAsPromised); describe("Streaming with sessions", () => { @@ -35,9 +40,11 @@ describe("Streaming with sessions", () => { let unexpectedError: Error | undefined; let serviceBusClient: ServiceBusClientForTests; const testClientType = getRandomTestClientTypeWithSessions(); + let entityNames: AutoGeneratedEntity; - before(() => { + before(async () => { serviceBusClient = createServiceBusClientForTests(); + entityNames = await serviceBusClient.test.createTestEntities(testClientType); }); after(async () => { @@ -54,7 +61,7 @@ describe("Streaming with sessions", () => { async function beforeEachTest( receiveMode?: "peekLock" | "receiveAndDelete" ): Promise { - const entityNames = await createReceiverForTests(receiveMode); + await createReceiverForTests(receiveMode); sender = serviceBusClient.test.addToCleanup( serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) @@ -68,47 +75,44 @@ describe("Streaming with sessions", () => { } async function createReceiverForTests( - receiveMode?: "peekLock" | "receiveAndDelete" - ): Promise { - const entityNames = await serviceBusClient.test.createTestEntities(testClientType); + receiveMode?: "peekLock" | "receiveAndDelete", + sessionId: string = TestMessage.sessionId + ): Promise { receiver = serviceBusClient.test.addToCleanup( receiveMode === "receiveAndDelete" ? entityNames.queue - ? await serviceBusClient.acceptSession(entityNames.queue, TestMessage.sessionId, { + ? await serviceBusClient.acceptSession(entityNames.queue, sessionId, { receiveMode: "receiveAndDelete" }) : await serviceBusClient.acceptSession( entityNames.topic!, entityNames.subscription!, - TestMessage.sessionId, + sessionId, { receiveMode: "receiveAndDelete" } ) : entityNames.queue - ? await serviceBusClient.acceptSession(entityNames.queue, TestMessage.sessionId) + ? await serviceBusClient.acceptSession(entityNames.queue, sessionId) : await serviceBusClient.acceptSession( entityNames.topic!, entityNames.subscription!, - TestMessage.sessionId + sessionId ) ); - return entityNames; } it( testClientType + ": Streaming - stop a message subscription without closing the receiver", async () => { - const entities = await serviceBusClient.test.createTestEntities(testClientType); - - const sender2 = await serviceBusClient.test.createSender(entities); + const sender2 = await serviceBusClient.test.createSender(entityNames); await sender2.sendMessages({ body: ".close() test - first message", sessionId: TestMessage.sessionId }); const actualReceiver = await serviceBusClient.test.acceptSessionWithPeekLock( - entities, + entityNames, TestMessage.sessionId ); const { subscriber, messages } = await singleMessagePromise(actualReceiver); @@ -135,7 +139,7 @@ describe("Streaming with sessions", () => { await actualReceiver.close(); // release the session lock - const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entities); + const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames); // clean out the remaining message that never arrived. const [finalMessage] = await receiver2.receiveMessages(1, { maxWaitTimeInMs: 5000 }); @@ -636,84 +640,56 @@ describe("Streaming with sessions", () => { } ); }); - describe(testClientType + ": Sessions Streaming - maxConcurrentCalls", function(): void { afterEach(async () => { await afterEachTest(); }); - beforeEach(async () => { - await beforeEachTest(); - }); + const iterationLimit = 1; + const createSessionId = (maxConcurrentCalls: number | undefined, i: number) => + `max-concurrent-calls-session-${maxConcurrentCalls}[${i}]`; - async function testConcurrency(maxConcurrentCalls?: number): Promise { - if ( - typeof maxConcurrentCalls === "number" && - (maxConcurrentCalls < 1 || maxConcurrentCalls > 2) - ) { - chai.assert.fail( - "Sorry, the tests here only support cases when maxConcurrentCalls is set to 1 or 2" - ); - } + // to (occasionally) reproduce session overlap issue mentioned in: + // https://github.com/Azure/azure-sdk-for-js/issues/14441 + // const iterationLimit = 1000; + // const createSessionId = (_maxConcurrentCalls: number | undefined, _i: number) => + // `max-concurrent-calls-session`; - const testMessages = [TestMessage.getSessionSample(), TestMessage.getSessionSample()]; - const batchMessageToSend = await sender.createMessageBatch(); - for (const message of testMessages) { - batchMessageToSend.tryAddMessage(message); - } - await sender.sendMessages(batchMessageToSend); + [undefined, 1, 2].forEach((maxConcurrentCalls) => { + for (let i = 0; i < iterationLimit; ++i) { + it(`[${i}] maxConcurrentCalls (with sessions), adding ${maxConcurrentCalls} credits`, async () => { + const sessionId = createSessionId(maxConcurrentCalls, i); + await createReceiverForTests("peekLock", sessionId); - const settledMsgs: ServiceBusReceivedMessage[] = []; - const receivedMsgs: ServiceBusReceivedMessage[] = []; + const sessionReceiver = receiver as ServiceBusSessionReceiverImpl; - receiver.subscribe( - { - async processMessage(msg: ServiceBusReceivedMessage) { - if (receivedMsgs.length === 1) { - if ((!maxConcurrentCalls || maxConcurrentCalls === 1) && settledMsgs.length === 0) { - throw new Error( - "onMessage for the second message should not have been called before the first message got settled" - ); - } - } else { - if (maxConcurrentCalls === 2 && settledMsgs.length !== 0) { - throw new Error( - "onMessage for the second message should have been called before the first message got settled" - ); - } - } + const { promise: addCreditPromise, resolve: resolveAddCredit } = defer(); - receivedMsgs.push(msg); - await delay(2000); - await receiver.completeMessage(msg); - settledMsgs.push(msg); - }, - processError - }, - maxConcurrentCalls - ? { - maxConcurrentCalls - } - : {} - ); + sessionReceiver["_messageSession"]!["_link"]!.addCredit = resolveAddCredit; - await checkWithTimeout(() => settledMsgs.length === 2); - await receiver.close(); + let processErrorArgs: ProcessErrorArgs | undefined; - should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); - should.equal(settledMsgs.length, 2, `Expected 2, received ${settledMsgs.length} messages.`); - } + sessionReceiver.subscribe( + { + async processMessage(_message: ServiceBusReceivedMessage) {}, + async processError(args: ProcessErrorArgs) { + processErrorArgs = args; + } + }, + { + maxConcurrentCalls + } + ); - it("no maxConcurrentCalls passed(with sessions)", async function(): Promise { - await testConcurrency(); - }); + const actualCredits = await addCreditPromise; - it("pass 1 for maxConcurrentCalls(with sessions)", async function(): Promise { - await testConcurrency(1); - }); + // the default value (ie: maxConcurrentCalls === undefined) is 1 + const expectedCredits = maxConcurrentCalls ?? 1; - it("pass 2 for maxConcurrentCalls(with sessions)", async function(): Promise { - await testConcurrency(2); + assert.equal(actualCredits, expectedCredits); + assert.isUndefined(processErrorArgs, "No error should have occurred."); + }); + } }); }); diff --git a/sdk/servicebus/service-bus/test/internal/utils/misc.ts b/sdk/servicebus/service-bus/test/internal/utils/misc.ts index 41cea80aeda0..9c2183b4a323 100644 --- a/sdk/servicebus/service-bus/test/internal/utils/misc.ts +++ b/sdk/servicebus/service-bus/test/internal/utils/misc.ts @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { createClientLogger, setLogLevel } from "@azure/logger"; +import { createClientLogger } from "@azure/logger"; import { Delivery, ServiceBusReceivedMessage } from "../../../src"; import { ServiceBusMessageImpl } from "../../../src/serviceBusMessage"; @@ -20,11 +20,3 @@ export function getDeliveryProperty(message: ServiceBusReceivedMessage): Deliver } export const testLogger = createClientLogger("test"); - -export function enableCommonLoggers() { - setLogLevel("verbose"); -} - -export function disableCommonLoggers() { - setLogLevel(); -}